diff --git a/defaults.toml b/defaults.toml index a837c0d..17faa2f 100644 --- a/defaults.toml +++ b/defaults.toml @@ -23,6 +23,7 @@ max_height = 10000 max_area = 40000000 max_file_size = 40 max_frame_count = 900 +process_timeout = 30 enable_silent_video = true enable_full_video = false video_codec = "vp9" diff --git a/pict-rs.toml b/pict-rs.toml index 859a46d..dfd6cd7 100644 --- a/pict-rs.toml +++ b/pict-rs.toml @@ -154,6 +154,13 @@ max_file_size = 40 # default: # 900 max_frame_count = 900 +## Optional: set a timeout (in seconds) for any spawned process +# environment variable: PICTRS__MEDIA__PROCESS_TIMEOUT +# default: 30 +# +# This may need to be increased if processing large video uploads +process_timeout = 30 + ## Optional: enable GIF, MP4, and WEBM uploads (without sound) # environment variable: PICTRS__MEDIA__ENABLE_SILENT_VIDEO # default: true diff --git a/src/config/commandline.rs b/src/config/commandline.rs index 7d50ec0..d13456b 100644 --- a/src/config/commandline.rs +++ b/src/config/commandline.rs @@ -53,6 +53,7 @@ impl Args { media_max_area, media_max_file_size, media_max_frame_count, + media_process_timeout, media_gif_max_width, media_gif_max_height, media_gif_max_area, @@ -90,6 +91,7 @@ impl Args { max_area: media_max_area, max_file_size: media_max_file_size, max_frame_count: media_max_frame_count, + process_timeout: media_process_timeout, gif, enable_silent_video: media_enable_silent_video, enable_full_video: media_enable_full_video, @@ -347,6 +349,8 @@ struct Media { #[serde(skip_serializing_if = "Option::is_none")] max_frame_count: Option, #[serde(skip_serializing_if = "Option::is_none")] + process_timeout: Option, + #[serde(skip_serializing_if = "Option::is_none")] gif: Option, #[serde(skip_serializing_if = "Option::is_none")] enable_silent_video: Option, @@ -474,6 +478,9 @@ struct Run { /// The maximum number of frames allowed for uploaded GIF and MP4s. #[arg(long)] media_max_frame_count: Option, + /// How long to allow any media processing operation to last before giving up + #[arg(long)] + media_process_timeout: Option, /// Maximum width allowed for gif uploads. /// /// If an upload exceeds this value, it will be transcoded to a video format or rejected, diff --git a/src/config/defaults.rs b/src/config/defaults.rs index 0ba993b..0f718b9 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -67,6 +67,7 @@ struct MediaDefaults { max_area: usize, max_file_size: usize, max_frame_count: usize, + process_timeout: u64, gif: GifDefaults, enable_silent_video: bool, enable_full_video: bool, @@ -180,6 +181,7 @@ impl Default for MediaDefaults { max_area: 40_000_000, max_file_size: 40, max_frame_count: 900, + process_timeout: 30, gif: Default::default(), enable_silent_video: true, enable_full_video: false, diff --git a/src/config/file.rs b/src/config/file.rs index 390d3ca..5c94afe 100644 --- a/src/config/file.rs +++ b/src/config/file.rs @@ -156,6 +156,8 @@ pub(crate) struct Media { pub(crate) max_frame_count: usize, + pub(crate) process_timeout: u64, + pub(crate) gif: Gif, pub(crate) enable_silent_video: bool, diff --git a/src/details.rs b/src/details.rs index fea1251..d91903d 100644 --- a/src/details.rs +++ b/src/details.rs @@ -30,9 +30,13 @@ impl Details { || self.content_type.type_() == "image" && self.content_type.subtype() == "gif" } - pub(crate) async fn from_bytes(input: web::Bytes, hint: ValidInputType) -> Result { + pub(crate) async fn from_bytes( + input: web::Bytes, + hint: ValidInputType, + timeout: u64, + ) -> Result { let details = if hint.is_video() { - crate::ffmpeg::details_bytes(input.clone()).await? + crate::ffmpeg::details_bytes(input.clone(), timeout).await? } else { None }; @@ -40,7 +44,7 @@ impl Details { let details = if let Some(details) = details { details } else { - crate::magick::details_bytes(input, Some(hint)).await? + crate::magick::details_bytes(input, Some(hint), timeout).await? }; Ok(Details::now( @@ -55,9 +59,10 @@ impl Details { store: S, identifier: S::Identifier, expected_format: Option, + timeout: u64, ) -> Result { let details = if expected_format.map(|t| t.is_video()).unwrap_or(true) { - crate::ffmpeg::details_store(&store, &identifier).await? + crate::ffmpeg::details_store(&store, &identifier, timeout).await? } else { None }; @@ -65,7 +70,7 @@ impl Details { let details = if let Some(details) = details { details } else { - crate::magick::details_store(store, identifier, expected_format).await? + crate::magick::details_store(store, identifier, expected_format, timeout).await? }; Ok(Details::now( diff --git a/src/exiftool.rs b/src/exiftool.rs index cf094f2..f51782f 100644 --- a/src/exiftool.rs +++ b/src/exiftool.rs @@ -19,9 +19,9 @@ impl ExifError { } #[tracing::instrument(level = "trace", skip(input))] -pub(crate) async fn needs_reorienting(input: Bytes) -> Result { - let process = - Process::run("exiftool", &["-n", "-Orientation", "-"]).map_err(ExifError::Process)?; +pub(crate) async fn needs_reorienting(input: Bytes, timeout: u64) -> Result { + let process = Process::run("exiftool", &["-n", "-Orientation", "-"], timeout) + .map_err(ExifError::Process)?; let mut reader = process.bytes_read(input); let mut buf = String::new(); @@ -34,9 +34,12 @@ pub(crate) async fn needs_reorienting(input: Bytes) -> Result { } #[tracing::instrument(level = "trace", skip(input))] -pub(crate) fn clear_metadata_bytes_read(input: Bytes) -> Result { - let process = - Process::run("exiftool", &["-all=", "-", "-out", "-"]).map_err(ExifError::Process)?; +pub(crate) fn clear_metadata_bytes_read( + input: Bytes, + timeout: u64, +) -> Result { + let process = Process::run("exiftool", &["-all=", "-", "-out", "-"], timeout) + .map_err(ExifError::Process)?; Ok(process.bytes_read(input)) } diff --git a/src/ffmpeg.rs b/src/ffmpeg.rs index 85e364b..f857962 100644 --- a/src/ffmpeg.rs +++ b/src/ffmpeg.rs @@ -203,6 +203,7 @@ impl TranscodeOptions { input_path: &str, output_path: &str, alpha: bool, + timeout: u64, ) -> Result { match self.output { TranscodeOutputOptions::Gif => Process::run("ffmpeg", &[ @@ -217,7 +218,7 @@ impl TranscodeOptions { "-f", self.output_ffmpeg_format(), output_path - ]), + ], timeout), TranscodeOutputOptions::Video { video_codec, audio_codec: None, @@ -240,6 +241,7 @@ impl TranscodeOptions { self.output_ffmpeg_format(), output_path, ], + timeout, ), TranscodeOutputOptions::Video { video_codec, @@ -264,6 +266,7 @@ impl TranscodeOptions { self.output_ffmpeg_format(), output_path, ], + timeout, ), } } @@ -429,8 +432,9 @@ const FORMAT_MAPPINGS: &[(&str, VideoFormat)] = &[ pub(crate) async fn input_type_bytes( input: Bytes, + timeout: u64, ) -> Result, FfMpegError> { - if let Some(details) = details_bytes(input).await? { + if let Some(details) = details_bytes(input, timeout).await? { let input_type = details .validate_input() .map_err(FfMpegError::ValidateDetails)?; @@ -444,33 +448,43 @@ pub(crate) async fn input_type_bytes( pub(crate) async fn details_store( store: &S, identifier: &S::Identifier, + timeout: u64, ) -> Result, FfMpegError> { - details_file(move |mut tmp_one| async move { - let stream = store - .to_stream(identifier, None, None) - .await - .map_err(FfMpegError::Store)?; - tmp_one - .write_from_stream(stream) - .await - .map_err(FfMpegError::Write)?; - Ok(tmp_one) - }) + details_file( + move |mut tmp_one| async move { + let stream = store + .to_stream(identifier, None, None) + .await + .map_err(FfMpegError::Store)?; + tmp_one + .write_from_stream(stream) + .await + .map_err(FfMpegError::Write)?; + Ok(tmp_one) + }, + timeout, + ) .await } -pub(crate) async fn details_bytes(input: Bytes) -> Result, FfMpegError> { - details_file(move |mut tmp_one| async move { - tmp_one - .write_from_bytes(input) - .await - .map_err(FfMpegError::Write)?; - Ok(tmp_one) - }) +pub(crate) async fn details_bytes( + input: Bytes, + timeout: u64, +) -> Result, FfMpegError> { + details_file( + move |mut tmp_one| async move { + tmp_one + .write_from_bytes(input) + .await + .map_err(FfMpegError::Write)?; + Ok(tmp_one) + }, + timeout, + ) .await } -async fn alpha_pixel_formats() -> Result, FfMpegError> { +async fn alpha_pixel_formats(timeout: u64) -> Result, FfMpegError> { let process = Process::run( "ffprobe", &[ @@ -483,6 +497,7 @@ async fn alpha_pixel_formats() -> Result, FfMpegError> { "-print_format", "json", ], + timeout, ) .map_err(FfMpegError::Process)?; @@ -535,7 +550,7 @@ struct Format { } #[tracing::instrument(skip(f))] -async fn details_file(f: F) -> Result, FfMpegError> +async fn details_file(f: F, timeout: u64) -> Result, FfMpegError> where F: FnOnce(crate::file::File) -> Fut, Fut: std::future::Future>, @@ -568,6 +583,7 @@ where "json", input_file_str, ], + timeout, ) .map_err(FfMpegError::Process)?; @@ -640,7 +656,7 @@ fn parse_details_inner( })) } -async fn pixel_format(input_file: &str) -> Result { +async fn pixel_format(input_file: &str, timeout: u64) -> Result { let process = Process::run( "ffprobe", &[ @@ -654,6 +670,7 @@ async fn pixel_format(input_file: &str) -> Result { "compact=p=0:nk=1", input_file, ], + timeout, ) .map_err(FfMpegError::Process)?; @@ -675,6 +692,7 @@ async fn pixel_format(input_file: &str) -> Result { pub(crate) async fn transcode_bytes( input: Bytes, transcode_options: TranscodeOptions, + timeout: u64, ) -> Result { let input_file = crate::tmp_file::tmp_file(Some(transcode_options.input_file_extension())); let input_file_str = input_file.to_str().ok_or(FfMpegError::Path)?; @@ -700,12 +718,12 @@ pub(crate) async fn transcode_bytes( let alpha = if transcode_options.supports_alpha() { static ALPHA_PIXEL_FORMATS: OnceCell> = OnceCell::new(); - let format = pixel_format(input_file_str).await?; + let format = pixel_format(input_file_str, timeout).await?; match ALPHA_PIXEL_FORMATS.get() { Some(alpha_pixel_formats) => alpha_pixel_formats.contains(&format), None => { - let pixel_formats = alpha_pixel_formats().await?; + let pixel_formats = alpha_pixel_formats(timeout).await?; let alpha = pixel_formats.contains(&format); let _ = ALPHA_PIXEL_FORMATS.set(pixel_formats); alpha @@ -716,7 +734,7 @@ pub(crate) async fn transcode_bytes( }; let process = transcode_options - .execute(input_file_str, output_file_str, alpha) + .execute(input_file_str, output_file_str, alpha, timeout) .map_err(FfMpegError::Process)?; process.wait().await.map_err(FfMpegError::Process)?; @@ -743,6 +761,7 @@ pub(crate) async fn thumbnail( from: S::Identifier, input_format: VideoFormat, format: ThumbnailFormat, + timeout: u64, ) -> Result { let input_file = crate::tmp_file::tmp_file(Some(input_format.to_file_extension())); let input_file_str = input_file.to_str().ok_or(FfMpegError::Path)?; @@ -785,6 +804,7 @@ pub(crate) async fn thumbnail( format.as_ffmpeg_format(), output_file_str, ], + timeout, ) .map_err(FfMpegError::Process)?; diff --git a/src/generate.rs b/src/generate.rs index 5e88f67..132a7d3 100644 --- a/src/generate.rs +++ b/src/generate.rs @@ -23,6 +23,7 @@ pub(crate) async fn generate( thumbnail_args: Vec, input_format: Option, thumbnail_format: Option, + timeout: u64, hash: R::Bytes, ) -> Result<(Details, Bytes), Error> { let process_fut = process( @@ -34,6 +35,7 @@ pub(crate) async fn generate( thumbnail_args, input_format, thumbnail_format, + timeout, hash.clone(), ); @@ -54,6 +56,7 @@ async fn process( thumbnail_args: Vec, input_format: Option, thumbnail_format: Option, + timeout: u64, hash: R::Bytes, ) -> Result<(Details, Bytes), Error> { let permit = crate::PROCESS_SEMAPHORE.acquire().await; @@ -75,6 +78,7 @@ async fn process( identifier, input_format.unwrap_or(VideoFormat::Mp4), thumbnail_format, + timeout, ) .await?; let motion_identifier = store @@ -87,8 +91,13 @@ async fn process( motion_identifier }; - let mut processed_reader = - crate::magick::process_image_store_read(store.clone(), identifier, thumbnail_args, format)?; + let mut processed_reader = crate::magick::process_image_store_read( + store.clone(), + identifier, + thumbnail_args, + format, + timeout, + )?; let mut vec = Vec::new(); processed_reader @@ -99,7 +108,7 @@ async fn process( drop(permit); - let details = Details::from_bytes(bytes.clone(), format.as_hint()).await?; + let details = Details::from_bytes(bytes.clone(), format.as_hint(), timeout).await?; let identifier = store .save_bytes(bytes.clone(), details.content_type()) diff --git a/src/ingest.rs b/src/ingest.rs index 564bda9..56f4545 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -48,6 +48,7 @@ pub(crate) async fn ingest( stream: impl Stream> + Unpin + 'static, declared_alias: Option, should_validate: bool, + timeout: u64, ) -> Result, Error> where R: FullRepo + 'static, @@ -65,8 +66,12 @@ where if let Some(format) = input_type.to_format() { let (_, magick_args) = crate::processor::build_chain(operations, format.as_ext())?; - let processed_reader = - crate::magick::process_image_async_read(validated_reader, magick_args, format)?; + let processed_reader = crate::magick::process_image_async_read( + validated_reader, + magick_args, + format, + timeout, + )?; Either::left(processed_reader) } else { diff --git a/src/lib.rs b/src/lib.rs index 633153e..8f4ee95 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -122,7 +122,13 @@ async fn ensure_details( } else { tracing::debug!("generating new details from {:?}", identifier); let hint = details_hint(alias); - let new_details = Details::from_store(store.clone(), identifier.clone(), hint).await?; + let new_details = Details::from_store( + store.clone(), + identifier.clone(), + hint, + CONFIG.media.process_timeout, + ) + .await?; tracing::debug!("storing details for {:?}", identifier); repo.relate_details(&identifier, &new_details).await?; tracing::debug!("stored"); @@ -164,8 +170,18 @@ impl FormData for Upload { let stream = stream.map_err(Error::from); Box::pin( - async move { ingest::ingest(&**repo, &**store, stream, None, true).await } - .instrument(span), + async move { + ingest::ingest( + &**repo, + &**store, + stream, + None, + true, + CONFIG.media.process_timeout, + ) + .await + } + .instrument(span), ) })), ) @@ -217,6 +233,7 @@ impl FormData for Import { stream, Some(Alias::from_existing(&filename)), !CONFIG.media.skip_validate_imports, + CONFIG.media.process_timeout, ) .await } @@ -472,7 +489,15 @@ async fn do_download_inline( repo: web::Data, store: web::Data, ) -> Result { - let mut session = ingest::ingest(&repo, &store, stream, None, true).await?; + let mut session = ingest::ingest( + &repo, + &store, + stream, + None, + true, + CONFIG.media.process_timeout, + ) + .await?; let alias = session.alias().expect("alias should exist").to_owned(); let delete_token = session.delete_token().await?; @@ -658,6 +683,7 @@ async fn process( (**store).clone(), identifier.clone(), Some(ValidInputType::from_format(format)), + CONFIG.media.process_timeout, ) .await?; tracing::debug!("storing details for {:?}", identifier); @@ -680,6 +706,7 @@ async fn process( thumbnail_args, original_details.to_input_format(), None, + CONFIG.media.process_timeout, hash, ) .await?; @@ -753,6 +780,7 @@ async fn process_head( (**store).clone(), identifier.clone(), Some(ValidInputType::from_format(format)), + CONFIG.media.process_timeout, ) .await?; tracing::debug!("storing details for {:?}", identifier); diff --git a/src/magick.rs b/src/magick.rs index 650c729..6ef6c26 100644 --- a/src/magick.rs +++ b/src/magick.rs @@ -206,6 +206,7 @@ pub(crate) struct Details { pub(crate) fn convert_bytes_read( input: Bytes, format: ImageFormat, + timeout: u64, ) -> Result { let process = Process::run( "magick", @@ -216,6 +217,7 @@ pub(crate) fn convert_bytes_read( "-strip", format!("{}:-", format.as_magick_format()).as_str(), ], + timeout, ) .map_err(MagickError::Process)?; @@ -226,6 +228,7 @@ pub(crate) fn convert_bytes_read( pub(crate) async fn details_bytes( input: Bytes, hint: Option, + timeout: u64, ) -> Result { if let Some(hint) = hint.and_then(|hint| hint.video_hint()) { let input_file = crate::tmp_file::tmp_file(Some(hint)); @@ -243,7 +246,7 @@ pub(crate) async fn details_bytes( .map_err(MagickError::Write)?; tmp_one.close().await.map_err(MagickError::CloseFile)?; - return details_file(input_file_str).await; + return details_file(input_file_str, timeout).await; } let last_arg = if let Some(expected_format) = hint { @@ -252,7 +255,7 @@ pub(crate) async fn details_bytes( "-".to_owned() }; - let process = Process::run("magick", &["convert", "-ping", &last_arg, "JSON:"]) + let process = Process::run("magick", &["convert", "-ping", &last_arg, "JSON:"], timeout) .map_err(MagickError::Process)?; let mut reader = process.bytes_read(input); @@ -295,6 +298,7 @@ pub(crate) async fn details_store( store: S, identifier: S::Identifier, hint: Option, + timeout: u64, ) -> Result { if let Some(hint) = hint.and_then(|hint| hint.video_hint()) { let input_file = crate::tmp_file::tmp_file(Some(hint)); @@ -316,7 +320,7 @@ pub(crate) async fn details_store( .map_err(MagickError::Write)?; tmp_one.close().await.map_err(MagickError::CloseFile)?; - return details_file(input_file_str).await; + return details_file(input_file_str, timeout).await; } let last_arg = if let Some(expected_format) = hint { @@ -325,7 +329,7 @@ pub(crate) async fn details_store( "-".to_owned() }; - let process = Process::run("magick", &["convert", "-ping", &last_arg, "JSON:"]) + let process = Process::run("magick", &["convert", "-ping", &last_arg, "JSON:"], timeout) .map_err(MagickError::Process)?; let mut reader = process.store_read(store, identifier); @@ -347,8 +351,8 @@ pub(crate) async fn details_store( } #[tracing::instrument] -pub(crate) async fn details_file(path_str: &str) -> Result { - let process = Process::run("magick", &["convert", "-ping", path_str, "JSON:"]) +pub(crate) async fn details_file(path_str: &str, timeout: u64) -> Result { + let process = Process::run("magick", &["convert", "-ping", path_str, "JSON:"], timeout) .map_err(MagickError::Process)?; let mut reader = process.read(); @@ -438,15 +442,20 @@ fn parse_details(details_output: Vec) -> Result Result<(Details, ValidInputType), MagickError> { - let details = details_bytes(input, None).await?; + let details = details_bytes(input, None, timeout).await?; let input_type = details .validate_input() .map_err(MagickError::ValidateDetails)?; Ok((details, input_type)) } -fn process_image(process_args: Vec, format: ImageFormat) -> Result { +fn process_image( + process_args: Vec, + format: ImageFormat, + timeout: u64, +) -> Result { let command = "magick"; let convert_args = ["convert", "-"]; let last_arg = format!("{}:-", format.as_magick_format()); @@ -456,7 +465,7 @@ fn process_image(process_args: Vec, format: ImageFormat) -> Result( @@ -464,8 +473,9 @@ pub(crate) fn process_image_store_read( identifier: S::Identifier, args: Vec, format: ImageFormat, + timeout: u64, ) -> Result { - Ok(process_image(args, format) + Ok(process_image(args, format, timeout) .map_err(MagickError::Process)? .store_read(store, identifier)) } @@ -474,8 +484,9 @@ pub(crate) fn process_image_async_read( async_read: A, args: Vec, format: ImageFormat, + timeout: u64, ) -> Result { - Ok(process_image(args, format) + Ok(process_image(args, format, timeout) .map_err(MagickError::Process)? .pipe_async_read(async_read)) } diff --git a/src/process.rs b/src/process.rs index f58ba6a..9c89f3f 100644 --- a/src/process.rs +++ b/src/process.rs @@ -6,10 +6,11 @@ use std::{ pin::Pin, process::{ExitStatus, Stdio}, task::{Context, Poll}, + time::Duration, }; use tokio::{ io::{AsyncRead, AsyncWriteExt, ReadBuf}, - process::{Child, ChildStdin, Command}, + process::{Child, ChildStdin, ChildStdout, Command}, sync::oneshot::{channel, Receiver}, }; use tracing::{Instrument, Span}; @@ -19,6 +20,7 @@ struct StatusError(ExitStatus); pub(crate) struct Process { child: Child, + timeout: Duration, } impl std::fmt::Debug for Process { @@ -31,15 +33,14 @@ struct DropHandle { inner: JoinHandle<()>, } -pin_project_lite::pin_project! { - struct ProcessRead { - #[pin] - inner: I, - err_recv: Receiver, - err_closed: bool, - handle: DropHandle, - eof: bool, - } +pub(crate) struct ProcessRead { + inner: I, + err_recv: Receiver, + err_closed: bool, + #[allow(dead_code)] + handle: DropHandle, + eof: bool, + sleep: Pin>, } #[derive(Debug, thiserror::Error)] @@ -53,6 +54,9 @@ pub(crate) enum ProcessError { #[error("Reached process spawn limit")] LimitReached, + #[error("Process timed out")] + Timeout, + #[error("Failed with status {0}")] Status(ExitStatus), @@ -61,9 +65,9 @@ pub(crate) enum ProcessError { } impl Process { - pub(crate) fn run(command: &str, args: &[&str]) -> Result { + pub(crate) fn run(command: &str, args: &[&str], timeout: u64) -> Result { let res = tracing::trace_span!(parent: None, "Create command") - .in_scope(|| Self::spawn(Command::new(command).args(args))); + .in_scope(|| Self::spawn(Command::new(command).args(args), timeout)); match res { Ok(this) => Ok(this), @@ -78,43 +82,49 @@ impl Process { } } - fn spawn(cmd: &mut Command) -> std::io::Result { + fn spawn(cmd: &mut Command, timeout: u64) -> std::io::Result { + let timeout = Duration::from_secs(timeout); + tracing::trace_span!(parent: None, "Spawn command").in_scope(|| { let cmd = cmd .stdin(Stdio::piped()) .stdout(Stdio::piped()) .kill_on_drop(true); - cmd.spawn().map(|child| Process { child }) + cmd.spawn().map(|child| Process { child, timeout }) }) } #[tracing::instrument(skip(self))] - pub(crate) async fn wait(mut self) -> Result<(), ProcessError> { - let res = self.child.wait().await; + pub(crate) async fn wait(self) -> Result<(), ProcessError> { + let Process { mut child, timeout } = self; - match res { - Ok(status) if status.success() => Ok(()), - Ok(status) => Err(ProcessError::Status(status)), - Err(e) => Err(ProcessError::Other(e)), + match actix_rt::time::timeout(timeout, child.wait()).await { + Ok(Ok(status)) if status.success() => Ok(()), + Ok(Ok(status)) => Err(ProcessError::Status(status)), + Ok(Err(e)) => Err(ProcessError::Other(e)), + Err(_) => { + child.kill().await.map_err(ProcessError::Other)?; + return Err(ProcessError::Timeout); + } } } - pub(crate) fn bytes_read(self, input: Bytes) -> impl AsyncRead + Unpin { + pub(crate) fn bytes_read(self, input: Bytes) -> ProcessRead { self.spawn_fn(move |mut stdin| { let mut input = input; async move { stdin.write_all_buf(&mut input).await } }) } - pub(crate) fn read(self) -> impl AsyncRead + Unpin { + pub(crate) fn read(self) -> ProcessRead { self.spawn_fn(|_| async { Ok(()) }) } pub(crate) fn pipe_async_read( self, mut async_read: A, - ) -> impl AsyncRead + Unpin { + ) -> ProcessRead { self.spawn_fn(move |mut stdin| async move { tokio::io::copy(&mut async_read, &mut stdin) .await @@ -126,7 +136,7 @@ impl Process { self, store: S, identifier: S::Identifier, - ) -> impl AsyncRead + Unpin { + ) -> ProcessRead { self.spawn_fn(move |mut stdin| { let store = store; let identifier = identifier; @@ -138,13 +148,15 @@ impl Process { #[allow(unknown_lints)] #[allow(clippy::let_with_type_underscore)] #[tracing::instrument(level = "trace", skip_all)] - fn spawn_fn(mut self, f: F) -> impl AsyncRead + Unpin + fn spawn_fn(self, f: F) -> ProcessRead where F: FnOnce(ChildStdin) -> Fut + 'static, Fut: Future>, { - let stdin = self.child.stdin.take().expect("stdin exists"); - let stdout = self.child.stdout.take().expect("stdout exists"); + let Process { mut child, timeout } = self; + + let stdin = child.stdin.take().expect("stdin exists"); + let stdout = child.stdout.take().expect("stdout exists"); let (tx, rx) = tracing::trace_span!(parent: None, "Create channel") .in_scope(channel::); @@ -152,82 +164,84 @@ impl Process { let span = tracing::info_span!(parent: None, "Background process task"); span.follows_from(Span::current()); - let mut child = self.child; let handle = tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { actix_rt::spawn( async move { - if let Err(e) = (f)(stdin).await { - let _ = tx.send(e); - return; - } + let child_fut = async { + (f)(stdin).await?; - match child.wait().await { - Ok(status) => { - if !status.success() { - let _ = tx.send(std::io::Error::new( - std::io::ErrorKind::Other, - StatusError(status), - )); - } - } - Err(e) => { - let _ = tx.send(e); - } - } + child.wait().await + }; + + let err = match actix_rt::time::timeout(timeout, child_fut).await { + Ok(Ok(status)) if status.success() => return, + Ok(Ok(status)) => std::io::Error::new( + std::io::ErrorKind::Other, + ProcessError::Status(status), + ), + Ok(Err(e)) => e, + Err(_) => std::io::ErrorKind::TimedOut.into(), + }; + + let _ = tx.send(err); + + let _ = child.kill().await; } .instrument(span), ) }); + let sleep = Box::pin(actix_rt::time::sleep(timeout)); + ProcessRead { inner: stdout, err_recv: rx, err_closed: false, handle: DropHandle { inner: handle }, eof: false, + sleep, } } } impl AsyncRead for ProcessRead where - I: AsyncRead, + I: AsyncRead + Unpin, { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { - let this = self.as_mut().project(); - - let err_recv = this.err_recv; - let err_closed = this.err_closed; - let eof = this.eof; - let inner = this.inner; - - if !*err_closed { - if let Poll::Ready(res) = Pin::new(err_recv).poll(cx) { - *err_closed = true; + if !self.err_closed { + if let Poll::Ready(res) = Pin::new(&mut self.err_recv).poll(cx) { + self.err_closed = true; if let Ok(err) = res { return Poll::Ready(Err(err)); } - if *eof { + if self.eof { return Poll::Ready(Ok(())); } } + + if let Poll::Ready(()) = self.sleep.as_mut().poll(cx) { + self.err_closed = true; + + return Poll::Ready(Err(std::io::ErrorKind::TimedOut.into())); + } } - if !*eof { + if !self.eof { let before_size = buf.filled().len(); - return match inner.poll_read(cx, buf) { + return match Pin::new(&mut self.inner).poll_read(cx, buf) { Poll::Ready(Ok(())) => { if buf.filled().len() == before_size { - *eof = true; + self.eof = true; - if !*err_closed { + if !self.err_closed { // reached end of stream & haven't received process signal return Poll::Pending; } @@ -236,7 +250,7 @@ where Poll::Ready(Ok(())) } Poll::Ready(Err(e)) => { - *eof = true; + self.eof = true; Poll::Ready(Err(e)) } @@ -244,7 +258,7 @@ where }; } - if *err_closed && *eof { + if self.err_closed && self.eof { return Poll::Ready(Ok(())); } diff --git a/src/queue/process.rs b/src/queue/process.rs index fa76255..ec9443c 100644 --- a/src/queue/process.rs +++ b/src/queue/process.rs @@ -35,6 +35,7 @@ where Serde::into_inner(upload_id), declared_alias.map(Serde::into_inner), should_validate, + crate::CONFIG.media.process_timeout, ) .await? } @@ -51,6 +52,7 @@ where Serde::into_inner(source), process_path, process_args, + crate::CONFIG.media.process_timeout, ) .await? } @@ -72,6 +74,7 @@ async fn process_ingest( upload_id: UploadId, declared_alias: Option, should_validate: bool, + timeout: u64, ) -> Result<(), Error> where R: FullRepo + 'static, @@ -90,9 +93,15 @@ where .await? .map_err(Error::from); - let session = - crate::ingest::ingest(&repo, &store2, stream, declared_alias, should_validate) - .await?; + let session = crate::ingest::ingest( + &repo, + &store2, + stream, + declared_alias, + should_validate, + timeout, + ) + .await?; let token = session.delete_token().await?; @@ -134,6 +143,7 @@ async fn generate( source: Alias, process_path: PathBuf, process_args: Vec, + timeout: u64, ) -> Result<(), Error> { let Some(hash) = repo.hash(&source).await? else { // Nothing to do @@ -160,6 +170,7 @@ async fn generate( process_args, original_details.to_input_format(), None, + timeout, hash, ) .await?; diff --git a/src/validate.rs b/src/validate.rs index 922ba5d..23056cd 100644 --- a/src/validate.rs +++ b/src/validate.rs @@ -41,12 +41,13 @@ pub(crate) async fn validate_bytes( media: &MediaConfiguration, validate: bool, ) -> Result<(ValidInputType, impl AsyncRead + Unpin), Error> { - let (details, input_type) = - if let Some(tup) = crate::ffmpeg::input_type_bytes(bytes.clone()).await? { - tup - } else { - crate::magick::input_type_bytes(bytes.clone()).await? - }; + let (details, input_type) = if let Some(tup) = + crate::ffmpeg::input_type_bytes(bytes.clone(), media.process_timeout).await? + { + tup + } else { + crate::magick::input_type_bytes(bytes.clone(), media.process_timeout).await? + }; if !validate { return Ok((input_type, Either::left(UnvalidatedBytes::new(bytes)))); @@ -63,7 +64,12 @@ pub(crate) async fn validate_bytes( Ok(( transcode_options.output_type(), Either::right(Either::left(Either::left( - crate::ffmpeg::transcode_bytes(bytes, transcode_options).await?, + crate::ffmpeg::transcode_bytes( + bytes, + transcode_options, + media.process_timeout, + ) + .await?, ))), )) } else { @@ -71,6 +77,7 @@ pub(crate) async fn validate_bytes( transcode_options.output_type(), Either::right(Either::right(crate::exiftool::clear_metadata_bytes_read( bytes, + media.process_timeout, )?)), )) } @@ -78,21 +85,25 @@ pub(crate) async fn validate_bytes( (FileFormat::Image(image_format), Some(format)) if image_format != format => Ok(( ValidInputType::from_format(format), Either::right(Either::left(Either::right( - crate::magick::convert_bytes_read(bytes, format)?, + crate::magick::convert_bytes_read(bytes, format, media.process_timeout)?, ))), )), (FileFormat::Image(ImageFormat::Webp), _) => Ok(( ValidInputType::Webp, Either::right(Either::left(Either::right( - crate::magick::convert_bytes_read(bytes, ImageFormat::Webp)?, + crate::magick::convert_bytes_read(bytes, ImageFormat::Webp, media.process_timeout)?, ))), )), (FileFormat::Image(image_format), _) => { - if crate::exiftool::needs_reorienting(bytes.clone()).await? { + if crate::exiftool::needs_reorienting(bytes.clone(), media.process_timeout).await? { Ok(( ValidInputType::from_format(image_format), Either::right(Either::left(Either::right( - crate::magick::convert_bytes_read(bytes, image_format)?, + crate::magick::convert_bytes_read( + bytes, + image_format, + media.process_timeout, + )?, ))), )) } else { @@ -100,6 +111,7 @@ pub(crate) async fn validate_bytes( ValidInputType::from_format(image_format), Either::right(Either::right(crate::exiftool::clear_metadata_bytes_read( bytes, + media.process_timeout, )?)), )) }