Remove custom stream error mapping types

This commit is contained in:
Aode (lion) 2022-03-29 14:17:20 -05:00
parent 8226a3571d
commit 25f4480809
3 changed files with 19 additions and 88 deletions

View File

@ -7,7 +7,7 @@ use actix_web::{
use awc::Client; use awc::Client;
use futures_util::{ use futures_util::{
stream::{empty, once}, stream::{empty, once},
Stream, Stream, TryStreamExt,
}; };
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use std::{ use std::{
@ -35,7 +35,6 @@ mod ffmpeg;
mod file; mod file;
mod init_tracing; mod init_tracing;
mod magick; mod magick;
mod map_error;
mod middleware; mod middleware;
mod migrate; mod migrate;
mod process; mod process;
@ -207,7 +206,7 @@ async fn download<S: Store>(
} }
let stream = Limit::new( let stream = Limit::new(
map_error::map_crate_error(res), res.map_err(Error::from),
(CONFIG.media.max_file_size * MEGABYTES) as u64, (CONFIG.media.max_file_size * MEGABYTES) as u64,
); );
@ -495,9 +494,11 @@ async fn ranged_file_resp<S: Store>(
builder.insert_header(content_range); builder.insert_header(content_range);
( (
builder, builder,
Either::left(Either::left(map_error::map_crate_error( Either::left(Either::left(
range::chop_store(range, store, &identifier, len).await?, range::chop_store(range, store, &identifier, len)
))), .await?
.map_err(Error::from),
)),
) )
} else { } else {
( (
@ -510,7 +511,10 @@ async fn ranged_file_resp<S: Store>(
} }
} else { } else {
//No Range header in the request - return the entire document //No Range header in the request - return the entire document
let stream = map_error::map_crate_error(store.to_stream(&identifier, None, None).await?); let stream = store
.to_stream(&identifier, None, None)
.await?
.map_err(Error::from);
(HttpResponse::Ok(), Either::right(stream)) (HttpResponse::Ok(), Either::right(stream))
}; };
@ -642,7 +646,7 @@ async fn launch<S: Store + Clone + 'static>(
.session(store) .session(store)
.upload( .upload(
CONFIG.media.enable_silent_video, CONFIG.media.enable_silent_video,
map_error::map_crate_error(stream), stream.map_err(Error::from),
) )
.await; .await;
@ -679,7 +683,7 @@ async fn launch<S: Store + Clone + 'static>(
filename, filename,
!CONFIG.media.skip_validate_imports, !CONFIG.media.skip_validate_imports,
CONFIG.media.enable_silent_video, CONFIG.media.enable_silent_video,
map_error::map_crate_error(stream), stream.map_err(Error::from),
) )
.await; .await;

View File

@ -1,43 +0,0 @@
use crate::error::Error;
use futures_util::stream::Stream;
use std::{
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};
pin_project_lite::pin_project! {
pub(super) struct MapError<E, S> {
#[pin]
inner: S,
_error: PhantomData<E>,
}
}
pub(super) fn map_crate_error<S>(inner: S) -> MapError<Error, S> {
map_error(inner)
}
pub(super) fn map_error<S, E>(inner: S) -> MapError<E, S> {
MapError {
inner,
_error: PhantomData,
}
}
impl<T, StreamErr, E, S> Stream for MapError<E, S>
where
S: Stream<Item = Result<T, StreamErr>>,
E: From<StreamErr>,
{
type Item = Result<T, E>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.as_mut().project();
this.inner
.poll_next(cx)
.map(|opt| opt.map(|res| res.map_err(Into::into)))
}
}

View File

@ -5,7 +5,7 @@ use crate::{
}; };
use actix_rt::time::Sleep; use actix_rt::time::Sleep;
use actix_web::web::Bytes; use actix_web::web::Bytes;
use futures_util::stream::Stream; use futures_util::{stream::Stream, TryStreamExt};
use s3::{ use s3::{
client::Client, command::Command, creds::Credentials, request_trait::Request, Bucket, Region, client::Client, command::Command, creds::Credentials, request_trait::Request, Bucket, Region,
}; };
@ -58,13 +58,6 @@ pub(crate) struct ObjectStore {
client: reqwest::Client, client: reqwest::Client,
} }
pin_project_lite::pin_project! {
struct IoError<S> {
#[pin]
inner: S,
}
}
pin_project_lite::pin_project! { pin_project_lite::pin_project! {
struct Timeout<S> { struct Timeout<S> {
sleep: Option<Pin<Box<Sleep>>>, sleep: Option<Pin<Box<Sleep>>>,
@ -146,10 +139,11 @@ impl Store for ObjectStore {
let allotted = allotted.saturating_sub(now.elapsed()); let allotted = allotted.saturating_sub(now.elapsed());
Ok( let stream = response
request_span .bytes_stream()
.in_scope(|| Box::pin(timeout(allotted, io_error(response.bytes_stream())))), .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));
)
Ok(request_span.in_scope(|| Box::pin(timeout(allotted, stream))))
} }
#[tracing::instrument(skip(writer))] #[tracing::instrument(skip(writer))]
@ -272,14 +266,6 @@ async fn init_generator(repo: &Repo) -> Result<Generator, Error> {
} }
} }
fn io_error<S, T, E>(stream: S) -> impl Stream<Item = std::io::Result<T>>
where
S: Stream<Item = Result<T, E>>,
E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
IoError { inner: stream }
}
fn timeout<S, T>(duration: Duration, stream: S) -> impl Stream<Item = std::io::Result<T>> fn timeout<S, T>(duration: Duration, stream: S) -> impl Stream<Item = std::io::Result<T>>
where where
S: Stream<Item = std::io::Result<T>>, S: Stream<Item = std::io::Result<T>>,
@ -291,22 +277,6 @@ where
} }
} }
impl<S, T, E> Stream for IoError<S>
where
S: Stream<Item = Result<T, E>>,
E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
type Item = std::io::Result<T>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.as_mut().project();
this.inner.poll_next(cx).map(|opt| {
opt.map(|res| res.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)))
})
}
}
struct TimeoutWaker { struct TimeoutWaker {
woken: Arc<AtomicBool>, woken: Arc<AtomicBool>,
inner: Waker, inner: Waker,