diff --git a/src/main.rs b/src/main.rs index f963e38..8d5549d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,7 +7,7 @@ use actix_web::{ use awc::Client; use futures_util::{ stream::{empty, once}, - Stream, + Stream, TryStreamExt, }; use once_cell::sync::Lazy; use std::{ @@ -35,7 +35,6 @@ mod ffmpeg; mod file; mod init_tracing; mod magick; -mod map_error; mod middleware; mod migrate; mod process; @@ -207,7 +206,7 @@ async fn download( } let stream = Limit::new( - map_error::map_crate_error(res), + res.map_err(Error::from), (CONFIG.media.max_file_size * MEGABYTES) as u64, ); @@ -495,9 +494,11 @@ async fn ranged_file_resp( builder.insert_header(content_range); ( builder, - Either::left(Either::left(map_error::map_crate_error( - range::chop_store(range, store, &identifier, len).await?, - ))), + Either::left(Either::left( + range::chop_store(range, store, &identifier, len) + .await? + .map_err(Error::from), + )), ) } else { ( @@ -510,7 +511,10 @@ async fn ranged_file_resp( } } else { //No Range header in the request - return the entire document - let stream = map_error::map_crate_error(store.to_stream(&identifier, None, None).await?); + let stream = store + .to_stream(&identifier, None, None) + .await? + .map_err(Error::from); (HttpResponse::Ok(), Either::right(stream)) }; @@ -642,7 +646,7 @@ async fn launch( .session(store) .upload( CONFIG.media.enable_silent_video, - map_error::map_crate_error(stream), + stream.map_err(Error::from), ) .await; @@ -679,7 +683,7 @@ async fn launch( filename, !CONFIG.media.skip_validate_imports, CONFIG.media.enable_silent_video, - map_error::map_crate_error(stream), + stream.map_err(Error::from), ) .await; diff --git a/src/map_error.rs b/src/map_error.rs deleted file mode 100644 index b63b9a8..0000000 --- a/src/map_error.rs +++ /dev/null @@ -1,43 +0,0 @@ -use crate::error::Error; -use futures_util::stream::Stream; -use std::{ - marker::PhantomData, - pin::Pin, - task::{Context, Poll}, -}; - -pin_project_lite::pin_project! { - pub(super) struct MapError { - #[pin] - inner: S, - - _error: PhantomData, - } -} - -pub(super) fn map_crate_error(inner: S) -> MapError { - map_error(inner) -} - -pub(super) fn map_error(inner: S) -> MapError { - MapError { - inner, - _error: PhantomData, - } -} - -impl Stream for MapError -where - S: Stream>, - E: From, -{ - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.as_mut().project(); - - this.inner - .poll_next(cx) - .map(|opt| opt.map(|res| res.map_err(Into::into))) - } -} diff --git a/src/store/object_store.rs b/src/store/object_store.rs index dcd09f1..edddd71 100644 --- a/src/store/object_store.rs +++ b/src/store/object_store.rs @@ -5,7 +5,7 @@ use crate::{ }; use actix_rt::time::Sleep; use actix_web::web::Bytes; -use futures_util::stream::Stream; +use futures_util::{stream::Stream, TryStreamExt}; use s3::{ client::Client, command::Command, creds::Credentials, request_trait::Request, Bucket, Region, }; @@ -58,13 +58,6 @@ pub(crate) struct ObjectStore { client: reqwest::Client, } -pin_project_lite::pin_project! { - struct IoError { - #[pin] - inner: S, - } -} - pin_project_lite::pin_project! { struct Timeout { sleep: Option>>, @@ -146,10 +139,11 @@ impl Store for ObjectStore { let allotted = allotted.saturating_sub(now.elapsed()); - Ok( - request_span - .in_scope(|| Box::pin(timeout(allotted, io_error(response.bytes_stream())))), - ) + let stream = response + .bytes_stream() + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)); + + Ok(request_span.in_scope(|| Box::pin(timeout(allotted, stream)))) } #[tracing::instrument(skip(writer))] @@ -272,14 +266,6 @@ async fn init_generator(repo: &Repo) -> Result { } } -fn io_error(stream: S) -> impl Stream> -where - S: Stream>, - E: Into>, -{ - IoError { inner: stream } -} - fn timeout(duration: Duration, stream: S) -> impl Stream> where S: Stream>, @@ -291,22 +277,6 @@ where } } -impl Stream for IoError -where - S: Stream>, - E: Into>, -{ - type Item = std::io::Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.as_mut().project(); - - this.inner.poll_next(cx).map(|opt| { - opt.map(|res| res.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))) - }) - } -} - struct TimeoutWaker { woken: Arc, inner: Waker,