diff --git a/src/bytes_stream.rs b/src/bytes_stream.rs index b3182a2..b7c2600 100644 --- a/src/bytes_stream.rs +++ b/src/bytes_stream.rs @@ -32,14 +32,20 @@ impl BytesStream { self.total_len } - pub(crate) fn into_bytes(self) -> Bytes { - let mut buf = BytesMut::with_capacity(self.total_len); + pub(crate) fn into_bytes(mut self) -> Bytes { + match self.inner.len() { + 0 => Bytes::new(), + 1 => self.inner.pop_back().expect("one element"), + _ => { + let mut buf = BytesMut::with_capacity(self.total_len); - for bytes in self.inner { - buf.extend_from_slice(&bytes); + for bytes in self.inner { + buf.extend(bytes); + } + + buf.freeze() + } } - - buf.freeze() } } diff --git a/src/error.rs b/src/error.rs index f0e96b2..eacacb2 100644 --- a/src/error.rs +++ b/src/error.rs @@ -136,6 +136,9 @@ pub(crate) enum UploadError { #[error("Client took too long to send request")] AggregateTimeout, + #[error("Media processing took too long")] + ProcessTimeout, + #[error("Failed external validation")] ExternalValidation, } diff --git a/src/generate.rs b/src/generate.rs index b0d3373..2336512 100644 --- a/src/generate.rs +++ b/src/generate.rs @@ -8,7 +8,7 @@ use crate::{ store::Store, }; use actix_web::web::Bytes; -use std::path::PathBuf; +use std::{path::PathBuf, time::Duration}; use tokio::io::AsyncReadExt; use tracing::Instrument; @@ -39,8 +39,12 @@ pub(crate) async fn generate( hash.clone(), ); - let (details, bytes) = - CancelSafeProcessor::new(hash.as_ref(), thumbnail_path, process_fut).await?; + let (details, bytes) = actix_rt::time::timeout( + Duration::from_secs(timeout * 4), + CancelSafeProcessor::new(hash.as_ref(), thumbnail_path, process_fut), + ) + .await + .map_err(|_| UploadError::ProcessTimeout)??; Ok((details, bytes)) } diff --git a/src/ingest.rs b/src/ingest.rs index 21a6c3c..097e052 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -63,7 +63,41 @@ where R: FullRepo + 'static, S: Store, { - let bytes = tokio::time::timeout(Duration::from_secs(60), aggregate(stream)) + actix_rt::time::timeout( + Duration::from_secs(timeout * 4 + 60), + perform_ingest( + repo, + store, + client, + stream, + declared_alias, + external_validation, + external_validation_timeout, + should_validate, + timeout, + ), + ) + .await + .map_err(|_| UploadError::ProcessTimeout)? +} + +#[allow(clippy::too_many_arguments)] +async fn perform_ingest( + repo: &R, + store: &S, + client: &ClientWithMiddleware, + stream: impl Stream> + Unpin + 'static, + declared_alias: Option, + external_validation: Option<&Url>, + external_validation_timeout: u64, + should_validate: bool, + timeout: u64, +) -> Result, Error> +where + R: FullRepo + 'static, + S: Store, +{ + let bytes = actix_rt::time::timeout(Duration::from_secs(60), aggregate(stream)) .await .map_err(|_| UploadError::AggregateTimeout)??;