2021-12-07 02:35:22 +00:00
|
|
|
use crate::store::Store;
|
2021-09-14 01:22:42 +00:00
|
|
|
use actix_rt::task::JoinHandle;
|
2021-10-14 00:06:53 +00:00
|
|
|
use actix_web::web::Bytes;
|
2021-08-31 02:19:47 +00:00
|
|
|
use std::{
|
2021-09-04 00:53:53 +00:00
|
|
|
future::Future,
|
2021-08-31 02:19:47 +00:00
|
|
|
pin::Pin,
|
2021-09-14 01:22:42 +00:00
|
|
|
process::Stdio,
|
2021-08-31 02:19:47 +00:00
|
|
|
task::{Context, Poll},
|
|
|
|
};
|
2021-09-14 01:22:42 +00:00
|
|
|
use tokio::{
|
|
|
|
io::{AsyncRead, AsyncWriteExt, ReadBuf},
|
|
|
|
process::{Child, Command},
|
|
|
|
sync::oneshot::{channel, Receiver},
|
|
|
|
};
|
2022-04-07 18:28:28 +00:00
|
|
|
use tracing::{Instrument, Span};
|
2021-09-04 00:53:53 +00:00
|
|
|
|
2021-09-09 19:16:12 +00:00
|
|
|
/// Marker error used when a spawned command finishes with an unsuccessful
/// exit status.
///
/// It carries no data; the human-readable message comes from its `Display`
/// implementation.
#[derive(Debug)]
struct StatusError;
|
|
|
|
|
2021-08-31 02:19:47 +00:00
|
|
|
pub(crate) struct Process {
|
2021-09-14 01:22:42 +00:00
|
|
|
child: Child,
|
2022-04-07 02:40:49 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
impl std::fmt::Debug for Process {
|
|
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
|
|
f.debug_struct("Process").field("child", &"Child").finish()
|
|
|
|
}
|
2021-08-31 02:19:47 +00:00
|
|
|
}
|
|
|
|
|
2021-10-20 23:58:32 +00:00
|
|
|
struct DropHandle {
|
|
|
|
inner: JoinHandle<()>,
|
|
|
|
}
|
|
|
|
|
|
|
|
// Reader over a child's output combined with the channel on which the
// background task reports failures. Plain `//` comments are used inside the
// macro invocation so the tokens handed to `pin_project!` stay unchanged.
pin_project_lite::pin_project! {
    struct ProcessRead<I> {
        // The wrapped reader; structurally pinned so it can be polled in place.
        #[pin]
        inner: I,
        // Receives at most one error from the background task.
        err_recv: Receiver<std::io::Error>,
        // Set once `err_recv` has resolved, so it is never polled again.
        err_closed: bool,
        // Aborts the background task when this reader is dropped.
        handle: DropHandle,
    }
}
|
|
|
|
|
2021-08-31 02:19:47 +00:00
|
|
|
impl Process {
|
2022-04-07 02:40:49 +00:00
|
|
|
#[tracing::instrument]
|
2021-09-14 01:22:42 +00:00
|
|
|
pub(crate) fn run(command: &str, args: &[&str]) -> std::io::Result<Self> {
|
2022-04-07 17:56:40 +00:00
|
|
|
tracing::trace_span!(parent: None, "Create command")
|
|
|
|
.in_scope(|| Self::spawn(Command::new(command).args(args)))
|
2021-09-14 01:22:42 +00:00
|
|
|
}
|
|
|
|
|
2022-04-07 02:40:49 +00:00
|
|
|
#[tracing::instrument]
|
2021-09-14 01:22:42 +00:00
|
|
|
pub(crate) fn spawn(cmd: &mut Command) -> std::io::Result<Self> {
|
2022-04-07 17:56:40 +00:00
|
|
|
tracing::trace_span!(parent: None, "Spawn command").in_scope(|| {
|
|
|
|
let cmd = cmd.stdin(Stdio::piped()).stdout(Stdio::piped());
|
2021-09-25 20:23:05 +00:00
|
|
|
|
2022-04-07 17:56:40 +00:00
|
|
|
cmd.spawn().map(|child| Process { child })
|
|
|
|
})
|
2021-08-31 02:19:47 +00:00
|
|
|
}
|
|
|
|
|
2022-04-07 02:40:49 +00:00
|
|
|
#[tracing::instrument]
|
2021-10-23 19:14:12 +00:00
|
|
|
pub(crate) async fn wait(mut self) -> std::io::Result<()> {
|
|
|
|
let status = self.child.wait().await?;
|
|
|
|
if !status.success() {
|
|
|
|
return Err(std::io::Error::new(std::io::ErrorKind::Other, &StatusError));
|
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2022-04-07 02:40:49 +00:00
|
|
|
#[tracing::instrument(skip(input))]
|
|
|
|
pub(crate) fn bytes_read(mut self, mut input: Bytes) -> impl AsyncRead + Unpin {
|
|
|
|
let mut stdin = self.child.stdin.take().expect("stdin exists");
|
|
|
|
let stdout = self.child.stdout.take().expect("stdout exists");
|
2021-09-04 00:53:53 +00:00
|
|
|
|
2022-04-07 17:56:40 +00:00
|
|
|
let (tx, rx) = tracing::trace_span!(parent: None, "Create channel")
|
|
|
|
.in_scope(channel::<std::io::Error>);
|
2021-09-04 00:53:53 +00:00
|
|
|
|
2022-04-07 18:28:28 +00:00
|
|
|
let span = tracing::info_span!(parent: None, "Background process task from bytes");
|
|
|
|
span.follows_from(Span::current());
|
|
|
|
|
2021-09-09 19:16:12 +00:00
|
|
|
let mut child = self.child;
|
2022-04-07 17:56:40 +00:00
|
|
|
let handle = tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
|
2022-04-07 18:28:28 +00:00
|
|
|
actix_rt::spawn(
|
|
|
|
async move {
|
|
|
|
if let Err(e) = stdin.write_all_buf(&mut input).await {
|
2021-09-25 20:23:05 +00:00
|
|
|
let _ = tx.send(e);
|
2022-04-07 18:28:28 +00:00
|
|
|
return;
|
|
|
|
}
|
|
|
|
drop(stdin);
|
|
|
|
|
|
|
|
match child.wait().await {
|
|
|
|
Ok(status) => {
|
|
|
|
if !status.success() {
|
|
|
|
let _ = tx.send(std::io::Error::new(
|
|
|
|
std::io::ErrorKind::Other,
|
|
|
|
&StatusError,
|
|
|
|
));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Err(e) => {
|
|
|
|
let _ = tx.send(e);
|
|
|
|
}
|
2021-09-25 20:23:05 +00:00
|
|
|
}
|
2021-09-09 19:16:12 +00:00
|
|
|
}
|
2022-04-07 18:28:28 +00:00
|
|
|
.instrument(span),
|
|
|
|
)
|
2022-04-07 02:40:49 +00:00
|
|
|
});
|
2021-09-04 00:53:53 +00:00
|
|
|
|
2022-04-07 02:40:49 +00:00
|
|
|
ProcessRead {
|
2021-09-04 00:53:53 +00:00
|
|
|
inner: stdout,
|
|
|
|
err_recv: rx,
|
|
|
|
err_closed: false,
|
2021-10-20 23:58:32 +00:00
|
|
|
handle: DropHandle { inner: handle },
|
2022-04-07 02:40:49 +00:00
|
|
|
}
|
2021-09-04 00:53:53 +00:00
|
|
|
}
|
|
|
|
|
2022-04-07 02:40:49 +00:00
|
|
|
#[tracing::instrument]
|
|
|
|
pub(crate) fn read(mut self) -> impl AsyncRead + Unpin {
|
|
|
|
let stdout = self.child.stdout.take().expect("stdout exists");
|
2021-10-23 19:14:12 +00:00
|
|
|
|
2022-04-07 17:56:40 +00:00
|
|
|
let (tx, rx) = tracing::trace_span!(parent: None, "Create channel").in_scope(channel);
|
2021-10-23 19:14:12 +00:00
|
|
|
|
2022-04-07 18:28:28 +00:00
|
|
|
let span = tracing::info_span!(parent: None, "Background process task");
|
|
|
|
span.follows_from(Span::current());
|
|
|
|
|
2021-10-23 19:14:12 +00:00
|
|
|
let mut child = self.child;
|
2022-04-07 17:56:40 +00:00
|
|
|
let handle = tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
|
2022-04-07 18:28:28 +00:00
|
|
|
actix_rt::spawn(
|
|
|
|
async move {
|
|
|
|
match child.wait().await {
|
|
|
|
Ok(status) => {
|
|
|
|
if !status.success() {
|
|
|
|
let _ = tx.send(std::io::Error::new(
|
|
|
|
std::io::ErrorKind::Other,
|
|
|
|
&StatusError,
|
|
|
|
));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Err(e) => {
|
|
|
|
let _ = tx.send(e);
|
2022-04-07 17:56:40 +00:00
|
|
|
}
|
2021-10-23 19:14:12 +00:00
|
|
|
}
|
|
|
|
}
|
2022-04-07 18:28:28 +00:00
|
|
|
.instrument(span),
|
|
|
|
)
|
2022-04-07 02:40:49 +00:00
|
|
|
});
|
2021-10-23 19:14:12 +00:00
|
|
|
|
2022-04-07 02:40:49 +00:00
|
|
|
ProcessRead {
|
2021-10-23 19:14:12 +00:00
|
|
|
inner: stdout,
|
|
|
|
err_recv: rx,
|
|
|
|
err_closed: false,
|
|
|
|
handle: DropHandle { inner: handle },
|
2022-04-07 02:40:49 +00:00
|
|
|
}
|
2021-10-23 19:14:12 +00:00
|
|
|
}
|
|
|
|
|
2022-04-07 02:40:49 +00:00
|
|
|
#[tracing::instrument]
|
2022-04-01 21:51:12 +00:00
|
|
|
pub(crate) fn store_read<S: Store + 'static>(
|
2021-09-04 00:53:53 +00:00
|
|
|
mut self,
|
2021-10-23 04:48:56 +00:00
|
|
|
store: S,
|
|
|
|
identifier: S::Identifier,
|
2022-04-07 02:40:49 +00:00
|
|
|
) -> impl AsyncRead + Unpin {
|
|
|
|
let mut stdin = self.child.stdin.take().expect("stdin exists");
|
|
|
|
let stdout = self.child.stdout.take().expect("stdout exists");
|
2021-09-04 00:53:53 +00:00
|
|
|
|
2022-04-07 17:56:40 +00:00
|
|
|
let (tx, rx) = tracing::trace_span!(parent: None, "Create channel").in_scope(channel);
|
2021-09-04 00:53:53 +00:00
|
|
|
|
2022-04-07 18:28:28 +00:00
|
|
|
let span = tracing::info_span!(
|
|
|
|
parent: None,
|
|
|
|
"Background processs task from store",
|
|
|
|
?store,
|
|
|
|
?identifier
|
|
|
|
);
|
|
|
|
span.follows_from(Span::current());
|
|
|
|
|
2021-09-09 19:16:12 +00:00
|
|
|
let mut child = self.child;
|
2022-04-07 17:56:40 +00:00
|
|
|
let handle = tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
|
2022-04-07 18:28:28 +00:00
|
|
|
actix_rt::spawn(
|
|
|
|
async move {
|
|
|
|
if let Err(e) = store.read_into(&identifier, &mut stdin).await {
|
2022-04-07 17:56:40 +00:00
|
|
|
let _ = tx.send(e);
|
2022-04-07 18:28:28 +00:00
|
|
|
return;
|
|
|
|
}
|
|
|
|
drop(stdin);
|
|
|
|
|
|
|
|
match child.wait().await {
|
|
|
|
Ok(status) => {
|
|
|
|
if !status.success() {
|
|
|
|
let _ = tx.send(std::io::Error::new(
|
|
|
|
std::io::ErrorKind::Other,
|
|
|
|
&StatusError,
|
|
|
|
));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Err(e) => {
|
|
|
|
let _ = tx.send(e);
|
|
|
|
}
|
2022-04-07 17:56:40 +00:00
|
|
|
}
|
|
|
|
}
|
2022-04-07 18:28:28 +00:00
|
|
|
.instrument(span),
|
|
|
|
)
|
2022-04-07 02:40:49 +00:00
|
|
|
});
|
2021-09-04 00:53:53 +00:00
|
|
|
|
2022-04-07 02:40:49 +00:00
|
|
|
ProcessRead {
|
2021-09-04 00:53:53 +00:00
|
|
|
inner: stdout,
|
|
|
|
err_recv: rx,
|
|
|
|
err_closed: false,
|
2021-10-20 23:58:32 +00:00
|
|
|
handle: DropHandle { inner: handle },
|
2022-04-07 02:40:49 +00:00
|
|
|
}
|
2021-09-04 00:53:53 +00:00
|
|
|
}
|
2021-08-31 02:19:47 +00:00
|
|
|
}
|
|
|
|
|
2021-09-04 00:53:53 +00:00
|
|
|
impl<I> AsyncRead for ProcessRead<I>
|
|
|
|
where
|
2021-10-20 23:58:32 +00:00
|
|
|
I: AsyncRead,
|
2021-09-04 00:53:53 +00:00
|
|
|
{
|
|
|
|
fn poll_read(
|
|
|
|
mut self: Pin<&mut Self>,
|
|
|
|
cx: &mut Context<'_>,
|
|
|
|
buf: &mut ReadBuf<'_>,
|
|
|
|
) -> Poll<std::io::Result<()>> {
|
2021-10-20 23:58:32 +00:00
|
|
|
let this = self.as_mut().project();
|
|
|
|
|
|
|
|
let err_recv = this.err_recv;
|
|
|
|
let err_closed = this.err_closed;
|
|
|
|
let inner = this.inner;
|
2021-12-06 18:43:03 +00:00
|
|
|
|
2022-04-07 02:40:49 +00:00
|
|
|
if !*err_closed {
|
|
|
|
if let Poll::Ready(res) = Pin::new(err_recv).poll(cx) {
|
|
|
|
*err_closed = true;
|
|
|
|
if let Ok(err) = res {
|
|
|
|
return Poll::Ready(Err(err));
|
2021-09-04 00:53:53 +00:00
|
|
|
}
|
|
|
|
}
|
2022-04-07 02:40:49 +00:00
|
|
|
}
|
2021-09-04 00:53:53 +00:00
|
|
|
|
2022-04-07 02:40:49 +00:00
|
|
|
inner.poll_read(cx, buf)
|
2021-09-04 00:53:53 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-10-20 23:58:32 +00:00
|
|
|
impl Drop for DropHandle {
|
2021-09-11 21:35:38 +00:00
|
|
|
fn drop(&mut self) {
|
2021-10-20 23:58:32 +00:00
|
|
|
self.inner.abort();
|
2021-09-11 21:35:38 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:16:12 +00:00
|
|
|
impl std::fmt::Display for StatusError {
|
|
|
|
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
|
|
|
write!(f, "Command failed with bad status")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// `StatusError` adds nothing beyond the trait's default methods; implementing
// `Error` lets it be used as the payload passed to `std::io::Error::new`.
impl std::error::Error for StatusError {}
|