2
0
Fork 0
mirror of https://git.asonix.dog/asonix/pict-rs synced 2024-12-27 04:51:24 +00:00
pict-rs/src/process.rs

282 lines
8.2 KiB
Rust
Raw Normal View History

2021-12-07 02:35:22 +00:00
use crate::store::Store;
2021-09-14 01:22:42 +00:00
use actix_rt::task::JoinHandle;
2021-10-14 00:06:53 +00:00
use actix_web::web::Bytes;
use std::{
future::Future,
pin::Pin,
2023-07-10 20:29:41 +00:00
process::{ExitStatus, Stdio},
task::{Context, Poll},
2023-08-05 16:51:52 +00:00
time::Duration,
};
2021-09-14 01:22:42 +00:00
use tokio::{
io::{AsyncRead, AsyncWriteExt, ReadBuf},
2023-08-05 16:51:52 +00:00
process::{Child, ChildStdin, ChildStdout, Command},
2021-09-14 01:22:42 +00:00
sync::oneshot::{channel, Receiver},
};
use tracing::{Instrument, Span};
#[derive(Debug)]
struct StatusError(ExitStatus);
pub(crate) struct Process {
2021-09-14 01:22:42 +00:00
child: Child,
2023-08-05 16:51:52 +00:00
timeout: Duration,
2022-04-07 02:40:49 +00:00
}
impl std::fmt::Debug for Process {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Process").field("child", &"Child").finish()
}
}
2021-10-20 23:58:32 +00:00
struct DropHandle {
inner: JoinHandle<()>,
}
2023-08-05 16:51:52 +00:00
pub(crate) struct ProcessRead<I> {
inner: I,
err_recv: Receiver<std::io::Error>,
err_closed: bool,
#[allow(dead_code)]
handle: DropHandle,
eof: bool,
sleep: Pin<Box<actix_rt::time::Sleep>>,
}
2023-07-10 20:29:41 +00:00
#[derive(Debug, thiserror::Error)]
pub(crate) enum ProcessError {
2023-07-15 19:41:33 +00:00
#[error("Required command {0} not found, make sure it exists in pict-rs' $PATH")]
2023-07-10 20:29:41 +00:00
NotFound(String),
2023-07-15 19:41:33 +00:00
#[error("Cannot run command {0} due to invalid permissions on binary, make sure the pict-rs user has permission to run it")]
PermissionDenied(String),
2023-07-10 20:29:41 +00:00
#[error("Reached process spawn limit")]
LimitReached,
2023-08-05 16:51:52 +00:00
#[error("Process timed out")]
Timeout,
2023-07-10 20:29:41 +00:00
#[error("Failed with status {0}")]
Status(ExitStatus),
#[error("Unknown process error")]
Other(#[source] std::io::Error),
}
impl Process {
2023-08-05 16:51:52 +00:00
pub(crate) fn run(command: &str, args: &[&str], timeout: u64) -> Result<Self, ProcessError> {
2023-07-10 20:29:41 +00:00
let res = tracing::trace_span!(parent: None, "Create command")
2023-08-05 16:51:52 +00:00
.in_scope(|| Self::spawn(Command::new(command).args(args), timeout));
2023-07-10 20:29:41 +00:00
match res {
Ok(this) => Ok(this),
Err(e) => match e.kind() {
std::io::ErrorKind::NotFound => Err(ProcessError::NotFound(command.to_string())),
2023-07-15 19:41:33 +00:00
std::io::ErrorKind::PermissionDenied => {
Err(ProcessError::PermissionDenied(command.to_string()))
}
2023-07-10 20:29:41 +00:00
std::io::ErrorKind::WouldBlock => Err(ProcessError::LimitReached),
_ => Err(ProcessError::Other(e)),
},
}
2021-09-14 01:22:42 +00:00
}
2023-08-05 16:51:52 +00:00
fn spawn(cmd: &mut Command, timeout: u64) -> std::io::Result<Self> {
let timeout = Duration::from_secs(timeout);
2022-04-07 17:56:40 +00:00
tracing::trace_span!(parent: None, "Spawn command").in_scope(|| {
2023-07-10 20:29:41 +00:00
let cmd = cmd
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.kill_on_drop(true);
2023-08-05 16:51:52 +00:00
cmd.spawn().map(|child| Process { child, timeout })
2022-04-07 17:56:40 +00:00
})
}
#[tracing::instrument(skip(self))]
2023-08-05 16:51:52 +00:00
pub(crate) async fn wait(self) -> Result<(), ProcessError> {
let Process { mut child, timeout } = self;
match actix_rt::time::timeout(timeout, child.wait()).await {
Ok(Ok(status)) if status.success() => Ok(()),
Ok(Ok(status)) => Err(ProcessError::Status(status)),
Ok(Err(e)) => Err(ProcessError::Other(e)),
Err(_) => {
child.kill().await.map_err(ProcessError::Other)?;
2023-08-05 16:58:46 +00:00
Err(ProcessError::Timeout)
2023-08-05 16:51:52 +00:00
}
2021-10-23 19:14:12 +00:00
}
}
2023-08-05 16:51:52 +00:00
pub(crate) fn bytes_read(self, input: Bytes) -> ProcessRead<ChildStdout> {
self.spawn_fn(move |mut stdin| {
2022-09-25 22:35:52 +00:00
let mut input = input;
async move { stdin.write_all_buf(&mut input).await }
})
}
2023-08-05 16:51:52 +00:00
pub(crate) fn read(self) -> ProcessRead<ChildStdout> {
self.spawn_fn(|_| async { Ok(()) })
}
pub(crate) fn pipe_async_read<A: AsyncRead + Unpin + 'static>(
2022-09-25 22:35:52 +00:00
self,
mut async_read: A,
2023-08-05 16:51:52 +00:00
) -> ProcessRead<ChildStdout> {
self.spawn_fn(move |mut stdin| async move {
tokio::io::copy(&mut async_read, &mut stdin)
.await
.map(|_| ())
})
2021-10-23 19:14:12 +00:00
}
pub(crate) fn store_read<S: Store + 'static>(
2022-09-25 22:35:52 +00:00
self,
2021-10-23 04:48:56 +00:00
store: S,
identifier: S::Identifier,
2023-08-05 16:51:52 +00:00
) -> ProcessRead<ChildStdout> {
self.spawn_fn(move |mut stdin| {
2022-09-25 22:35:52 +00:00
let store = store;
let identifier = identifier;
async move { store.read_into(&identifier, &mut stdin).await }
})
}
#[allow(unknown_lints)]
#[allow(clippy::let_with_type_underscore)]
#[tracing::instrument(level = "trace", skip_all)]
2023-08-05 16:51:52 +00:00
fn spawn_fn<F, Fut>(self, f: F) -> ProcessRead<ChildStdout>
2022-09-25 22:35:52 +00:00
where
F: FnOnce(ChildStdin) -> Fut + 'static,
Fut: Future<Output = std::io::Result<()>>,
{
2023-08-05 16:51:52 +00:00
let Process { mut child, timeout } = self;
let stdin = child.stdin.take().expect("stdin exists");
let stdout = child.stdout.take().expect("stdout exists");
2022-09-25 22:35:52 +00:00
let (tx, rx) = tracing::trace_span!(parent: None, "Create channel")
.in_scope(channel::<std::io::Error>);
2022-09-25 22:35:52 +00:00
let span = tracing::info_span!(parent: None, "Background process task");
span.follows_from(Span::current());
2022-04-07 17:56:40 +00:00
let handle = tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
actix_rt::spawn(
async move {
2023-08-05 16:51:52 +00:00
let child_fut = async {
(f)(stdin).await?;
2023-08-05 16:51:52 +00:00
child.wait().await
};
let err = match actix_rt::time::timeout(timeout, child_fut).await {
Ok(Ok(status)) if status.success() => return,
Ok(Ok(status)) => std::io::Error::new(
std::io::ErrorKind::Other,
ProcessError::Status(status),
),
Ok(Err(e)) => e,
Err(_) => std::io::ErrorKind::TimedOut.into(),
};
let _ = tx.send(err);
let _ = child.kill().await;
2022-04-07 17:56:40 +00:00
}
.instrument(span),
)
2022-04-07 02:40:49 +00:00
});
2023-08-05 16:51:52 +00:00
let sleep = Box::pin(actix_rt::time::sleep(timeout));
2022-04-07 02:40:49 +00:00
ProcessRead {
inner: stdout,
err_recv: rx,
err_closed: false,
2021-10-20 23:58:32 +00:00
handle: DropHandle { inner: handle },
eof: false,
2023-08-05 16:51:52 +00:00
sleep,
2022-04-07 02:40:49 +00:00
}
}
}
impl<I> AsyncRead for ProcessRead<I>
where
2023-08-05 16:51:52 +00:00
I: AsyncRead + Unpin,
{
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
2023-08-05 16:51:52 +00:00
if !self.err_closed {
if let Poll::Ready(res) = Pin::new(&mut self.err_recv).poll(cx) {
self.err_closed = true;
2022-04-07 02:40:49 +00:00
if let Ok(err) = res {
return Poll::Ready(Err(err));
}
2023-08-05 16:51:52 +00:00
if self.eof {
return Poll::Ready(Ok(()));
}
}
2023-08-05 16:51:52 +00:00
if let Poll::Ready(()) = self.sleep.as_mut().poll(cx) {
self.err_closed = true;
return Poll::Ready(Err(std::io::ErrorKind::TimedOut.into()));
}
2022-04-07 02:40:49 +00:00
}
2023-08-05 16:51:52 +00:00
if !self.eof {
let before_size = buf.filled().len();
2023-08-05 16:51:52 +00:00
return match Pin::new(&mut self.inner).poll_read(cx, buf) {
Poll::Ready(Ok(())) => {
if buf.filled().len() == before_size {
2023-08-05 16:51:52 +00:00
self.eof = true;
2023-08-05 16:51:52 +00:00
if !self.err_closed {
// reached end of stream & haven't received process signal
return Poll::Pending;
}
}
Poll::Ready(Ok(()))
}
Poll::Ready(Err(e)) => {
2023-08-05 16:51:52 +00:00
self.eof = true;
Poll::Ready(Err(e))
}
Poll::Pending => Poll::Pending,
};
}
2023-08-05 16:51:52 +00:00
if self.err_closed && self.eof {
return Poll::Ready(Ok(()));
}
Poll::Pending
}
}
2021-10-20 23:58:32 +00:00
impl Drop for DropHandle {
fn drop(&mut self) {
2021-10-20 23:58:32 +00:00
self.inner.abort();
}
}
impl std::fmt::Display for StatusError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "Command failed with bad status: {}", self.0)
}
}
impl std::error::Error for StatusError {}