From 3a7d5b7bfb6d5249a16a1e597b846279e103fb3c Mon Sep 17 00:00:00 2001 From: asonix Date: Thu, 22 Feb 2024 16:10:34 -0600 Subject: [PATCH] Re-use try_from_stream where possible --- src/bytes_stream.rs | 2 ++ src/ingest.rs | 29 ++++++----------------------- src/store.rs | 18 ++++-------------- 3 files changed, 12 insertions(+), 37 deletions(-) diff --git a/src/bytes_stream.rs b/src/bytes_stream.rs index 3594be5..cc5e1e1 100644 --- a/src/bytes_stream.rs +++ b/src/bytes_stream.rs @@ -26,6 +26,7 @@ impl BytesStream { } } + #[tracing::instrument(skip(stream))] pub(crate) async fn try_from_stream(stream: S) -> Result where S: Stream>, @@ -35,6 +36,7 @@ impl BytesStream { let mut bs = Self::new(); while let Some(bytes) = stream.try_next().await? { + tracing::trace!("try_from_stream: looping"); bs.add_bytes(bytes); } diff --git a/src/ingest.rs b/src/ingest.rs index b3ddd16..0698ea8 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -14,7 +14,6 @@ use actix_web::web::Bytes; use futures_core::Stream; use reqwest::Body; -use streem::IntoStreamer; use tracing::{Instrument, Span}; mod hasher; @@ -29,25 +28,6 @@ pub(crate) struct Session { identifier: Option>, } -#[tracing::instrument(skip(stream))] -async fn aggregate(stream: S) -> Result -where - S: Stream>, -{ - let mut buf = BytesStream::new(); - - let stream = std::pin::pin!(stream); - let mut stream = stream.into_streamer(); - - while let Some(res) = stream.next().await { - tracing::trace!("aggregate: looping"); - - buf.add_bytes(res?); - } - - Ok(buf) -} - async fn process_ingest( state: &State, stream: impl Stream> + 'static, @@ -63,9 +43,12 @@ async fn process_ingest( where S: Store, { - let bytes = tokio::time::timeout(Duration::from_secs(60), aggregate(stream)) - .await - .map_err(|_| UploadError::AggregateTimeout)??; + let bytes = tokio::time::timeout( + Duration::from_secs(60), + BytesStream::try_from_stream(stream), + ) + .await + .map_err(|_| UploadError::AggregateTimeout)??; let permit = crate::process_semaphore().acquire().await?; diff --git a/src/store.rs b/src/store.rs index 2006713..9d7fcaa 100644 --- a/src/store.rs +++ b/src/store.rs @@ -1,7 +1,6 @@ use actix_web::web::Bytes; use futures_core::Stream; use std::{fmt::Debug, sync::Arc}; -use streem::IntoStreamer; use tokio::io::{AsyncRead, AsyncWrite}; use crate::{bytes_stream::BytesStream, error_code::ErrorCode, stream::LocalBoxStream}; @@ -123,20 +122,11 @@ pub(crate) trait Store: Clone + Debug { from_start: Option, len: Option, ) -> Result { - let mut buf = BytesStream::new(); + let stream = self.to_stream(identifier, from_start, len).await?; - let mut streamer = self - .to_stream(identifier, from_start, len) - .await? - .into_streamer(); - - while let Some(bytes) = streamer.try_next().await.map_err(StoreError::ReadStream)? { - tracing::trace!("to_bytes: looping"); - - buf.add_bytes(bytes); - } - - Ok(buf) + BytesStream::try_from_stream(stream) + .await + .map_err(StoreError::ReadStream) } async fn read_into(