diff --git a/src/error.rs b/src/error.rs index 940f221..52a1c70 100644 --- a/src/error.rs +++ b/src/error.rs @@ -119,6 +119,9 @@ pub(crate) enum UploadError { #[error("Hit limit")] Limit(#[from] crate::stream::LimitError), + + #[error("Response timeout")] + Timeout(#[from] crate::stream::TimeoutError), } impl From for UploadError { diff --git a/src/main.rs b/src/main.rs index 8286c17..3a3a868 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,7 +7,7 @@ use actix_web::{ use awc::Client; use futures_util::{ stream::{empty, once}, - Stream, TryStreamExt, + Stream, StreamExt, TryStreamExt, }; use once_cell::sync::Lazy; use std::{ @@ -15,7 +15,7 @@ use std::{ future::ready, path::PathBuf, sync::atomic::{AtomicU64, Ordering}, - time::SystemTime, + time::{Duration, SystemTime}, }; use tokio::{io::AsyncReadExt, sync::Semaphore}; use tracing::{debug, info, instrument}; @@ -47,6 +47,8 @@ mod tmp_file; mod upload_manager; mod validate; +use crate::stream::StreamTimeout; + use self::{ concurrent_processor::CancelSafeProcessor, config::{Configuration, ImageFormat, Operation}, @@ -486,6 +488,12 @@ where E: std::error::Error + 'static, actix_web::Error: From, { + let stream = stream.timeout(Duration::from_secs(5)).map(|res| match res { + Ok(Ok(item)) => Ok(item), + Ok(Err(e)) => Err(actix_web::Error::from(e)), + Err(e) => Err(Error::from(e).into()), + }); + builder .insert_header(LastModified(modified.into())) .insert_header(CacheControl(vec![ diff --git a/src/store/object_store.rs b/src/store/object_store.rs index f2bf76b..78f490e 100644 --- a/src/store/object_store.rs +++ b/src/store/object_store.rs @@ -2,18 +2,13 @@ use crate::{ error::Error, repo::{Repo, SettingsRepo}, store::Store, - stream::StreamTimeout, }; use actix_web::web::Bytes; -use futures_util::{Stream, StreamExt}; +use futures_util::{Stream, TryStreamExt}; use s3::{ client::Client, command::Command, creds::Credentials, request_trait::Request, Bucket, Region, }; -use std::{ - pin::Pin, - string::FromUtf8Error, - time::{Duration, Instant}, -}; +use std::{pin::Pin, string::FromUtf8Error}; use storage_path_generator::{Generator, Path}; use tokio::io::{AsyncRead, AsyncWrite}; use tracing::Instrument; @@ -31,9 +26,6 @@ pub(crate) enum ObjectError { #[error("Failed to generate path")] PathGenerator(#[from] storage_path_generator::PathError), - #[error("Timeout")] - Elapsed, - #[error("Failed to parse string")] Utf8(#[from] FromUtf8Error), @@ -110,24 +102,19 @@ impl Store for ObjectStore { ) }); - let now = Instant::now(); - let allotted = Duration::from_secs(5); - let response = request_span - .in_scope(|| tokio::time::timeout(allotted, request.response())) + .in_scope(|| request.response()) .instrument(request_span.clone()) .await - .map_err(|_| ObjectError::Elapsed)? .map_err(ObjectError::from)?; - let allotted = allotted.saturating_sub(now.elapsed()); - - let stream = response.bytes_stream().timeout(allotted).map(|res| { - res.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) - .and_then(|res| res.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))) + let stream = request_span.in_scope(|| { + response + .bytes_stream() + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) }); - Ok(request_span.in_scope(|| Box::pin(stream))) + Ok(Box::pin(stream)) } #[tracing::instrument(skip(writer))]