use actix_rt::{task::JoinHandle, time::Sleep}; use actix_web::web::Bytes; use flume::r#async::RecvStream; use futures_core::Stream; use std::{ future::Future, marker::PhantomData, pin::Pin, sync::{ atomic::{AtomicBool, Ordering}, Arc, }, task::{Context, Poll, Wake, Waker}, time::Duration, }; pub(crate) trait MakeSend: Stream> where T: 'static, { fn make_send(self) -> MakeSendStream where Self: Sized + 'static, { let (tx, rx) = crate::sync::channel(4); MakeSendStream { handle: crate::sync::spawn(async move { let this = std::pin::pin!(self); let mut stream = this.into_streamer(); while let Some(res) = stream.next().await { if tx.send_async(res).await.is_err() { return; } } }), rx: rx.into_stream(), } } } impl MakeSend for S where S: Stream>, T: 'static, { } pub(crate) struct MakeSendStream where T: 'static, { handle: actix_rt::task::JoinHandle<()>, rx: flume::r#async::RecvStream<'static, std::io::Result>, } impl Stream for MakeSendStream where T: 'static, { type Item = std::io::Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match Pin::new(&mut self.rx).poll_next(cx) { Poll::Ready(opt) => Poll::Ready(opt), Poll::Pending if std::task::ready!(Pin::new(&mut self.handle).poll(cx)).is_err() => { Poll::Ready(Some(Err(std::io::Error::new( std::io::ErrorKind::UnexpectedEof, "Stream panicked", )))) } Poll::Pending => Poll::Pending, } } } pin_project_lite::pin_project! { pub(crate) struct Map { #[pin] stream: S, func: F, } } pub(crate) trait StreamMap: Stream { fn map(self, func: F) -> Map where F: FnMut(Self::Item) -> U, Self: Sized, { Map { stream: self, func } } } impl StreamMap for T where T: Stream {} impl Stream for Map where S: Stream, F: FnMut(S::Item) -> U, { type Item = U; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); let value = std::task::ready!(this.stream.poll_next(cx)); Poll::Ready(value.map(this.func)) } } pub(crate) struct Empty(PhantomData); impl Stream for Empty { type Item = T; fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { Poll::Ready(None) } } pub(crate) fn empty() -> Empty { Empty(PhantomData) } pub(crate) struct Once(Option); impl Stream for Once where T: Unpin, { type Item = T; fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { Poll::Ready(self.0.take()) } } pub(crate) fn once(value: T) -> Once { Once(Some(value)) } pub(crate) type LocalBoxStream<'a, T> = Pin + 'a>>; pub(crate) trait StreamLimit { fn limit(self, limit: u64) -> Limit where Self: Sized, { Limit { inner: self, count: 0, limit, } } } pub(crate) trait StreamTimeout { fn timeout(self, duration: Duration) -> Timeout where Self: Sized, { Timeout { sleep: actix_rt::time::sleep(duration), inner: self, expired: false, woken: Arc::new(AtomicBool::new(true)), } } } pub(crate) trait IntoStreamer: Stream { fn into_streamer(self) -> Streamer where Self: Sized, { Streamer(Some(self)) } } impl IntoStreamer for T where T: Stream + Unpin {} pub(crate) fn from_iterator( iterator: I, buffer: usize, ) -> IterStream { IterStream { state: IterStreamState::New { iterator, buffer }, } } impl StreamLimit for S where S: Stream> {} impl StreamTimeout for S where S: Stream {} pub(crate) struct Streamer(Option); impl Streamer { pub(crate) async fn next(&mut self) -> Option where S: Stream + Unpin, { let stream = self.0.as_mut().take()?; let opt = std::future::poll_fn(|cx| Pin::new(&mut *stream).poll_next(cx)).await; if opt.is_none() { self.0.take(); } opt } } pin_project_lite::pin_project! { pub(crate) struct Limit { #[pin] inner: S, count: u64, limit: u64, } } pin_project_lite::pin_project! { pub(crate) struct Timeout { #[pin] sleep: Sleep, #[pin] inner: S, expired: bool, woken: Arc, } } enum IterStreamState where T: 'static, { New { iterator: I, buffer: usize, }, Running { handle: JoinHandle<()>, receiver: RecvStream<'static, T>, }, Pending, } pub(crate) struct IterStream where T: 'static, { state: IterStreamState, } struct TimeoutWaker { woken: Arc, inner: Waker, } #[derive(Debug, thiserror::Error)] #[error("Resonse body larger than size limit")] pub(crate) struct LimitError; #[derive(Debug, thiserror::Error)] #[error("Timeout in body")] pub(crate) struct TimeoutError; impl Stream for Limit where S: Stream>, E: From, { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.as_mut().project(); let limit = this.limit; let count = this.count; let inner = this.inner; inner.poll_next(cx).map(|opt| { opt.map(|res| match res { Ok(bytes) => { *count += bytes.len() as u64; if *count > *limit { return Err(LimitError.into()); } Ok(bytes) } Err(e) => Err(e), }) }) } } impl Wake for TimeoutWaker { fn wake(self: Arc) { self.wake_by_ref() } fn wake_by_ref(self: &Arc) { self.woken.store(true, Ordering::Release); self.inner.wake_by_ref(); } } impl Stream for Timeout where S: Stream, { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.as_mut().project(); if *this.expired { return Poll::Ready(None); } if this.woken.swap(false, Ordering::Acquire) { let timeout_waker = Arc::new(TimeoutWaker { woken: Arc::clone(this.woken), inner: cx.waker().clone(), }) .into(); let mut timeout_cx = Context::from_waker(&timeout_waker); if this.sleep.poll(&mut timeout_cx).is_ready() { *this.expired = true; return Poll::Ready(Some(Err(TimeoutError))); } } this.inner.poll_next(cx).map(|opt| opt.map(Ok)) } } impl Stream for IterStream where I: IntoIterator + Send + Unpin + 'static, T: Send + 'static, { type Item = T; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.as_mut().get_mut(); match std::mem::replace(&mut this.state, IterStreamState::Pending) { IterStreamState::New { iterator, buffer } => { let (sender, receiver) = crate::sync::channel(buffer); let mut handle = crate::sync::spawn_blocking(move || { let iterator = iterator.into_iter(); for item in iterator { if sender.send(item).is_err() { break; } } }); if Pin::new(&mut handle).poll(cx).is_ready() { return Poll::Ready(None); } this.state = IterStreamState::Running { handle, receiver: receiver.into_stream(), }; self.poll_next(cx) } IterStreamState::Running { mut handle, mut receiver, } => match Pin::new(&mut receiver).poll_next(cx) { Poll::Ready(Some(item)) => { this.state = IterStreamState::Running { handle, receiver }; Poll::Ready(Some(item)) } Poll::Ready(None) => Poll::Ready(None), Poll::Pending => { if Pin::new(&mut handle).poll(cx).is_ready() { return Poll::Ready(None); } this.state = IterStreamState::Running { handle, receiver }; Poll::Pending } }, IterStreamState::Pending => panic!("Polled after completion"), } } }