diff --git a/src/bytes_stream.rs b/src/bytes_stream.rs index d33a0e7..1e88704 100644 --- a/src/bytes_stream.rs +++ b/src/bytes_stream.rs @@ -8,6 +8,7 @@ use std::{ }; use streem::IntoStreamer; use tokio::io::AsyncRead; +use tokio_util::bytes::Buf; #[derive(Clone, Debug)] pub(crate) struct BytesStream { @@ -37,9 +38,19 @@ impl BytesStream { bs.add_bytes(bytes); } + tracing::debug!( + "BytesStream with {} chunks, avg length {}", + bs.chunks_len(), + bs.len() / bs.chunks_len() + ); + Ok(bs) } + pub(crate) fn chunks_len(&self) -> usize { + self.inner.len() + } + pub(crate) fn add_bytes(&mut self, bytes: Bytes) { self.total_len += bytes.len(); self.inner.push_back(bytes); @@ -54,10 +65,7 @@ impl BytesStream { } pub(crate) fn into_reader(self) -> BytesReader { - BytesReader { - index: 0, - inner: self.inner, - } + BytesReader { inner: self.inner } } pub(crate) fn into_io_stream(self) -> IoStream { @@ -70,7 +78,6 @@ pub(crate) struct IoStream { } pub(crate) struct BytesReader { - index: usize, inner: VecDeque, } @@ -114,19 +121,20 @@ impl AsyncRead for BytesReader { buf: &mut tokio::io::ReadBuf<'_>, ) -> Poll> { while buf.remaining() > 0 { - if let Some(bytes) = self.inner.front() { - if self.index == bytes.len() { + tracing::trace!("bytes reader: looping"); + + if let Some(bytes) = self.inner.front_mut() { + if bytes.is_empty() { self.inner.pop_front(); - self.index = 0; continue; } - let upper_bound = (self.index + buf.remaining()).min(bytes.len()); + let upper_bound = buf.remaining().min(bytes.len()); - let slice = &bytes[self.index..upper_bound]; + let slice = &bytes[..upper_bound]; buf.put_slice(slice); - self.index += slice.len(); + bytes.advance(upper_bound); } else { break; } diff --git a/src/generate.rs b/src/generate.rs index 43f7360..fdb209d 100644 --- a/src/generate.rs +++ b/src/generate.rs @@ -127,7 +127,9 @@ async fn process( ) .await? .into_bytes_stream() - .instrument(tracing::info_span!("Reading processed image to vec")) + .instrument(tracing::info_span!( + "Reading processed image to BytesStream" + )) .await?; drop(permit); diff --git a/src/process.rs b/src/process.rs index 1189de2..c512b4f 100644 --- a/src/process.rs +++ b/src/process.rs @@ -1,4 +1,3 @@ - use std::{ ffi::OsStr, future::Future, @@ -8,7 +7,7 @@ use std::{ }; use tokio::{ - io::{AsyncReadExt}, + io::AsyncReadExt, process::{Child, ChildStdin, Command}, }; use tokio_util::io::ReaderStream; @@ -323,7 +322,7 @@ impl ProcessRead { let cmd = self.command.clone(); self.with_stdout(move |stdout| { - BytesStream::try_from_stream(ReaderStream::with_capacity(stdout, 1024 * 16)) + BytesStream::try_from_stream(ReaderStream::with_capacity(stdout, 1024 * 64)) }) .await? .map_err(move |e| ProcessError::Read(cmd, e)) diff --git a/src/store/object_store.rs b/src/store/object_store.rs index b6a82a1..354c1b5 100644 --- a/src/store/object_store.rs +++ b/src/store/object_store.rs @@ -186,6 +186,12 @@ where } } + tracing::debug!( + "BytesStream with {} chunks, avg length {}", + buf.chunks_len(), + buf.len() / buf.chunks_len() + ); + Ok(buf) } @@ -225,7 +231,7 @@ impl Store for ObjectStore { where Reader: AsyncRead + Unpin + 'static, { - self.save_stream(ReaderStream::with_capacity(reader, 1024 * 16), content_type) + self.save_stream(ReaderStream::with_capacity(reader, 1024 * 64), content_type) .await }