From ac9a3773d170d7b7c7b36b395c12c0fe98c3ea39 Mon Sep 17 00:00:00 2001 From: asonix Date: Mon, 9 Dec 2024 19:16:29 -0600 Subject: [PATCH] Timeout all operations while holding semaphore This ensures that the semaphore is always guaranteed to be released, even if a processing operation stalls --- src/generate.rs | 79 ++++++++++++++++++++++++++++++++----------------- src/ingest.rs | 30 +++++++++++++++++-- 2 files changed, 79 insertions(+), 30 deletions(-) diff --git a/src/generate.rs b/src/generate.rs index c270186..5b2f46a 100644 --- a/src/generate.rs +++ b/src/generate.rs @@ -2,6 +2,7 @@ mod ffmpeg; mod magick; use crate::{ + bytes_stream::BytesStream, details::Details, error::{Error, UploadError}, formats::{ImageFormat, InputProcessableFormat, InternalVideoFormat, ProcessableFormat}, @@ -196,33 +197,16 @@ async fn process( let guard = MetricsGuard::guard(); let permit = crate::process_semaphore().acquire().await?; - let identifier = input_identifier(state, output_format, hash.clone(), original_details).await?; - - let input_details = crate::ensure_details_identifier(state, &identifier).await?; - - let input_format = input_details - .internal_format() - .processable_format() - .expect("Already verified format is processable"); - - let format = input_format.process_to(output_format); - - let quality = match format { - ProcessableFormat::Image(format) => state.config.media.image.quality_for(format), - ProcessableFormat::Animation(format) => state.config.media.animation.quality_for(format), - }; - - let stream = state.store.to_stream(&identifier, None, None).await?; - - let bytes = - crate::magick::process_image_command(state, variant_args, input_format, format, quality) - .await? - .drive_with_stream(stream) - .into_bytes_stream() - .instrument(tracing::info_span!( - "Reading processed image to BytesStream" - )) - .await?; + let bytes = do_process( + state, + output_format, + variant_args, + hash.clone(), + original_details, + ) + .with_timeout(Duration::from_secs(state.config.media.process_timeout * 4)) + .await + .map_err(|_| UploadError::ProcessTimeout)??; drop(permit); @@ -258,6 +242,47 @@ async fn process( Ok((details, identifier)) as Result<(Details, Arc), Error> } +async fn do_process( + state: &State, + output_format: InputProcessableFormat, + variant_args: Vec, + hash: Hash, + original_details: &Details, +) -> Result +where + S: Store + 'static, +{ + let identifier = input_identifier(state, output_format, hash.clone(), original_details).await?; + + let input_details = crate::ensure_details_identifier(state, &identifier).await?; + + let input_format = input_details + .internal_format() + .processable_format() + .expect("Already verified format is processable"); + + let format = input_format.process_to(output_format); + + let quality = match format { + ProcessableFormat::Image(format) => state.config.media.image.quality_for(format), + ProcessableFormat::Animation(format) => state.config.media.animation.quality_for(format), + }; + + let stream = state.store.to_stream(&identifier, None, None).await?; + + let bytes = + crate::magick::process_image_command(state, variant_args, input_format, format, quality) + .await? + .drive_with_stream(stream) + .into_bytes_stream() + .instrument(tracing::info_span!( + "Reading processed image to BytesStream" + )) + .await?; + + Ok(bytes) +} + pub(crate) async fn ensure_motion_identifier( state: &State, hash: Hash, diff --git a/src/ingest.rs b/src/ingest.rs index d8fb592..dcf79f5 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -8,7 +8,7 @@ use crate::{ details::Details, error::{Error, UploadError}, formats::InternalFormat, - future::{WithMetrics, WithPollTimer}, + future::{WithMetrics, WithPollTimer, WithTimeout}, repo::{Alias, ArcRepo, DeleteToken, Hash}, state::State, store::Store, @@ -58,6 +58,32 @@ where let permit = crate::process_semaphore().acquire().await?; + let res = process_ingest_bytestream(state, bytes, upload_query) + .with_timeout(Duration::from_secs(state.config.media.process_timeout * 4)) + .await + .map_err(|_| UploadError::ProcessTimeout)?; + + drop(permit); + + res +} + +async fn process_ingest_bytestream( + state: &State, + bytes: BytesStream, + upload_query: &UploadQuery, +) -> Result< + ( + InternalFormat, + Arc, + Details, + Rc>, + ), + Error, +> +where + S: Store, +{ tracing::trace!("Validating bytes"); let (input_type, process_read) = validate::validate_bytes_stream(state, bytes, &upload_query.limits) @@ -120,8 +146,6 @@ where .with_poll_timer("details-from-bytes-stream") .await?; - drop(permit); - Ok((input_type, identifier, details, hash_state)) }