Defensive tracing

This commit is contained in:
Aode (lion) 2022-04-07 12:56:40 -05:00
parent c80d207a87
commit e493e90dd4
10 changed files with 111 additions and 74 deletions

View File

@ -76,16 +76,20 @@ where
if let Some(identifier) = self.identifier.take() { if let Some(identifier) = self.identifier.take() {
let repo = self.repo.clone(); let repo = self.repo.clone();
actix_rt::spawn(async move { tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
let _ = crate::queue::cleanup_identifier(&repo, identifier).await; actix_rt::spawn(async move {
let _ = crate::queue::cleanup_identifier(&repo, identifier).await;
})
}); });
} }
if let Some(upload_id) = self.upload_id { if let Some(upload_id) = self.upload_id {
let repo = self.repo.clone(); let repo = self.repo.clone();
actix_rt::spawn(async move { tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
let _ = repo.claim(upload_id).await; actix_rt::spawn(async move {
let _ = repo.claim(upload_id).await;
})
}); });
} }
} }

View File

@ -58,7 +58,9 @@ where
(None, span) (None, span)
} }
Entry::Occupied(mut occupied) => { Entry::Occupied(mut occupied) => {
let (tx, rx) = tokio::sync::oneshot::channel(); let (tx, rx) = tracing::trace_span!(parent: None, "Create channel")
.in_scope(tokio::sync::oneshot::channel);
occupied.get_mut().push(tx); occupied.get_mut().push(tx);
let span = tracing::info_span!( let span = tracing::info_span!(
"Waiting for processed image", "Waiting for processed image",

View File

@ -20,13 +20,17 @@ mod tokio_file {
impl File { impl File {
pub(crate) async fn open(path: impl AsRef<Path>) -> std::io::Result<Self> { pub(crate) async fn open(path: impl AsRef<Path>) -> std::io::Result<Self> {
Ok(File { Ok(File {
inner: tokio::fs::File::open(path).await?, inner: tracing::trace_span!(parent: None, "Open File")
.in_scope(|| tokio::fs::File::open(path))
.await?,
}) })
} }
pub(crate) async fn create(path: impl AsRef<Path>) -> std::io::Result<Self> { pub(crate) async fn create(path: impl AsRef<Path>) -> std::io::Result<Self> {
Ok(File { Ok(File {
inner: tokio::fs::File::create(path).await?, inner: tracing::trace_span!(parent: None, "Create File")
.in_scope(|| tokio::fs::File::create(path))
.await?,
}) })
} }
@ -125,7 +129,9 @@ mod io_uring {
tracing::info!("Opening io-uring file: {:?}", path.as_ref()); tracing::info!("Opening io-uring file: {:?}", path.as_ref());
Ok(File { Ok(File {
path: path.as_ref().to_owned(), path: path.as_ref().to_owned(),
inner: tokio_uring::fs::File::open(path).await?, inner: tracing::trace_span!(parent: None, "Open File")
.in_scope(|| tokio_uring::fs::File::open(path))
.await?,
}) })
} }
@ -133,7 +139,9 @@ mod io_uring {
tracing::info!("Creating io-uring file: {:?}", path.as_ref()); tracing::info!("Creating io-uring file: {:?}", path.as_ref());
Ok(File { Ok(File {
path: path.as_ref().to_owned(), path: path.as_ref().to_owned(),
inner: tokio_uring::fs::File::create(path).await?, inner: tracing::trace_span!(parent: None, "Create File")
.in_scope(|| tokio_uring::fs::File::create(path))
.await?,
}) })
} }

View File

@ -10,6 +10,7 @@ use crate::{
use actix_web::web::Bytes; use actix_web::web::Bytes;
use std::path::PathBuf; use std::path::PathBuf;
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
use tracing::Instrument;
#[tracing::instrument(skip(hash))] #[tracing::instrument(skip(hash))]
pub(crate) async fn generate<R: FullRepo, S: Store + 'static>( pub(crate) async fn generate<R: FullRepo, S: Store + 'static>(
@ -47,9 +48,7 @@ async fn process<R: FullRepo, S: Store + 'static>(
thumbnail_args: Vec<String>, thumbnail_args: Vec<String>,
hash: R::Bytes, hash: R::Bytes,
) -> Result<(Details, Bytes), Error> { ) -> Result<(Details, Bytes), Error> {
let permit = tracing::trace_span!(parent: None, "Aquire semaphore") let permit = crate::PROCESS_SEMAPHORE.acquire().await;
.in_scope(|| crate::PROCESS_SEMAPHORE.acquire())
.await;
let identifier = if let Some(identifier) = repo let identifier = if let Some(identifier) = repo
.still_identifier_from_alias::<S::Identifier>(&alias) .still_identifier_from_alias::<S::Identifier>(&alias)
@ -77,7 +76,10 @@ async fn process<R: FullRepo, S: Store + 'static>(
crate::magick::process_image_store_read(store.clone(), identifier, thumbnail_args, format)?; crate::magick::process_image_store_read(store.clone(), identifier, thumbnail_args, format)?;
let mut vec = Vec::new(); let mut vec = Vec::new();
processed_reader.read_to_end(&mut vec).await?; processed_reader
.read_to_end(&mut vec)
.instrument(tracing::info_span!("Reading processed image to vec"))
.await?;
let bytes = Bytes::from(vec); let bytes = Bytes::from(vec);
drop(permit); drop(permit);

View File

@ -62,9 +62,7 @@ where
R: FullRepo + 'static, R: FullRepo + 'static,
S: Store, S: Store,
{ {
let permit = tracing::trace_span!(parent: None, "Aquire semaphore") let permit = crate::PROCESS_SEMAPHORE.acquire().await;
.in_scope(|| crate::PROCESS_SEMAPHORE.acquire())
.await;
let bytes = aggregate(stream).await?; let bytes = aggregate(stream).await?;
@ -205,31 +203,37 @@ where
fn drop(&mut self) { fn drop(&mut self) {
if let Some(hash) = self.hash.take() { if let Some(hash) = self.hash.take() {
let repo = self.repo.clone(); let repo = self.repo.clone();
actix_rt::spawn(async move { tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
let _ = crate::queue::cleanup_hash(&repo, hash.into()).await; actix_rt::spawn(async move {
let _ = crate::queue::cleanup_hash(&repo, hash.into()).await;
})
}); });
} }
if let Some(alias) = self.alias.take() { if let Some(alias) = self.alias.take() {
let repo = self.repo.clone(); let repo = self.repo.clone();
actix_rt::spawn(async move { tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
if let Ok(token) = repo.delete_token(&alias).await { actix_rt::spawn(async move {
let _ = crate::queue::cleanup_alias(&repo, alias, token).await; if let Ok(token) = repo.delete_token(&alias).await {
} else {
let token = DeleteToken::generate();
if let Ok(Ok(())) = repo.relate_delete_token(&alias, &token).await {
let _ = crate::queue::cleanup_alias(&repo, alias, token).await; let _ = crate::queue::cleanup_alias(&repo, alias, token).await;
} else {
let token = DeleteToken::generate();
if let Ok(Ok(())) = repo.relate_delete_token(&alias, &token).await {
let _ = crate::queue::cleanup_alias(&repo, alias, token).await;
}
} }
} })
}); });
} }
if let Some(identifier) = self.identifier.take() { if let Some(identifier) = self.identifier.take() {
let repo = self.repo.clone(); let repo = self.repo.clone();
actix_rt::spawn(async move { tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
let _ = crate::queue::cleanup_identifier(&repo, identifier).await; actix_rt::spawn(async move {
let _ = crate::queue::cleanup_identifier(&repo, identifier).await;
})
}); });
} }
} }

View File

@ -70,7 +70,8 @@ mod test {
actix_rt::System::new().block_on(async move { actix_rt::System::new().block_on(async move {
let arbiter = actix_rt::Arbiter::new(); let arbiter = actix_rt::Arbiter::new();
let (tx, rx) = tokio::sync::oneshot::channel(); let (tx, rx) = tracing::trace_span!(parent: None, "Create channel")
.in_scope(|| tokio::sync::oneshot::channel());
arbiter.spawn(async move { arbiter.spawn(async move {
let handle = actix_rt::spawn($fut); let handle = actix_rt::spawn($fut);

View File

@ -75,8 +75,10 @@ static DO_CONFIG: Lazy<(Configuration, Operation)> =
Lazy::new(|| config::configure().expect("Failed to configure")); Lazy::new(|| config::configure().expect("Failed to configure"));
static CONFIG: Lazy<Configuration> = Lazy::new(|| DO_CONFIG.0.clone()); static CONFIG: Lazy<Configuration> = Lazy::new(|| DO_CONFIG.0.clone());
static OPERATION: Lazy<Operation> = Lazy::new(|| DO_CONFIG.1.clone()); static OPERATION: Lazy<Operation> = Lazy::new(|| DO_CONFIG.1.clone());
static PROCESS_SEMAPHORE: Lazy<Semaphore> = static PROCESS_SEMAPHORE: Lazy<Semaphore> = Lazy::new(|| {
Lazy::new(|| Semaphore::new(num_cpus::get().saturating_sub(1).max(1))); tracing::trace_span!(parent: None, "Initialize semaphore")
.in_scope(|| Semaphore::new(num_cpus::get().saturating_sub(1).max(1)))
});
/// Handle responding to succesful uploads /// Handle responding to succesful uploads
#[instrument(name = "Uploaded files", skip(value))] #[instrument(name = "Uploaded files", skip(value))]
@ -790,16 +792,20 @@ async fn launch<R: FullRepo + Clone + 'static, S: Store + Clone + 'static>(
let store = store.clone(); let store = store.clone();
let repo = repo.clone(); let repo = repo.clone();
actix_rt::spawn(queue::process_cleanup( tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
repo.clone(), actix_rt::spawn(queue::process_cleanup(
store.clone(), repo.clone(),
next_worker_id(), store.clone(),
)); next_worker_id(),
actix_rt::spawn(queue::process_images( ))
repo.clone(), });
store.clone(), tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
next_worker_id(), actix_rt::spawn(queue::process_images(
)); repo.clone(),
store.clone(),
next_worker_id(),
))
});
App::new() App::new()
.wrap(TracingLogger::default()) .wrap(TracingLogger::default())

View File

@ -43,14 +43,17 @@ pin_project_lite::pin_project! {
impl Process { impl Process {
#[tracing::instrument] #[tracing::instrument]
pub(crate) fn run(command: &str, args: &[&str]) -> std::io::Result<Self> { pub(crate) fn run(command: &str, args: &[&str]) -> std::io::Result<Self> {
Self::spawn(Command::new(command).args(args)) tracing::trace_span!(parent: None, "Create command")
.in_scope(|| Self::spawn(Command::new(command).args(args)))
} }
#[tracing::instrument] #[tracing::instrument]
pub(crate) fn spawn(cmd: &mut Command) -> std::io::Result<Self> { pub(crate) fn spawn(cmd: &mut Command) -> std::io::Result<Self> {
let cmd = cmd.stdin(Stdio::piped()).stdout(Stdio::piped()); tracing::trace_span!(parent: None, "Spawn command").in_scope(|| {
let cmd = cmd.stdin(Stdio::piped()).stdout(Stdio::piped());
cmd.spawn().map(|child| Process { child }) cmd.spawn().map(|child| Process { child })
})
} }
#[tracing::instrument] #[tracing::instrument]
@ -67,10 +70,11 @@ impl Process {
let mut stdin = self.child.stdin.take().expect("stdin exists"); let mut stdin = self.child.stdin.take().expect("stdin exists");
let stdout = self.child.stdout.take().expect("stdout exists"); let stdout = self.child.stdout.take().expect("stdout exists");
let (tx, rx) = channel::<std::io::Error>(); let (tx, rx) = tracing::trace_span!(parent: None, "Create channel")
.in_scope(channel::<std::io::Error>);
let mut child = self.child; let mut child = self.child;
let handle = tracing::trace_span!(parent: None, "Spawn").in_scope(|| { let handle = tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
actix_rt::spawn(async move { actix_rt::spawn(async move {
if let Err(e) = stdin.write_all_buf(&mut input).await { if let Err(e) = stdin.write_all_buf(&mut input).await {
let _ = tx.send(e); let _ = tx.send(e);
@ -104,21 +108,23 @@ impl Process {
pub(crate) fn read(mut self) -> impl AsyncRead + Unpin { pub(crate) fn read(mut self) -> impl AsyncRead + Unpin {
let stdout = self.child.stdout.take().expect("stdout exists"); let stdout = self.child.stdout.take().expect("stdout exists");
let (tx, rx) = channel(); let (tx, rx) = tracing::trace_span!(parent: None, "Create channel").in_scope(channel);
let mut child = self.child; let mut child = self.child;
let handle = actix_rt::spawn(async move { let handle = tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
match child.wait().await { actix_rt::spawn(async move {
Ok(status) => { match child.wait().await {
if !status.success() { Ok(status) => {
let _ = if !status.success() {
tx.send(std::io::Error::new(std::io::ErrorKind::Other, &StatusError)); let _ = tx
.send(std::io::Error::new(std::io::ErrorKind::Other, &StatusError));
}
}
Err(e) => {
let _ = tx.send(e);
} }
} }
Err(e) => { })
let _ = tx.send(e);
}
}
}); });
ProcessRead { ProcessRead {
@ -138,27 +144,29 @@ impl Process {
let mut stdin = self.child.stdin.take().expect("stdin exists"); let mut stdin = self.child.stdin.take().expect("stdin exists");
let stdout = self.child.stdout.take().expect("stdout exists"); let stdout = self.child.stdout.take().expect("stdout exists");
let (tx, rx) = channel(); let (tx, rx) = tracing::trace_span!(parent: None, "Create channel").in_scope(channel);
let mut child = self.child; let mut child = self.child;
let handle = actix_rt::spawn(async move { let handle = tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
if let Err(e) = store.read_into(&identifier, &mut stdin).await { actix_rt::spawn(async move {
let _ = tx.send(e); if let Err(e) = store.read_into(&identifier, &mut stdin).await {
return; let _ = tx.send(e);
} return;
drop(stdin); }
drop(stdin);
match child.wait().await { match child.wait().await {
Ok(status) => { Ok(status) => {
if !status.success() { if !status.success() {
let _ = let _ = tx
tx.send(std::io::Error::new(std::io::ErrorKind::Other, &StatusError)); .send(std::io::Error::new(std::io::ErrorKind::Other, &StatusError));
}
}
Err(e) => {
let _ = tx.send(e);
} }
} }
Err(e) => { })
let _ = tx.send(e);
}
}
}); });
ProcessRead { ProcessRead {

View File

@ -187,7 +187,8 @@ where
match std::mem::replace(&mut this.state, IterStreamState::Pending) { match std::mem::replace(&mut this.state, IterStreamState::Pending) {
IterStreamState::New { iterator, buffer } => { IterStreamState::New { iterator, buffer } => {
let (sender, receiver) = tokio::sync::mpsc::channel(buffer); let (sender, receiver) = tracing::trace_span!(parent: None, "Create channel")
.in_scope(|| tokio::sync::mpsc::channel(buffer));
let mut handle = actix_rt::task::spawn_blocking(move || { let mut handle = actix_rt::task::spawn_blocking(move || {
let iterator = iterator.into_iter(); let iterator = iterator.into_iter();

View File

@ -13,7 +13,8 @@ struct TmpFile(PathBuf);
impl Drop for TmpFile { impl Drop for TmpFile {
fn drop(&mut self) { fn drop(&mut self) {
actix_rt::spawn(tokio::fs::remove_file(self.0.clone())); tracing::trace_span!(parent: None, "Spawn task")
.in_scope(|| actix_rt::spawn(tokio::fs::remove_file(self.0.clone())));
} }
} }