From a6f2082b37811f5cb2a7d0fd35fcc72122df6585 Mon Sep 17 00:00:00 2001 From: "Aode (lion)" Date: Tue, 31 Aug 2021 10:17:08 -0500 Subject: [PATCH] Spawn new task to avoid complexity/bugs --- src/stream.rs | 109 ++++++++++++++------------------------------------ 1 file changed, 30 insertions(+), 79 deletions(-) diff --git a/src/stream.rs b/src/stream.rs index 3240226..5afc86f 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,8 +1,5 @@ use actix_web::web::Bytes; -use futures::{ - future::FutureExt, - stream::{LocalBoxStream, Stream, StreamExt}, -}; +use futures::stream::{LocalBoxStream, Stream, StreamExt}; use std::{ pin::Pin, task::{Context, Poll}, @@ -49,98 +46,52 @@ impl Process { self.child.stdout.take().map(ProcessStream::new) } - pub(crate) fn sink_stream(mut self, mut input_stream: S) -> Option> + pub(crate) fn sink_stream(mut self, input_stream: S) -> Option> where S: Stream> + Unpin + 'static, E: From + 'static, { - let mut stdin = self.child.stdin.take(); + let mut stdin = self.take_sink()?; let mut stdout = self.take_stream()?; - let s = async_stream::stream! { - let mut wait = Box::pin(self.child.wait().fuse()); + let (tx, mut rx) = tokio::sync::mpsc::channel(1); - loop { - tokio::select! { - res = input_stream.next() => { - match res { - Some(Ok(mut bytes)) => { - if let Some(stdin) = stdin.as_mut() { - let mut fut = Box::pin(stdin.write_all_buf(&mut bytes)); + actix_rt::spawn(async move { + if let Err(e) = stdin.send(input_stream).await { + let _ = tx.send(e).await; + } + }); - loop { - tokio::select! { - res = &mut fut => { - if let Err(e) = res { - yield Err(e.into()); - } - break; - } - res = stdout.next() => { - match res { - Some(Ok(bytes)) => yield Ok(bytes), - Some(Err(e)) => { - yield Err(e.into()); - break; - } - None => break, - } - } - res = &mut wait => { - match res { - Ok(status) if !status.success() => { - yield Err(std::io::Error::from(std::io::ErrorKind::Other).into()); - break; - }, - Err(e) => { - yield Err(e.into()); - break; - } - _ => (), - } - } - } - } - } - }, - Some(Err(e)) => { + Some(ProcessSinkStream { + stream: Box::pin(async_stream::stream! { + loop { + tokio::select! { + opt = rx.recv() => { + if let Some(e) = opt { yield Err(e); break; } - None => { - stdin.take(); - }, } - } - res = stdout.next() => { - match res { - Some(Ok(bytes)) => yield Ok(bytes), - Some(Err(e)) => { - yield Err(e.into()); - break; + res = stdout.next() => { + match res { + Some(Ok(bytes)) => yield Ok(bytes), + Some(Err(e)) => { + yield Err(e.into()); + break; + } + None => break, } - None => break, - } - } - res = &mut wait => { - match res { - Ok(status) if !status.success() => { - yield Err(std::io::Error::from(std::io::ErrorKind::Other).into()); - break; - }, - Err(e) => { - yield Err(e.into()); - break; - } - _ => (), } } } - } - }; - Some(ProcessSinkStream { - stream: Box::pin(s), + drop(stdout); + match self.child.wait().await { + Ok(status) if status.success() => return, + Ok(_) => yield Err(std::io::Error::from(std::io::ErrorKind::Other).into()), + Err(e) => yield Err(e.into()), + } + }), }) } }