Add follows-from relation for background tasks

This commit is contained in:
Aode (lion) 2022-04-07 13:28:28 -05:00
parent e493e90dd4
commit 8781bc8f28
1 changed files with 70 additions and 40 deletions

View File

@ -12,6 +12,7 @@ use tokio::{
process::{Child, Command}, process::{Child, Command},
sync::oneshot::{channel, Receiver}, sync::oneshot::{channel, Receiver},
}; };
use tracing::{Instrument, Span};
#[derive(Debug)] #[derive(Debug)]
struct StatusError; struct StatusError;
@ -73,27 +74,35 @@ impl Process {
let (tx, rx) = tracing::trace_span!(parent: None, "Create channel") let (tx, rx) = tracing::trace_span!(parent: None, "Create channel")
.in_scope(channel::<std::io::Error>); .in_scope(channel::<std::io::Error>);
let span = tracing::info_span!(parent: None, "Background process task from bytes");
span.follows_from(Span::current());
let mut child = self.child; let mut child = self.child;
let handle = tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { let handle = tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
actix_rt::spawn(async move { actix_rt::spawn(
if let Err(e) = stdin.write_all_buf(&mut input).await { async move {
let _ = tx.send(e); if let Err(e) = stdin.write_all_buf(&mut input).await {
return; let _ = tx.send(e);
} return;
drop(stdin); }
drop(stdin);
match child.wait().await { match child.wait().await {
Ok(status) => { Ok(status) => {
if !status.success() { if !status.success() {
let _ = tx let _ = tx.send(std::io::Error::new(
.send(std::io::Error::new(std::io::ErrorKind::Other, &StatusError)); std::io::ErrorKind::Other,
&StatusError,
));
}
}
Err(e) => {
let _ = tx.send(e);
} }
} }
Err(e) => {
let _ = tx.send(e);
}
} }
}) .instrument(span),
)
}); });
ProcessRead { ProcessRead {
@ -110,21 +119,29 @@ impl Process {
let (tx, rx) = tracing::trace_span!(parent: None, "Create channel").in_scope(channel); let (tx, rx) = tracing::trace_span!(parent: None, "Create channel").in_scope(channel);
let span = tracing::info_span!(parent: None, "Background process task");
span.follows_from(Span::current());
let mut child = self.child; let mut child = self.child;
let handle = tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { let handle = tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
actix_rt::spawn(async move { actix_rt::spawn(
match child.wait().await { async move {
Ok(status) => { match child.wait().await {
if !status.success() { Ok(status) => {
let _ = tx if !status.success() {
.send(std::io::Error::new(std::io::ErrorKind::Other, &StatusError)); let _ = tx.send(std::io::Error::new(
std::io::ErrorKind::Other,
&StatusError,
));
}
}
Err(e) => {
let _ = tx.send(e);
} }
} }
Err(e) => {
let _ = tx.send(e);
}
} }
}) .instrument(span),
)
}); });
ProcessRead { ProcessRead {
@ -146,27 +163,40 @@ impl Process {
let (tx, rx) = tracing::trace_span!(parent: None, "Create channel").in_scope(channel); let (tx, rx) = tracing::trace_span!(parent: None, "Create channel").in_scope(channel);
let span = tracing::info_span!(
parent: None,
"Background processs task from store",
?store,
?identifier
);
span.follows_from(Span::current());
let mut child = self.child; let mut child = self.child;
let handle = tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { let handle = tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
actix_rt::spawn(async move { actix_rt::spawn(
if let Err(e) = store.read_into(&identifier, &mut stdin).await { async move {
let _ = tx.send(e); if let Err(e) = store.read_into(&identifier, &mut stdin).await {
return; let _ = tx.send(e);
} return;
drop(stdin); }
drop(stdin);
match child.wait().await { match child.wait().await {
Ok(status) => { Ok(status) => {
if !status.success() { if !status.success() {
let _ = tx let _ = tx.send(std::io::Error::new(
.send(std::io::Error::new(std::io::ErrorKind::Other, &StatusError)); std::io::ErrorKind::Other,
&StatusError,
));
}
}
Err(e) => {
let _ = tx.send(e);
} }
} }
Err(e) => {
let _ = tx.send(e);
}
} }
}) .instrument(span),
)
}); });
ProcessRead { ProcessRead {