From e493e90dd42a28dff36cf510b65e32aa63c04dad Mon Sep 17 00:00:00 2001 From: "Aode (lion)" Date: Thu, 7 Apr 2022 12:56:40 -0500 Subject: [PATCH] Defensive tracing --- src/backgrounded.rs | 12 ++++--- src/concurrent_processor.rs | 4 ++- src/file.rs | 16 ++++++--- src/generate.rs | 10 +++--- src/ingest.rs | 32 +++++++++-------- src/ingest/hasher.rs | 3 +- src/main.rs | 30 +++++++++------- src/process.rs | 72 ++++++++++++++++++++----------------- src/stream.rs | 3 +- src/tmp_file.rs | 3 +- 10 files changed, 111 insertions(+), 74 deletions(-) diff --git a/src/backgrounded.rs b/src/backgrounded.rs index e696161..d2bfd3a 100644 --- a/src/backgrounded.rs +++ b/src/backgrounded.rs @@ -76,16 +76,20 @@ where if let Some(identifier) = self.identifier.take() { let repo = self.repo.clone(); - actix_rt::spawn(async move { - let _ = crate::queue::cleanup_identifier(&repo, identifier).await; + tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { + actix_rt::spawn(async move { + let _ = crate::queue::cleanup_identifier(&repo, identifier).await; + }) }); } if let Some(upload_id) = self.upload_id { let repo = self.repo.clone(); - actix_rt::spawn(async move { - let _ = repo.claim(upload_id).await; + tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { + actix_rt::spawn(async move { + let _ = repo.claim(upload_id).await; + }) }); } } diff --git a/src/concurrent_processor.rs b/src/concurrent_processor.rs index 92a8cf5..8ec4ae1 100644 --- a/src/concurrent_processor.rs +++ b/src/concurrent_processor.rs @@ -58,7 +58,9 @@ where (None, span) } Entry::Occupied(mut occupied) => { - let (tx, rx) = tokio::sync::oneshot::channel(); + let (tx, rx) = tracing::trace_span!(parent: None, "Create channel") + .in_scope(tokio::sync::oneshot::channel); + occupied.get_mut().push(tx); let span = tracing::info_span!( "Waiting for processed image", diff --git a/src/file.rs b/src/file.rs index 34348a5..aa6643b 100644 --- a/src/file.rs +++ b/src/file.rs @@ -20,13 +20,17 @@ mod tokio_file { impl File { pub(crate) async fn open(path: impl AsRef) -> std::io::Result { Ok(File { - inner: tokio::fs::File::open(path).await?, + inner: tracing::trace_span!(parent: None, "Open File") + .in_scope(|| tokio::fs::File::open(path)) + .await?, }) } pub(crate) async fn create(path: impl AsRef) -> std::io::Result { Ok(File { - inner: tokio::fs::File::create(path).await?, + inner: tracing::trace_span!(parent: None, "Create File") + .in_scope(|| tokio::fs::File::create(path)) + .await?, }) } @@ -125,7 +129,9 @@ mod io_uring { tracing::info!("Opening io-uring file: {:?}", path.as_ref()); Ok(File { path: path.as_ref().to_owned(), - inner: tokio_uring::fs::File::open(path).await?, + inner: tracing::trace_span!(parent: None, "Open File") + .in_scope(|| tokio_uring::fs::File::open(path)) + .await?, }) } @@ -133,7 +139,9 @@ mod io_uring { tracing::info!("Creating io-uring file: {:?}", path.as_ref()); Ok(File { path: path.as_ref().to_owned(), - inner: tokio_uring::fs::File::create(path).await?, + inner: tracing::trace_span!(parent: None, "Create File") + .in_scope(|| tokio_uring::fs::File::create(path)) + .await?, }) } diff --git a/src/generate.rs b/src/generate.rs index 3bd9467..596a5a0 100644 --- a/src/generate.rs +++ b/src/generate.rs @@ -10,6 +10,7 @@ use crate::{ use actix_web::web::Bytes; use std::path::PathBuf; use tokio::io::AsyncReadExt; +use tracing::Instrument; #[tracing::instrument(skip(hash))] pub(crate) async fn generate( @@ -47,9 +48,7 @@ async fn process( thumbnail_args: Vec, hash: R::Bytes, ) -> Result<(Details, Bytes), Error> { - let permit = tracing::trace_span!(parent: None, "Aquire semaphore") - .in_scope(|| crate::PROCESS_SEMAPHORE.acquire()) - .await; + let permit = crate::PROCESS_SEMAPHORE.acquire().await; let identifier = if let Some(identifier) = repo .still_identifier_from_alias::(&alias) @@ -77,7 +76,10 @@ async fn process( crate::magick::process_image_store_read(store.clone(), identifier, thumbnail_args, format)?; let mut vec = Vec::new(); - processed_reader.read_to_end(&mut vec).await?; + processed_reader + .read_to_end(&mut vec) + .instrument(tracing::info_span!("Reading processed image to vec")) + .await?; let bytes = Bytes::from(vec); drop(permit); diff --git a/src/ingest.rs b/src/ingest.rs index a052653..ae84a49 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -62,9 +62,7 @@ where R: FullRepo + 'static, S: Store, { - let permit = tracing::trace_span!(parent: None, "Aquire semaphore") - .in_scope(|| crate::PROCESS_SEMAPHORE.acquire()) - .await; + let permit = crate::PROCESS_SEMAPHORE.acquire().await; let bytes = aggregate(stream).await?; @@ -205,31 +203,37 @@ where fn drop(&mut self) { if let Some(hash) = self.hash.take() { let repo = self.repo.clone(); - actix_rt::spawn(async move { - let _ = crate::queue::cleanup_hash(&repo, hash.into()).await; + tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { + actix_rt::spawn(async move { + let _ = crate::queue::cleanup_hash(&repo, hash.into()).await; + }) }); } if let Some(alias) = self.alias.take() { let repo = self.repo.clone(); - actix_rt::spawn(async move { - if let Ok(token) = repo.delete_token(&alias).await { - let _ = crate::queue::cleanup_alias(&repo, alias, token).await; - } else { - let token = DeleteToken::generate(); - if let Ok(Ok(())) = repo.relate_delete_token(&alias, &token).await { + tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { + actix_rt::spawn(async move { + if let Ok(token) = repo.delete_token(&alias).await { let _ = crate::queue::cleanup_alias(&repo, alias, token).await; + } else { + let token = DeleteToken::generate(); + if let Ok(Ok(())) = repo.relate_delete_token(&alias, &token).await { + let _ = crate::queue::cleanup_alias(&repo, alias, token).await; + } } - } + }) }); } if let Some(identifier) = self.identifier.take() { let repo = self.repo.clone(); - actix_rt::spawn(async move { - let _ = crate::queue::cleanup_identifier(&repo, identifier).await; + tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { + actix_rt::spawn(async move { + let _ = crate::queue::cleanup_identifier(&repo, identifier).await; + }) }); } } diff --git a/src/ingest/hasher.rs b/src/ingest/hasher.rs index e1a1551..71708ef 100644 --- a/src/ingest/hasher.rs +++ b/src/ingest/hasher.rs @@ -70,7 +70,8 @@ mod test { actix_rt::System::new().block_on(async move { let arbiter = actix_rt::Arbiter::new(); - let (tx, rx) = tokio::sync::oneshot::channel(); + let (tx, rx) = tracing::trace_span!(parent: None, "Create channel") + .in_scope(|| tokio::sync::oneshot::channel()); arbiter.spawn(async move { let handle = actix_rt::spawn($fut); diff --git a/src/main.rs b/src/main.rs index 13de2c2..3a61984 100644 --- a/src/main.rs +++ b/src/main.rs @@ -75,8 +75,10 @@ static DO_CONFIG: Lazy<(Configuration, Operation)> = Lazy::new(|| config::configure().expect("Failed to configure")); static CONFIG: Lazy = Lazy::new(|| DO_CONFIG.0.clone()); static OPERATION: Lazy = Lazy::new(|| DO_CONFIG.1.clone()); -static PROCESS_SEMAPHORE: Lazy = - Lazy::new(|| Semaphore::new(num_cpus::get().saturating_sub(1).max(1))); +static PROCESS_SEMAPHORE: Lazy = Lazy::new(|| { + tracing::trace_span!(parent: None, "Initialize semaphore") + .in_scope(|| Semaphore::new(num_cpus::get().saturating_sub(1).max(1))) +}); /// Handle responding to succesful uploads #[instrument(name = "Uploaded files", skip(value))] @@ -790,16 +792,20 @@ async fn launch( let store = store.clone(); let repo = repo.clone(); - actix_rt::spawn(queue::process_cleanup( - repo.clone(), - store.clone(), - next_worker_id(), - )); - actix_rt::spawn(queue::process_images( - repo.clone(), - store.clone(), - next_worker_id(), - )); + tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { + actix_rt::spawn(queue::process_cleanup( + repo.clone(), + store.clone(), + next_worker_id(), + )) + }); + tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { + actix_rt::spawn(queue::process_images( + repo.clone(), + store.clone(), + next_worker_id(), + )) + }); App::new() .wrap(TracingLogger::default()) diff --git a/src/process.rs b/src/process.rs index b6337cd..386c6a4 100644 --- a/src/process.rs +++ b/src/process.rs @@ -43,14 +43,17 @@ pin_project_lite::pin_project! { impl Process { #[tracing::instrument] pub(crate) fn run(command: &str, args: &[&str]) -> std::io::Result { - Self::spawn(Command::new(command).args(args)) + tracing::trace_span!(parent: None, "Create command") + .in_scope(|| Self::spawn(Command::new(command).args(args))) } #[tracing::instrument] pub(crate) fn spawn(cmd: &mut Command) -> std::io::Result { - let cmd = cmd.stdin(Stdio::piped()).stdout(Stdio::piped()); + tracing::trace_span!(parent: None, "Spawn command").in_scope(|| { + let cmd = cmd.stdin(Stdio::piped()).stdout(Stdio::piped()); - cmd.spawn().map(|child| Process { child }) + cmd.spawn().map(|child| Process { child }) + }) } #[tracing::instrument] @@ -67,10 +70,11 @@ impl Process { let mut stdin = self.child.stdin.take().expect("stdin exists"); let stdout = self.child.stdout.take().expect("stdout exists"); - let (tx, rx) = channel::(); + let (tx, rx) = tracing::trace_span!(parent: None, "Create channel") + .in_scope(channel::); let mut child = self.child; - let handle = tracing::trace_span!(parent: None, "Spawn").in_scope(|| { + let handle = tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { actix_rt::spawn(async move { if let Err(e) = stdin.write_all_buf(&mut input).await { let _ = tx.send(e); @@ -104,21 +108,23 @@ impl Process { pub(crate) fn read(mut self) -> impl AsyncRead + Unpin { let stdout = self.child.stdout.take().expect("stdout exists"); - let (tx, rx) = channel(); + let (tx, rx) = tracing::trace_span!(parent: None, "Create channel").in_scope(channel); let mut child = self.child; - let handle = actix_rt::spawn(async move { - match child.wait().await { - Ok(status) => { - if !status.success() { - let _ = - tx.send(std::io::Error::new(std::io::ErrorKind::Other, &StatusError)); + let handle = tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { + actix_rt::spawn(async move { + match child.wait().await { + Ok(status) => { + if !status.success() { + let _ = tx + .send(std::io::Error::new(std::io::ErrorKind::Other, &StatusError)); + } + } + Err(e) => { + let _ = tx.send(e); } } - Err(e) => { - let _ = tx.send(e); - } - } + }) }); ProcessRead { @@ -138,27 +144,29 @@ impl Process { let mut stdin = self.child.stdin.take().expect("stdin exists"); let stdout = self.child.stdout.take().expect("stdout exists"); - let (tx, rx) = channel(); + let (tx, rx) = tracing::trace_span!(parent: None, "Create channel").in_scope(channel); let mut child = self.child; - let handle = actix_rt::spawn(async move { - if let Err(e) = store.read_into(&identifier, &mut stdin).await { - let _ = tx.send(e); - return; - } - drop(stdin); + let handle = tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { + actix_rt::spawn(async move { + if let Err(e) = store.read_into(&identifier, &mut stdin).await { + let _ = tx.send(e); + return; + } + drop(stdin); - match child.wait().await { - Ok(status) => { - if !status.success() { - let _ = - tx.send(std::io::Error::new(std::io::ErrorKind::Other, &StatusError)); + match child.wait().await { + Ok(status) => { + if !status.success() { + let _ = tx + .send(std::io::Error::new(std::io::ErrorKind::Other, &StatusError)); + } + } + Err(e) => { + let _ = tx.send(e); } } - Err(e) => { - let _ = tx.send(e); - } - } + }) }); ProcessRead { diff --git a/src/stream.rs b/src/stream.rs index bd14aef..8848ef5 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -187,7 +187,8 @@ where match std::mem::replace(&mut this.state, IterStreamState::Pending) { IterStreamState::New { iterator, buffer } => { - let (sender, receiver) = tokio::sync::mpsc::channel(buffer); + let (sender, receiver) = tracing::trace_span!(parent: None, "Create channel") + .in_scope(|| tokio::sync::mpsc::channel(buffer)); let mut handle = actix_rt::task::spawn_blocking(move || { let iterator = iterator.into_iter(); diff --git a/src/tmp_file.rs b/src/tmp_file.rs index 8c99d41..1335abc 100644 --- a/src/tmp_file.rs +++ b/src/tmp_file.rs @@ -13,7 +13,8 @@ struct TmpFile(PathBuf); impl Drop for TmpFile { fn drop(&mut self) { - actix_rt::spawn(tokio::fs::remove_file(self.0.clone())); + tracing::trace_span!(parent: None, "Spawn task") + .in_scope(|| actix_rt::spawn(tokio::fs::remove_file(self.0.clone()))); } }