From 31c5a36c77cdf364815a7b612533664faebc50dd Mon Sep 17 00:00:00 2001
From: "Aode (lion)"
Date: Fri, 8 Apr 2022 12:51:33 -0500
Subject: [PATCH] Better instrument drops, jobs. Properly disarm backgrounded
 downloads

---
 src/backgrounded.rs | 26 ++++++++++++++++++------
 src/ingest.rs       | 49 +++++++++++++++++++++++++++++++++------------
 src/main.rs         |  2 ++
 src/queue.rs        |  2 +-
 4 files changed, 59 insertions(+), 20 deletions(-)

diff --git a/src/backgrounded.rs b/src/backgrounded.rs
index d2bfd3a..73eb5e1 100644
--- a/src/backgrounded.rs
+++ b/src/backgrounded.rs
@@ -6,6 +6,7 @@ use crate::{
 use actix_web::web::Bytes;
 use futures_util::{Stream, TryStreamExt};
 use tokio_util::io::StreamReader;
+use tracing::{Instrument, Span};
 
 pub(crate) struct Backgrounded
 where
@@ -72,24 +73,37 @@ where
     R: FullRepo + 'static,
     S: Store,
 {
+    #[tracing::instrument(name = "Drop Backgrounded", skip(self), fields(identifier = ?self.identifier, upload_id = ?self.upload_id))]
     fn drop(&mut self) {
         if let Some(identifier) = self.identifier.take() {
             let repo = self.repo.clone();
 
+            let cleanup_span = tracing::info_span!(parent: None, "Backgrounded cleanup Identifier", identifier = ?identifier);
+            cleanup_span.follows_from(Span::current());
+
             tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
-                actix_rt::spawn(async move {
-                    let _ = crate::queue::cleanup_identifier(&repo, identifier).await;
-                })
+                actix_rt::spawn(
+                    async move {
+                        let _ = crate::queue::cleanup_identifier(&repo, identifier).await;
+                    }
+                    .instrument(cleanup_span),
+                )
             });
         }
 
         if let Some(upload_id) = self.upload_id {
             let repo = self.repo.clone();
 
+            let cleanup_span = tracing::info_span!(parent: None, "Backgrounded cleanup Upload ID", upload_id = ?upload_id);
+            cleanup_span.follows_from(Span::current());
+
             tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
-                actix_rt::spawn(async move {
-                    let _ = repo.claim(upload_id).await;
-                })
+                actix_rt::spawn(
+                    async move {
+                        let _ = repo.claim(upload_id).await;
+                    }
+                    .instrument(cleanup_span),
+                )
             });
         }
     }
diff --git a/src/ingest.rs b/src/ingest.rs
index 7419683..f4c49c7 100644
--- a/src/ingest.rs
+++ b/src/ingest.rs
@@ -8,6 +8,7 @@ use crate::{
 use actix_web::web::{Bytes, BytesMut};
 use futures_util::{Stream, StreamExt};
 use sha2::{Digest, Sha256};
+use tracing::{Instrument, Span};
 
 mod hasher;
 use hasher::Hasher;
@@ -219,40 +220,62 @@ where
     R: FullRepo + 'static,
     S: Store,
 {
+    #[tracing::instrument(name = "Drop Session", skip(self), fields(hash = ?self.hash, alias = ?self.alias, identifier = ?self.identifier))]
     fn drop(&mut self) {
         if let Some(hash) = self.hash.take() {
             let repo = self.repo.clone();
+
+            let cleanup_span =
+                tracing::info_span!(parent: None, "Session cleanup hash", hash = ?hash);
+            cleanup_span.follows_from(Span::current());
+
             tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
-                actix_rt::spawn(async move {
-                    let _ = crate::queue::cleanup_hash(&repo, hash.into()).await;
-                })
+                actix_rt::spawn(
+                    async move {
+                        let _ = crate::queue::cleanup_hash(&repo, hash.into()).await;
+                    }
+                    .instrument(cleanup_span),
+                )
             });
         }
 
         if let Some(alias) = self.alias.take() {
             let repo = self.repo.clone();
 
+            let cleanup_span =
+                tracing::info_span!(parent: None, "Session cleanup alias", alias = ?alias);
+            cleanup_span.follows_from(Span::current());
+
             tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
-                actix_rt::spawn(async move {
-                    if let Ok(token) = repo.delete_token(&alias).await {
-                        let _ = crate::queue::cleanup_alias(&repo, alias, token).await;
-                    } else {
-                        let token = DeleteToken::generate();
-                        if let Ok(Ok(())) = repo.relate_delete_token(&alias, &token).await {
+                actix_rt::spawn(
+                    async move {
+                        if let Ok(token) = repo.delete_token(&alias).await {
                             let _ = crate::queue::cleanup_alias(&repo, alias, token).await;
+                        } else {
+                            let token = DeleteToken::generate();
+                            if let Ok(Ok(())) = repo.relate_delete_token(&alias, &token).await {
+                                let _ = crate::queue::cleanup_alias(&repo, alias, token).await;
+                            }
                         }
                     }
-                })
+                    .instrument(cleanup_span),
+                )
             });
         }
 
         if let Some(identifier) = self.identifier.take() {
             let repo = self.repo.clone();
 
+            let cleanup_span = tracing::info_span!(parent: None, "Session cleanup identifier", identifier = ?identifier);
+            cleanup_span.follows_from(Span::current());
+
             tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
-                actix_rt::spawn(async move {
-                    let _ = crate::queue::cleanup_identifier(&repo, identifier).await;
-                })
+                actix_rt::spawn(
+                    async move {
+                        let _ = crate::queue::cleanup_identifier(&repo, identifier).await;
+                    }
+                    .instrument(cleanup_span),
+                )
             });
         }
     }
diff --git a/src/main.rs b/src/main.rs
index 5e12dc4..939c54e 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -322,6 +322,8 @@ async fn do_download_backgrounded(
 
     queue::queue_ingest(&**repo, identifier, upload_id, None, true, is_cached).await?;
 
+    backgrounded.disarm();
+
     Ok(HttpResponse::Accepted().json(&serde_json::json!({
         "msg": "ok",
         "uploads": [{
diff --git a/src/queue.rs b/src/queue.rs
index aeade42..a5c5fe6 100644
--- a/src/queue.rs
+++ b/src/queue.rs
@@ -170,7 +170,7 @@ where
     loop {
         let bytes = repo.pop(queue, worker_id.as_bytes().to_vec()).await?;
 
-        let span = tracing::info_span!("Running Job", worker_id = ?worker_id);
+        let span = tracing::info_span!("Running Job", worker_id = ?worker_id, job = ?String::from_utf8_lossy(bytes.as_ref()));
 
         span.in_scope(|| (callback)(repo, store, bytes.as_ref()))
             .instrument(span)