2024-02-22 22:15:32 +00:00
|
|
|
use actix_web::web::Bytes;
|
2023-08-23 16:59:42 +00:00
|
|
|
use futures_core::Stream;
|
2022-10-01 17:06:33 +00:00
|
|
|
use std::{
|
|
|
|
collections::{vec_deque::IntoIter, VecDeque},
|
2023-07-21 21:58:31 +00:00
|
|
|
convert::Infallible,
|
2022-10-01 17:06:33 +00:00
|
|
|
pin::Pin,
|
|
|
|
task::{Context, Poll},
|
|
|
|
};
|
2024-02-22 22:02:33 +00:00
|
|
|
use streem::IntoStreamer;
|
|
|
|
use tokio::io::AsyncRead;
|
2024-02-22 23:49:16 +00:00
|
|
|
use tokio_util::bytes::Buf;
|
2022-10-01 17:06:33 +00:00
|
|
|
|
|
|
|
#[derive(Clone, Debug)]
|
|
|
|
pub(crate) struct BytesStream {
|
|
|
|
inner: VecDeque<Bytes>,
|
|
|
|
total_len: usize,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl BytesStream {
|
|
|
|
pub(crate) fn new() -> Self {
|
|
|
|
Self {
|
|
|
|
inner: VecDeque::new(),
|
|
|
|
total_len: 0,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-02-22 22:10:34 +00:00
|
|
|
#[tracing::instrument(skip(stream))]
|
2024-02-22 22:02:33 +00:00
|
|
|
pub(crate) async fn try_from_stream<S, E>(stream: S) -> Result<Self, E>
|
|
|
|
where
|
|
|
|
S: Stream<Item = Result<Bytes, E>>,
|
|
|
|
{
|
|
|
|
let stream = std::pin::pin!(stream);
|
|
|
|
let mut stream = stream.into_streamer();
|
|
|
|
let mut bs = Self::new();
|
|
|
|
|
|
|
|
while let Some(bytes) = stream.try_next().await? {
|
2024-02-22 22:10:34 +00:00
|
|
|
tracing::trace!("try_from_stream: looping");
|
2024-02-22 22:02:33 +00:00
|
|
|
bs.add_bytes(bytes);
|
|
|
|
}
|
|
|
|
|
2024-02-22 23:49:16 +00:00
|
|
|
tracing::debug!(
|
|
|
|
"BytesStream with {} chunks, avg length {}",
|
|
|
|
bs.chunks_len(),
|
|
|
|
bs.len() / bs.chunks_len()
|
|
|
|
);
|
|
|
|
|
2024-02-22 22:02:33 +00:00
|
|
|
Ok(bs)
|
|
|
|
}
|
|
|
|
|
2024-02-22 23:49:16 +00:00
|
|
|
pub(crate) fn chunks_len(&self) -> usize {
|
|
|
|
self.inner.len()
|
|
|
|
}
|
|
|
|
|
2022-10-01 17:06:33 +00:00
|
|
|
pub(crate) fn add_bytes(&mut self, bytes: Bytes) {
|
|
|
|
self.total_len += bytes.len();
|
|
|
|
self.inner.push_back(bytes);
|
|
|
|
}
|
|
|
|
|
|
|
|
pub(crate) fn len(&self) -> usize {
|
|
|
|
self.total_len
|
|
|
|
}
|
|
|
|
|
2024-02-22 22:02:33 +00:00
|
|
|
pub(crate) fn is_empty(&self) -> bool {
|
2024-02-22 23:09:03 +00:00
|
|
|
self.total_len == 0
|
2024-02-22 22:02:33 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
pub(crate) fn into_reader(self) -> BytesReader {
|
2024-02-22 23:49:16 +00:00
|
|
|
BytesReader { inner: self.inner }
|
2024-02-22 22:02:33 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
pub(crate) fn into_io_stream(self) -> IoStream {
|
2024-02-22 22:15:32 +00:00
|
|
|
IoStream { inner: self.inner }
|
2024-02-22 22:02:33 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub(crate) struct IoStream {
|
2024-02-22 22:15:32 +00:00
|
|
|
inner: VecDeque<Bytes>,
|
2024-02-22 22:02:33 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
pub(crate) struct BytesReader {
|
|
|
|
inner: VecDeque<Bytes>,
|
2022-10-01 17:06:33 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
impl IntoIterator for BytesStream {
|
|
|
|
type Item = Bytes;
|
|
|
|
type IntoIter = IntoIter<Bytes>;
|
|
|
|
|
|
|
|
fn into_iter(self) -> Self::IntoIter {
|
|
|
|
self.inner.into_iter()
|
|
|
|
}
|
|
|
|
}
|
2022-10-01 18:00:07 +00:00
|
|
|
|
2024-02-22 22:15:32 +00:00
|
|
|
impl Stream for BytesStream {
|
|
|
|
type Item = Result<Bytes, Infallible>;
|
2022-10-01 18:00:07 +00:00
|
|
|
|
2024-02-22 22:15:32 +00:00
|
|
|
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
2022-10-01 18:00:07 +00:00
|
|
|
Poll::Ready(self.get_mut().inner.pop_front().map(Ok))
|
|
|
|
}
|
|
|
|
|
2024-02-22 22:15:32 +00:00
|
|
|
fn size_hint(&self) -> (usize, Option<usize>) {
|
|
|
|
(self.inner.len(), Some(self.inner.len()))
|
2022-10-01 18:00:07 +00:00
|
|
|
}
|
|
|
|
}
|
2023-07-21 21:58:31 +00:00
|
|
|
|
2024-02-22 22:15:32 +00:00
|
|
|
impl Stream for IoStream {
|
|
|
|
type Item = std::io::Result<Bytes>;
|
2023-07-21 21:58:31 +00:00
|
|
|
|
|
|
|
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
|
|
|
Poll::Ready(self.get_mut().inner.pop_front().map(Ok))
|
|
|
|
}
|
2024-02-22 22:02:33 +00:00
|
|
|
|
2024-02-22 22:15:32 +00:00
|
|
|
fn size_hint(&self) -> (usize, Option<usize>) {
|
|
|
|
(self.inner.len(), Some(self.inner.len()))
|
2024-02-22 22:02:33 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl AsyncRead for BytesReader {
|
|
|
|
fn poll_read(
|
|
|
|
mut self: Pin<&mut Self>,
|
2024-02-22 22:21:31 +00:00
|
|
|
_: &mut Context<'_>,
|
2024-02-22 22:02:33 +00:00
|
|
|
buf: &mut tokio::io::ReadBuf<'_>,
|
|
|
|
) -> Poll<std::io::Result<()>> {
|
|
|
|
while buf.remaining() > 0 {
|
2024-02-22 23:49:16 +00:00
|
|
|
tracing::trace!("bytes reader: looping");
|
|
|
|
|
|
|
|
if let Some(bytes) = self.inner.front_mut() {
|
|
|
|
if bytes.is_empty() {
|
2024-02-22 22:21:31 +00:00
|
|
|
self.inner.pop_front();
|
|
|
|
continue;
|
|
|
|
}
|
2024-02-22 22:02:33 +00:00
|
|
|
|
2024-02-22 23:49:16 +00:00
|
|
|
let upper_bound = buf.remaining().min(bytes.len());
|
2024-02-22 22:02:33 +00:00
|
|
|
|
2024-02-22 23:49:16 +00:00
|
|
|
let slice = &bytes[..upper_bound];
|
2024-02-22 22:02:33 +00:00
|
|
|
|
2024-02-22 22:21:31 +00:00
|
|
|
buf.put_slice(slice);
|
2024-02-22 23:49:16 +00:00
|
|
|
bytes.advance(upper_bound);
|
2024-02-22 22:21:31 +00:00
|
|
|
} else {
|
|
|
|
break;
|
|
|
|
}
|
2024-02-22 22:02:33 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
Poll::Ready(Ok(()))
|
|
|
|
}
|
|
|
|
}
|