From c9a74a73ca3c07faa3f0b751228a3dd449e3613a Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 25 Sep 2022 17:35:52 -0500 Subject: [PATCH] Reduce redundancy in process module --- src/process.rs | 190 +++++++++++-------------------------------------- 1 file changed, 42 insertions(+), 148 deletions(-) diff --git a/src/process.rs b/src/process.rs index 80aaf76..1a875d7 100644 --- a/src/process.rs +++ b/src/process.rs @@ -4,12 +4,12 @@ use actix_web::web::Bytes; use std::{ future::Future, pin::Pin, - process::Stdio, + process::{Stdio}, task::{Context, Poll}, }; use tokio::{ io::{AsyncRead, AsyncWriteExt, ReadBuf}, - process::{Child, Command}, + process::{Child, Command, ChildStdin}, sync::oneshot::{channel, Receiver}, }; use tracing::{Instrument, Span}; @@ -67,58 +67,50 @@ impl Process { } #[tracing::instrument(skip(input))] - pub(crate) fn bytes_read(mut self, mut input: Bytes) -> impl AsyncRead + Unpin { - let mut stdin = self.child.stdin.take().expect("stdin exists"); + pub(crate) fn bytes_read(self, input: Bytes) -> impl AsyncRead + Unpin { + self.read_fn(move |mut stdin| { + let mut input = input; + async move { stdin.write_all_buf(&mut input).await } + }) + } + + #[tracing::instrument] + pub(crate) fn read(self) -> impl AsyncRead + Unpin { + self.read_fn(|_| async { Ok(()) }) + } + + pub(crate) fn pipe_async_read( + self, + mut async_read: A, + ) -> impl AsyncRead + Unpin { + self.read_fn(move |mut stdin| async move { tokio::io::copy(&mut async_read, &mut stdin).await.map(|_| ()) }) + } + + #[tracing::instrument] + pub(crate) fn store_read( + self, + store: S, + identifier: S::Identifier, + ) -> impl AsyncRead + Unpin { + self.read_fn(move |mut stdin| { + let store = store; + let identifier = identifier; + + async move { store.read_into(&identifier, &mut stdin).await } + }) + } + + fn read_fn(mut self, f: F) -> impl AsyncRead + Unpin + where + F: FnOnce(ChildStdin) -> Fut + 'static, + Fut: Future>, + { + let stdin = self.child.stdin.take().expect("stdin exists"); let stdout = self.child.stdout.take().expect("stdout exists"); let (tx, rx) = tracing::trace_span!(parent: None, "Create channel") .in_scope(channel::); - let span = tracing::info_span!(parent: None, "Background process task from bytes"); - span.follows_from(Span::current()); - - let mut child = self.child; - let handle = tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { - actix_rt::spawn( - async move { - if let Err(e) = stdin.write_all_buf(&mut input).await { - let _ = tx.send(e); - return; - } - drop(stdin); - - match child.wait().await { - Ok(status) => { - if !status.success() { - let _ = tx.send(std::io::Error::new( - std::io::ErrorKind::Other, - &StatusError, - )); - } - } - Err(e) => { - let _ = tx.send(e); - } - } - } - .instrument(span), - ) - }); - - ProcessRead { - inner: stdout, - err_recv: rx, - err_closed: false, - handle: DropHandle { inner: handle }, - } - } - - #[tracing::instrument] - pub(crate) fn read(mut self) -> impl AsyncRead + Unpin { - let stdout = self.child.stdout.take().expect("stdout exists"); - - let (tx, rx) = tracing::trace_span!(parent: None, "Create channel").in_scope(channel); - let span = tracing::info_span!(parent: None, "Background process task"); span.follows_from(Span::current()); @@ -126,54 +118,10 @@ impl Process { let handle = tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { actix_rt::spawn( async move { - match child.wait().await { - Ok(status) => { - if !status.success() { - let _ = tx.send(std::io::Error::new( - std::io::ErrorKind::Other, - &StatusError, - )); - } - } - Err(e) => { - let _ = tx.send(e); - } - } - } - .instrument(span), - ) - }); - - ProcessRead { - inner: stdout, - err_recv: rx, - err_closed: false, - handle: DropHandle { inner: handle }, - } - } - - pub(crate) fn pipe_async_read( - mut self, - mut async_read: A, - ) -> impl AsyncRead + Unpin { - let mut stdin = self.child.stdin.take().expect("stdin exists"); - let stdout = self.child.stdout.take().expect("stdout exists"); - - let (tx, rx) = tracing::trace_span!(parent: None, "Create channel") - .in_scope(channel::); - - let span = tracing::info_span!(parent: None, "Background process task from bytes"); - span.follows_from(Span::current()); - - let mut child = self.child; - let handle = tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { - actix_rt::spawn( - async move { - if let Err(e) = tokio::io::copy(&mut async_read, &mut stdin).await { + if let Err(e) = (f)(stdin).await { let _ = tx.send(e); return; } - drop(stdin); match child.wait().await { Ok(status) => { @@ -199,61 +147,7 @@ impl Process { err_closed: false, handle: DropHandle { inner: handle }, } - } - #[tracing::instrument] - pub(crate) fn store_read( - mut self, - store: S, - identifier: S::Identifier, - ) -> impl AsyncRead + Unpin { - let mut stdin = self.child.stdin.take().expect("stdin exists"); - let stdout = self.child.stdout.take().expect("stdout exists"); - - let (tx, rx) = tracing::trace_span!(parent: None, "Create channel").in_scope(channel); - - let span = tracing::info_span!( - parent: None, - "Background processs task from store", - ?store, - ?identifier - ); - span.follows_from(Span::current()); - - let mut child = self.child; - let handle = tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { - actix_rt::spawn( - async move { - if let Err(e) = store.read_into(&identifier, &mut stdin).await { - let _ = tx.send(e); - return; - } - drop(stdin); - - match child.wait().await { - Ok(status) => { - if !status.success() { - let _ = tx.send(std::io::Error::new( - std::io::ErrorKind::Other, - &StatusError, - )); - } - } - Err(e) => { - let _ = tx.send(e); - } - } - } - .instrument(span), - ) - }); - - ProcessRead { - inner: stdout, - err_recv: rx, - err_closed: false, - handle: DropHandle { inner: handle }, - } } }