2
0
Fork 0
mirror of https://git.asonix.dog/asonix/pict-rs synced 2024-12-22 19:31:35 +00:00

Re-use try_from_stream where possible

This commit is contained in:
asonix 2024-02-22 16:10:34 -06:00
parent 0ebee2a07c
commit 3a7d5b7bfb
3 changed files with 12 additions and 37 deletions

View file

@ -26,6 +26,7 @@ impl BytesStream {
} }
} }
#[tracing::instrument(skip(stream))]
pub(crate) async fn try_from_stream<S, E>(stream: S) -> Result<Self, E> pub(crate) async fn try_from_stream<S, E>(stream: S) -> Result<Self, E>
where where
S: Stream<Item = Result<Bytes, E>>, S: Stream<Item = Result<Bytes, E>>,
@ -35,6 +36,7 @@ impl BytesStream {
let mut bs = Self::new(); let mut bs = Self::new();
while let Some(bytes) = stream.try_next().await? { while let Some(bytes) = stream.try_next().await? {
tracing::trace!("try_from_stream: looping");
bs.add_bytes(bytes); bs.add_bytes(bytes);
} }

View file

@ -14,7 +14,6 @@ use actix_web::web::Bytes;
use futures_core::Stream; use futures_core::Stream;
use reqwest::Body; use reqwest::Body;
use streem::IntoStreamer;
use tracing::{Instrument, Span}; use tracing::{Instrument, Span};
mod hasher; mod hasher;
@ -29,25 +28,6 @@ pub(crate) struct Session {
identifier: Option<Arc<str>>, identifier: Option<Arc<str>>,
} }
#[tracing::instrument(skip(stream))]
async fn aggregate<S>(stream: S) -> Result<BytesStream, Error>
where
S: Stream<Item = Result<Bytes, Error>>,
{
let mut buf = BytesStream::new();
let stream = std::pin::pin!(stream);
let mut stream = stream.into_streamer();
while let Some(res) = stream.next().await {
tracing::trace!("aggregate: looping");
buf.add_bytes(res?);
}
Ok(buf)
}
async fn process_ingest<S>( async fn process_ingest<S>(
state: &State<S>, state: &State<S>,
stream: impl Stream<Item = Result<Bytes, Error>> + 'static, stream: impl Stream<Item = Result<Bytes, Error>> + 'static,
@ -63,7 +43,10 @@ async fn process_ingest<S>(
where where
S: Store, S: Store,
{ {
let bytes = tokio::time::timeout(Duration::from_secs(60), aggregate(stream)) let bytes = tokio::time::timeout(
Duration::from_secs(60),
BytesStream::try_from_stream(stream),
)
.await .await
.map_err(|_| UploadError::AggregateTimeout)??; .map_err(|_| UploadError::AggregateTimeout)??;

View file

@ -1,7 +1,6 @@
use actix_web::web::Bytes; use actix_web::web::Bytes;
use futures_core::Stream; use futures_core::Stream;
use std::{fmt::Debug, sync::Arc}; use std::{fmt::Debug, sync::Arc};
use streem::IntoStreamer;
use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncRead, AsyncWrite};
use crate::{bytes_stream::BytesStream, error_code::ErrorCode, stream::LocalBoxStream}; use crate::{bytes_stream::BytesStream, error_code::ErrorCode, stream::LocalBoxStream};
@ -123,20 +122,11 @@ pub(crate) trait Store: Clone + Debug {
from_start: Option<u64>, from_start: Option<u64>,
len: Option<u64>, len: Option<u64>,
) -> Result<BytesStream, StoreError> { ) -> Result<BytesStream, StoreError> {
let mut buf = BytesStream::new(); let stream = self.to_stream(identifier, from_start, len).await?;
let mut streamer = self BytesStream::try_from_stream(stream)
.to_stream(identifier, from_start, len) .await
.await? .map_err(StoreError::ReadStream)
.into_streamer();
while let Some(bytes) = streamer.try_next().await.map_err(StoreError::ReadStream)? {
tracing::trace!("to_bytes: looping");
buf.add_bytes(bytes);
}
Ok(buf)
} }
async fn read_into<Writer>( async fn read_into<Writer>(