From 718f09c43a0c658134769397282e579b68a98b32 Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 1 Oct 2022 21:17:18 -0500 Subject: [PATCH] Clean tracing, simplify validation, rename InputFormat -> VideoFormat --- src/details.rs | 12 +++--- src/exiftool.rs | 2 +- src/ffmpeg.rs | 47 +++++++++++++++++------- src/file.rs | 4 +- src/generate.rs | 8 ++-- src/ingest.rs | 18 ++++----- src/magick.rs | 17 ++------- src/main.rs | 1 + src/process.rs | 16 +++----- src/processor.rs | 3 +- src/repo.rs | 2 +- src/repo/sled.rs | 54 +++++++++++++-------------- src/store/file_store.rs | 13 +------ src/store/object_store.rs | 1 - src/validate.rs | 77 +++++++-------------------------------- 15 files changed, 107 insertions(+), 168 deletions(-) diff --git a/src/details.rs b/src/details.rs index e310ff4..5032aef 100644 --- a/src/details.rs +++ b/src/details.rs @@ -1,6 +1,6 @@ use crate::{ error::Error, - ffmpeg::InputFormat, + ffmpeg::VideoFormat, magick::{video_mp4, video_webm, ValidInputType}, serde_str::Serde, store::Store, @@ -29,7 +29,6 @@ impl Details { || self.content_type.type_() == "image" && self.content_type.subtype() == "gif" } - #[tracing::instrument("Details from bytes", skip(input))] pub(crate) async fn from_bytes(input: web::Bytes, hint: ValidInputType) -> Result { let details = if hint.is_video() { crate::ffmpeg::details_bytes(input.clone()).await? @@ -51,7 +50,6 @@ impl Details { )) } - #[tracing::instrument("Details from store")] pub(crate) async fn from_store( store: S, identifier: S::Identifier, @@ -100,17 +98,17 @@ impl Details { self.created_at.into() } - pub(crate) fn to_input_format(&self) -> Option { + pub(crate) fn to_input_format(&self) -> Option { if *self.content_type == mime::IMAGE_GIF { - return Some(InputFormat::Gif); + return Some(VideoFormat::Gif); } if *self.content_type == video_mp4() { - return Some(InputFormat::Mp4); + return Some(VideoFormat::Mp4); } if *self.content_type == video_webm() { - return Some(InputFormat::Webm); + return Some(VideoFormat::Webm); } None diff --git a/src/exiftool.rs b/src/exiftool.rs index b6060b7..f2e0801 100644 --- a/src/exiftool.rs +++ b/src/exiftool.rs @@ -2,7 +2,7 @@ use crate::process::Process; use actix_web::web::Bytes; use tokio::io::AsyncRead; -#[tracing::instrument(name = "Clearing metadata", skip(input))] +#[tracing::instrument(level = "trace", skip(input))] pub(crate) fn clear_metadata_bytes_read(input: Bytes) -> std::io::Result { let process = Process::run("exiftool", &["-all=", "-", "-out", "-"])?; diff --git a/src/ffmpeg.rs b/src/ffmpeg.rs index bceb3f1..a9d9413 100644 --- a/src/ffmpeg.rs +++ b/src/ffmpeg.rs @@ -1,5 +1,5 @@ use crate::{ - config::{AudioCodec, VideoCodec}, + config::{AudioCodec, ImageFormat, VideoCodec}, error::{Error, UploadError}, magick::{Details, ValidInputType}, process::Process, @@ -7,10 +7,9 @@ use crate::{ }; use actix_web::web::Bytes; use tokio::io::{AsyncRead, AsyncReadExt}; -use tracing::instrument; #[derive(Clone, Copy, Debug)] -pub(crate) enum InputFormat { +pub(crate) enum VideoFormat { Gif, Mp4, Webm, @@ -28,7 +27,26 @@ pub(crate) enum ThumbnailFormat { // Webp, } -impl InputFormat { +#[derive(Clone, Copy, Debug)] +pub(crate) enum FileFormat { + Image(ImageFormat), + Video(VideoFormat), +} + +impl ValidInputType { + pub(crate) fn to_file_format(self) -> FileFormat { + match self { + Self::Gif => FileFormat::Video(VideoFormat::Gif), + Self::Mp4 => FileFormat::Video(VideoFormat::Mp4), + Self::Webm => FileFormat::Video(VideoFormat::Webm), + Self::Jpeg => FileFormat::Image(ImageFormat::Jpeg), + Self::Png => FileFormat::Image(ImageFormat::Png), + Self::Webp => FileFormat::Image(ImageFormat::Webp), + } + } +} + +impl VideoFormat { const fn to_file_extension(self) -> &'static str { match self { Self::Gif => ".gif", @@ -121,10 +139,10 @@ impl AudioCodec { } } -const FORMAT_MAPPINGS: &[(&str, InputFormat)] = &[ - ("gif", InputFormat::Gif), - ("mp4", InputFormat::Mp4), - ("webm", InputFormat::Webm), +const FORMAT_MAPPINGS: &[(&str, VideoFormat)] = &[ + ("gif", VideoFormat::Gif), + ("mp4", VideoFormat::Mp4), + ("webm", VideoFormat::Webm), ]; pub(crate) async fn input_type_bytes(input: Bytes) -> Result, Error> { @@ -155,6 +173,7 @@ pub(crate) async fn details_bytes(input: Bytes) -> Result, Error .await } +#[tracing::instrument(skip(f))] async fn details_file(f: F) -> Result, Error> where F: FnOnce(crate::file::File) -> Fut, @@ -193,7 +212,7 @@ where } fn parse_details(output: std::borrow::Cow<'_, str>) -> Result, Error> { - tracing::info!("OUTPUT: {}", output); + tracing::debug!("OUTPUT: {}", output); let mut lines = output.lines(); @@ -230,7 +249,7 @@ fn parse_details_inner( width: &str, height: &str, frames: &str, - format: InputFormat, + format: VideoFormat, ) -> Result { let width = width.parse().map_err(|_| UploadError::UnsupportedFormat)?; let height = height.parse().map_err(|_| UploadError::UnsupportedFormat)?; @@ -244,10 +263,10 @@ fn parse_details_inner( }) } -#[tracing::instrument(name = "Transcode video", skip(input))] +#[tracing::instrument(skip(input))] pub(crate) async fn trancsocde_bytes( input: Bytes, - input_format: InputFormat, + input_format: VideoFormat, permit_audio: bool, video_codec: VideoCodec, audio_codec: Option, @@ -318,11 +337,11 @@ pub(crate) async fn trancsocde_bytes( Ok(Box::pin(clean_reader)) } -#[instrument(name = "Create video thumbnail")] +#[tracing::instrument] pub(crate) async fn thumbnail( store: S, from: S::Identifier, - input_format: InputFormat, + input_format: VideoFormat, format: ThumbnailFormat, ) -> Result { let input_file = crate::tmp_file::tmp_file(Some(input_format.to_file_extension())); diff --git a/src/file.rs b/src/file.rs index aa6643b..a0014c9 100644 --- a/src/file.rs +++ b/src/file.rs @@ -126,7 +126,7 @@ mod io_uring { impl File { pub(crate) async fn open(path: impl AsRef) -> std::io::Result { - tracing::info!("Opening io-uring file: {:?}", path.as_ref()); + tracing::debug!("Opening io-uring file: {:?}", path.as_ref()); Ok(File { path: path.as_ref().to_owned(), inner: tracing::trace_span!(parent: None, "Open File") @@ -136,7 +136,7 @@ mod io_uring { } pub(crate) async fn create(path: impl AsRef) -> std::io::Result { - tracing::info!("Creating io-uring file: {:?}", path.as_ref()); + tracing::debug!("Creating io-uring file: {:?}", path.as_ref()); Ok(File { path: path.as_ref().to_owned(), inner: tracing::trace_span!(parent: None, "Create File") diff --git a/src/generate.rs b/src/generate.rs index f39ff63..d43d965 100644 --- a/src/generate.rs +++ b/src/generate.rs @@ -3,7 +3,7 @@ use crate::{ config::ImageFormat, details::Details, error::Error, - ffmpeg::{InputFormat, ThumbnailFormat}, + ffmpeg::{ThumbnailFormat, VideoFormat}, repo::{Alias, FullRepo}, store::Store, }; @@ -21,7 +21,7 @@ pub(crate) async fn generate( alias: Alias, thumbnail_path: PathBuf, thumbnail_args: Vec, - input_format: Option, + input_format: Option, thumbnail_format: Option, hash: R::Bytes, ) -> Result<(Details, Bytes), Error> { @@ -52,7 +52,7 @@ async fn process( alias: Alias, thumbnail_path: PathBuf, thumbnail_args: Vec, - input_format: Option, + input_format: Option, thumbnail_format: Option, hash: R::Bytes, ) -> Result<(Details, Bytes), Error> { @@ -68,7 +68,7 @@ async fn process( let reader = crate::ffmpeg::thumbnail( store.clone(), identifier, - input_format.unwrap_or(InputFormat::Mp4), + input_format.unwrap_or(VideoFormat::Mp4), thumbnail_format.unwrap_or(ThumbnailFormat::Jpeg), ) .await?; diff --git a/src/ingest.rs b/src/ingest.rs index 8b0e5bf..40b19f8 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -27,7 +27,7 @@ where identifier: Option, } -#[tracing::instrument(name = "Aggregate", skip(stream))] +#[tracing::instrument(skip(stream))] async fn aggregate(mut stream: S) -> Result where S: Stream> + Unpin, @@ -41,7 +41,7 @@ where Ok(buf.into_bytes()) } -#[tracing::instrument(name = "Ingest", skip(stream))] +#[tracing::instrument(skip(stream))] pub(crate) async fn ingest( repo: &R, store: &S, @@ -231,12 +231,13 @@ where { #[tracing::instrument(name = "Drop Session", skip(self), fields(hash = ?self.hash, alias = ?self.alias, identifier = ?self.identifier))] fn drop(&mut self) { + let cleanup_parent_span = tracing::info_span!(parent: None, "Dropped session cleanup"); + cleanup_parent_span.follows_from(Span::current()); + if let Some(hash) = self.hash.take() { let repo = self.repo.clone(); - let cleanup_span = - tracing::info_span!(parent: None, "Session cleanup hash", hash = ?hash); - cleanup_span.follows_from(Span::current()); + let cleanup_span = tracing::info_span!(parent: cleanup_parent_span.clone(), "Session cleanup hash", hash = ?hash); tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { actix_rt::spawn( @@ -251,9 +252,7 @@ where if let Some(alias) = self.alias.take() { let repo = self.repo.clone(); - let cleanup_span = - tracing::info_span!(parent: None, "Session cleanup alias", alias = ?alias); - cleanup_span.follows_from(Span::current()); + let cleanup_span = tracing::info_span!(parent: cleanup_parent_span.clone(), "Session cleanup alias", alias = ?alias); tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { actix_rt::spawn( @@ -275,8 +274,7 @@ where if let Some(identifier) = self.identifier.take() { let repo = self.repo.clone(); - let cleanup_span = tracing::info_span!(parent: None, "Session cleanup identifier", identifier = ?identifier); - cleanup_span.follows_from(Span::current()); + let cleanup_span = tracing::info_span!(parent: cleanup_parent_span, "Session cleanup identifier", identifier = ?identifier); tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { actix_rt::spawn( diff --git a/src/magick.rs b/src/magick.rs index 24353a7..b2582fe 100644 --- a/src/magick.rs +++ b/src/magick.rs @@ -10,7 +10,6 @@ use tokio::{ io::{AsyncRead, AsyncReadExt}, process::Command, }; -use tracing::instrument; pub(crate) fn details_hint(alias: &Alias) -> Option { let ext = alias.extension()?; @@ -114,14 +113,7 @@ pub(crate) struct Details { pub(crate) frames: Option, } -#[tracing::instrument(name = "Clear Metadata", skip(input))] -pub(crate) fn clear_metadata_bytes_read(input: Bytes) -> std::io::Result { - let process = Process::run("magick", &["convert", "-", "-strip", "-"])?; - - Ok(process.bytes_read(input)) -} - -#[tracing::instrument(name = "Convert", skip(input))] +#[tracing::instrument(level = "debug", skip(input))] pub(crate) fn convert_bytes_read( input: Bytes, format: ImageFormat, @@ -139,7 +131,7 @@ pub(crate) fn convert_bytes_read( Ok(process.bytes_read(input)) } -#[instrument(name = "Getting details from input bytes", skip(input))] +#[tracing::instrument(skip(input))] pub(crate) async fn details_bytes( input: Bytes, hint: Option, @@ -290,7 +282,6 @@ fn parse_details(s: std::borrow::Cow<'_, str>) -> Result { }) } -#[instrument(name = "Getting input type from bytes", skip(input))] pub(crate) async fn input_type_bytes(input: Bytes) -> Result { details_bytes(input, None).await?.validate_input() } @@ -308,7 +299,6 @@ fn process_image(args: Vec, format: ImageFormat) -> std::io::Result( store: S, identifier: S::Identifier, @@ -318,7 +308,6 @@ pub(crate) fn process_image_store_read( Ok(process_image(args, format)?.store_read(store, identifier)) } -#[instrument(name = "Spawning process command", skip(async_read))] pub(crate) fn process_image_async_read( async_read: A, args: Vec, @@ -328,7 +317,7 @@ pub(crate) fn process_image_async_read( } impl Details { - #[instrument(name = "Validating input type")] + #[tracing::instrument(level = "debug", name = "Validating input type")] pub(crate) fn validate_input(&self) -> Result { if self.width > crate::CONFIG.media.max_width || self.height > crate::CONFIG.media.max_height diff --git a/src/main.rs b/src/main.rs index b5961a9..346de4d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -958,6 +958,7 @@ async fn aliases( }))) } +#[instrument(name = "Fetching identifier", skip(repo))] async fn identifier( query: web::Query, repo: web::Data, diff --git a/src/process.rs b/src/process.rs index e817514..3ba4bc1 100644 --- a/src/process.rs +++ b/src/process.rs @@ -42,13 +42,11 @@ pin_project_lite::pin_project! { } impl Process { - #[tracing::instrument] pub(crate) fn run(command: &str, args: &[&str]) -> std::io::Result { tracing::trace_span!(parent: None, "Create command") .in_scope(|| Self::spawn(Command::new(command).args(args))) } - #[tracing::instrument] pub(crate) fn spawn(cmd: &mut Command) -> std::io::Result { tracing::trace_span!(parent: None, "Spawn command").in_scope(|| { let cmd = cmd.stdin(Stdio::piped()).stdout(Stdio::piped()); @@ -66,37 +64,34 @@ impl Process { Ok(()) } - #[tracing::instrument(skip(input))] pub(crate) fn bytes_read(self, input: Bytes) -> impl AsyncRead + Unpin { - self.read_fn(move |mut stdin| { + self.spawn_fn(move |mut stdin| { let mut input = input; async move { stdin.write_all_buf(&mut input).await } }) } - #[tracing::instrument] pub(crate) fn read(self) -> impl AsyncRead + Unpin { - self.read_fn(|_| async { Ok(()) }) + self.spawn_fn(|_| async { Ok(()) }) } pub(crate) fn pipe_async_read( self, mut async_read: A, ) -> impl AsyncRead + Unpin { - self.read_fn(move |mut stdin| async move { + self.spawn_fn(move |mut stdin| async move { tokio::io::copy(&mut async_read, &mut stdin) .await .map(|_| ()) }) } - #[tracing::instrument] pub(crate) fn store_read( self, store: S, identifier: S::Identifier, ) -> impl AsyncRead + Unpin { - self.read_fn(move |mut stdin| { + self.spawn_fn(move |mut stdin| { let store = store; let identifier = identifier; @@ -104,7 +99,8 @@ impl Process { }) } - fn read_fn(mut self, f: F) -> impl AsyncRead + Unpin + #[tracing::instrument(skip(f))] + fn spawn_fn(mut self, f: F) -> impl AsyncRead + Unpin where F: FnOnce(ChildStdin) -> Fut + 'static, Fut: Future>, diff --git a/src/processor.rs b/src/processor.rs index 97c43f9..19c14af 100644 --- a/src/processor.rs +++ b/src/processor.rs @@ -1,6 +1,5 @@ use crate::error::{Error, UploadError}; use std::path::PathBuf; -use tracing::instrument; pub(crate) trait Processor { const NAME: &'static str; @@ -88,7 +87,7 @@ impl ResizeKind { } } -#[instrument] +#[tracing::instrument(level = "debug")] pub(crate) fn build_chain( args: &[(String, String)], ext: &str, diff --git a/src/repo.rs b/src/repo.rs index 68fb868..1634328 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -606,7 +606,7 @@ where Ok(()) } -#[tracing::instrument(skip(repo))] +#[tracing::instrument(level = "debug", skip(repo))] async fn migrate_identifier_details( repo: &T, old: &FileId, diff --git a/src/repo/sled.rs b/src/repo/sled.rs index ba69220..458dd9e 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -152,7 +152,7 @@ fn insert_cache_inverse( bucket.insert(alias_bytes.to_vec()); - tracing::info!("Inserting new {:?}", bucket); + tracing::trace!("Inserting new {:?}", bucket); let bucket_bytes = serde_cbor::to_vec(&bucket)?; let new = Some(bucket_bytes); @@ -218,10 +218,10 @@ impl CachedRepo for SledRepo { bucket.remove(&alias_bytes); if bucket.is_empty() { - tracing::info!("Removed old {:?}", bucket); + tracing::trace!("Removed old {:?}", bucket); None } else { - tracing::info!("Inserting old {:?}", bucket); + tracing::trace!("Inserting old {:?}", bucket); Some(serde_cbor::to_vec(&bucket)) } }) @@ -243,12 +243,12 @@ impl CachedRepo for SledRepo { cache_inverse.range(..to_clean_bytes).filter_map(Result::ok) { if let Ok(datetime) = serde_json::from_slice::(&date_bytes) { - tracing::info!("Checking {}", datetime); + tracing::trace!("Checking {}", datetime); } else { tracing::warn!("Invalid date bytes"); } if let Ok(bucket) = serde_cbor::from_slice::(&bucket_bytes) { - tracing::info!("Read for deletion: {:?}", bucket); + tracing::trace!("Read for deletion: {:?}", bucket); for alias_bytes in bucket { // Best effort cleanup let _ = cache.remove(&alias_bytes); @@ -266,7 +266,7 @@ impl CachedRepo for SledRepo { #[cfg(debug)] for date_bytes in cache_inverse.range(to_clean_bytes..).filter_map(Result::ok) { if let Ok(datetime) = serde_json::from_slice::(&date_bytes) { - tracing::info!("Not cleaning for {}", datetime); + tracing::trace!("Not cleaning for {}", datetime); } else { tracing::warn!("Invalid date bytes"); } @@ -468,21 +468,21 @@ impl QueueRepo for SledRepo { #[async_trait::async_trait(?Send)] impl SettingsRepo for SledRepo { - #[tracing::instrument(skip(value))] + #[tracing::instrument(level = "trace", skip(value))] async fn set(&self, key: &'static str, value: Self::Bytes) -> Result<(), Error> { b!(self.settings, settings.insert(key, value)); Ok(()) } - #[tracing::instrument(skip(self))] + #[tracing::instrument(level = "trace", skip(self))] async fn get(&self, key: &'static str) -> Result, Error> { let opt = b!(self.settings, settings.get(key)); Ok(opt) } - #[tracing::instrument(skip(self))] + #[tracing::instrument(level = "trace", skip(self))] async fn remove(&self, key: &'static str) -> Result<(), Error> { b!(self.settings, settings.remove(key)); @@ -505,7 +505,7 @@ fn variant_from_key(hash: &[u8], key: &[u8]) -> Option { #[async_trait::async_trait(?Send)] impl IdentifierRepo for SledRepo { - #[tracing::instrument(skip(self, identifier), fields(identifier = identifier.string_repr()))] + #[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))] async fn relate_details( &self, identifier: &I, @@ -522,7 +522,7 @@ impl IdentifierRepo for SledRepo { Ok(()) } - #[tracing::instrument(skip(self, identifier), fields(identifier = identifier.string_repr()))] + #[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))] async fn details(&self, identifier: &I) -> Result, Error> { let key = identifier.to_bytes()?; @@ -535,7 +535,7 @@ impl IdentifierRepo for SledRepo { } } - #[tracing::instrument(skip(self, identifier), fields(identifier = identifier.string_repr()))] + #[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))] async fn cleanup(&self, identifier: &I) -> Result<(), Error> { let key = identifier.to_bytes()?; @@ -568,7 +568,7 @@ impl HashRepo for SledRepo { Box::pin(from_iterator(iter, 8)) } - #[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))] + #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))] async fn create(&self, hash: Self::Bytes) -> Result, Error> { let res = b!(self.hashes, { let hash2 = hash.clone(); @@ -578,7 +578,7 @@ impl HashRepo for SledRepo { Ok(res.map_err(|_| AlreadyExists)) } - #[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))] + #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))] async fn relate_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Error> { let key = hash_alias_key(&hash, alias); let value = alias.to_bytes(); @@ -588,7 +588,7 @@ impl HashRepo for SledRepo { Ok(()) } - #[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))] + #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))] async fn remove_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Error> { let key = hash_alias_key(&hash, alias); @@ -611,7 +611,7 @@ impl HashRepo for SledRepo { Ok(v) } - #[tracing::instrument(skip(self, hash, identifier), fields(hash = hex::encode(&hash), identifier = identifier.string_repr()))] + #[tracing::instrument(level = "trace", skip(self, hash, identifier), fields(hash = hex::encode(&hash), identifier = identifier.string_repr()))] async fn relate_identifier( &self, hash: Self::Bytes, @@ -624,7 +624,7 @@ impl HashRepo for SledRepo { Ok(()) } - #[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))] + #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))] async fn identifier(&self, hash: Self::Bytes) -> Result { let opt = b!(self.hash_identifiers, hash_identifiers.get(hash)); @@ -633,7 +633,7 @@ impl HashRepo for SledRepo { .and_then(|ivec| I::from_bytes(ivec.to_vec())) } - #[tracing::instrument(skip(self, hash, identifier), fields(hash = hex::encode(&hash), identifier = identifier.string_repr()))] + #[tracing::instrument(level = "trace", skip(self, hash, identifier), fields(hash = hex::encode(&hash), identifier = identifier.string_repr()))] async fn relate_variant_identifier( &self, hash: Self::Bytes, @@ -651,7 +651,7 @@ impl HashRepo for SledRepo { Ok(()) } - #[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))] + #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))] async fn variant_identifier( &self, hash: Self::Bytes, @@ -693,7 +693,7 @@ impl HashRepo for SledRepo { Ok(vec) } - #[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))] + #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))] async fn remove_variant(&self, hash: Self::Bytes, variant: String) -> Result<(), Error> { let key = variant_key(&hash, &variant); @@ -705,7 +705,7 @@ impl HashRepo for SledRepo { Ok(()) } - #[tracing::instrument(skip(self, hash, identifier), fields(hash = hex::encode(&hash), identifier = identifier.string_repr()))] + #[tracing::instrument(level = "trace", skip(self, hash, identifier), fields(hash = hex::encode(&hash), identifier = identifier.string_repr()))] async fn relate_motion_identifier( &self, hash: Self::Bytes, @@ -721,7 +721,7 @@ impl HashRepo for SledRepo { Ok(()) } - #[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))] + #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))] async fn motion_identifier( &self, hash: Self::Bytes, @@ -785,7 +785,7 @@ impl HashRepo for SledRepo { #[async_trait::async_trait(?Send)] impl AliasRepo for SledRepo { - #[tracing::instrument(skip(self))] + #[tracing::instrument(level = "trace", skip(self))] async fn create(&self, alias: &Alias) -> Result, Error> { let bytes = alias.to_bytes(); let bytes2 = bytes.clone(); @@ -798,7 +798,7 @@ impl AliasRepo for SledRepo { Ok(res.map_err(|_| AlreadyExists)) } - #[tracing::instrument(skip(self))] + #[tracing::instrument(level = "trace", skip(self))] async fn relate_delete_token( &self, alias: &Alias, @@ -815,7 +815,7 @@ impl AliasRepo for SledRepo { Ok(res.map_err(|_| AlreadyExists)) } - #[tracing::instrument(skip(self))] + #[tracing::instrument(level = "trace", skip(self))] async fn delete_token(&self, alias: &Alias) -> Result { let key = alias.to_bytes(); @@ -826,7 +826,7 @@ impl AliasRepo for SledRepo { .map_err(Error::from) } - #[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))] + #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))] async fn relate_hash(&self, alias: &Alias, hash: Self::Bytes) -> Result<(), Error> { let key = alias.to_bytes(); @@ -835,7 +835,7 @@ impl AliasRepo for SledRepo { Ok(()) } - #[tracing::instrument(skip(self))] + #[tracing::instrument(level = "trace", skip(self))] async fn hash(&self, alias: &Alias) -> Result { let key = alias.to_bytes(); diff --git a/src/store/file_store.rs b/src/store/file_store.rs index 455683c..917e6d4 100644 --- a/src/store/file_store.rs +++ b/src/store/file_store.rs @@ -13,7 +13,7 @@ use std::{ use storage_path_generator::Generator; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_util::io::StreamReader; -use tracing::{debug, error, instrument, Instrument}; +use tracing::Instrument; mod file_id; pub(crate) use file_id::FileId; @@ -211,7 +211,6 @@ impl FileStore { } // Try writing to a file - #[instrument(name = "Saving file", skip(bytes), fields(path = tracing::field::debug(&path.as_ref())))] async fn safe_save_bytes>( &self, path: P, @@ -220,7 +219,6 @@ impl FileStore { safe_create_parent(&path).await?; // Only write the file if it doesn't already exist - debug!("Checking if {:?} already exists", path.as_ref()); if let Err(e) = tokio::fs::metadata(&path).await { if e.kind() != std::io::ErrorKind::NotFound { return Err(e.into()); @@ -230,23 +228,18 @@ impl FileStore { } // Open the file for writing - debug!("Creating {:?}", path.as_ref()); let mut file = File::create(&path).await?; // try writing - debug!("Writing to {:?}", path.as_ref()); if let Err(e) = file.write_from_bytes(bytes).await { - error!("Error writing {:?}, {}", path.as_ref(), format!("{}", e)); // remove file if writing failed before completion self.safe_remove_file(path).await?; return Err(e.into()); } - debug!("{:?} written", path.as_ref()); Ok(()) } - #[instrument(skip(input), fields(to = tracing::field::debug(&to.as_ref())))] async fn safe_save_reader>( &self, to: P, @@ -254,7 +247,6 @@ impl FileStore { ) -> Result<(), FileError> { safe_create_parent(&to).await?; - debug!("Checking if {:?} already exists", to.as_ref()); if let Err(e) = tokio::fs::metadata(&to).await { if e.kind() != std::io::ErrorKind::NotFound { return Err(e.into()); @@ -263,8 +255,6 @@ impl FileStore { return Err(FileError::FileExists); } - debug!("Writing stream to {:?}", to.as_ref()); - let mut file = File::create(to).await?; file.write_from_async_read(input).await?; @@ -275,7 +265,6 @@ impl FileStore { pub(crate) async fn safe_create_parent>(path: P) -> Result<(), FileError> { if let Some(path) = path.as_ref().parent() { - debug!("Creating directory {:?}", path); tokio::fs::create_dir_all(path).await?; } diff --git a/src/store/object_store.rs b/src/store/object_store.rs index 4b4533b..7549782 100644 --- a/src/store/object_store.rs +++ b/src/store/object_store.rs @@ -163,7 +163,6 @@ impl Store for ObjectStore { type Identifier = ObjectId; type Stream = Pin>>>; - #[tracing::instrument(skip(reader))] async fn save_async_read(&self, reader: Reader) -> Result where Reader: AsyncRead + Unpin + 'static, diff --git a/src/validate.rs b/src/validate.rs index f4501df..ec1f4c2 100644 --- a/src/validate.rs +++ b/src/validate.rs @@ -2,12 +2,11 @@ use crate::{ config::{AudioCodec, ImageFormat, VideoCodec}, either::Either, error::{Error, UploadError}, - ffmpeg::InputFormat, + ffmpeg::FileFormat, magick::ValidInputType, }; use actix_web::web::Bytes; use tokio::io::AsyncRead; -use tracing::instrument; struct UnvalidatedBytes { bytes: Bytes, @@ -36,7 +35,7 @@ impl AsyncRead for UnvalidatedBytes { } } -#[instrument(name = "Validate media", skip(bytes))] +#[tracing::instrument(skip_all)] pub(crate) async fn validate_bytes( bytes: Bytes, prescribed_format: Option, @@ -57,8 +56,8 @@ pub(crate) async fn validate_bytes( return Ok((input_type, Either::left(UnvalidatedBytes::new(bytes)))); } - match (prescribed_format, input_type) { - (_, ValidInputType::Gif) => { + match (input_type.to_file_format(), prescribed_format) { + (FileFormat::Video(video_format), _) => { if !(enable_silent_video || enable_full_video) { return Err(UploadError::SilentVideoDisabled.into()); } @@ -67,25 +66,7 @@ pub(crate) async fn validate_bytes( Either::right(Either::left( crate::ffmpeg::trancsocde_bytes( bytes, - InputFormat::Gif, - false, - video_codec, - audio_codec, - ) - .await?, - )), - )) - } - (_, ValidInputType::Mp4) => { - if !(enable_silent_video || enable_full_video) { - return Err(UploadError::SilentVideoDisabled.into()); - } - Ok(( - ValidInputType::from_video_codec(video_codec), - Either::right(Either::left( - crate::ffmpeg::trancsocde_bytes( - bytes, - InputFormat::Mp4, + video_format, enable_full_video, video_codec, audio_codec, @@ -94,47 +75,17 @@ pub(crate) async fn validate_bytes( )), )) } - (_, ValidInputType::Webm) => { - if !(enable_silent_video || enable_full_video) { - return Err(UploadError::SilentVideoDisabled.into()); - } - Ok(( - ValidInputType::from_video_codec(video_codec), - Either::right(Either::left( - crate::ffmpeg::trancsocde_bytes( - bytes, - InputFormat::Webm, - enable_full_video, - video_codec, - audio_codec, - ) - .await?, - )), - )) - } - (Some(ImageFormat::Jpeg) | None, ValidInputType::Jpeg) => Ok(( - ValidInputType::Jpeg, - Either::right(Either::right(Either::left( - crate::exiftool::clear_metadata_bytes_read(bytes)?, - ))), - )), - (Some(ImageFormat::Png) | None, ValidInputType::Png) => Ok(( - ValidInputType::Png, - Either::right(Either::right(Either::left( - crate::exiftool::clear_metadata_bytes_read(bytes)?, - ))), - )), - (Some(ImageFormat::Webp) | None, ValidInputType::Webp) => Ok(( - ValidInputType::Webp, - Either::right(Either::right(Either::right(Either::left( - crate::magick::clear_metadata_bytes_read(bytes)?, - )))), - )), - (Some(format), _) => Ok(( + (FileFormat::Image(image_format), Some(format)) if image_format != format => Ok(( ValidInputType::from_format(format), - Either::right(Either::right(Either::right(Either::right( + Either::right(Either::right(Either::left( crate::magick::convert_bytes_read(bytes, format)?, - )))), + ))), + )), + (FileFormat::Image(image_format), _) => Ok(( + ValidInputType::from_format(image_format), + Either::right(Either::right(Either::right( + crate::exiftool::clear_metadata_bytes_read(bytes)?, + ))), )), } }