2
0
Fork 0
mirror of https://git.asonix.dog/asonix/pict-rs synced 2024-11-20 11:21:14 +00:00

Add process timeout

This commit is contained in:
asonix 2023-08-05 11:51:52 -05:00
parent 14ef8d0334
commit 35c280a08d
15 changed files with 272 additions and 135 deletions

View file

@ -23,6 +23,7 @@ max_height = 10000
max_area = 40000000 max_area = 40000000
max_file_size = 40 max_file_size = 40
max_frame_count = 900 max_frame_count = 900
process_timeout = 30
enable_silent_video = true enable_silent_video = true
enable_full_video = false enable_full_video = false
video_codec = "vp9" video_codec = "vp9"

View file

@ -154,6 +154,13 @@ max_file_size = 40
# default: # 900 # default: # 900
max_frame_count = 900 max_frame_count = 900
## Optional: set a timeout (in seconds) for any spawned process
# environment variable: PICTRS__MEDIA__PROCESS_TIMEOUT
# default: 30
#
# This may need to be increased if processing large video uploads
process_timeout = 30
## Optional: enable GIF, MP4, and WEBM uploads (without sound) ## Optional: enable GIF, MP4, and WEBM uploads (without sound)
# environment variable: PICTRS__MEDIA__ENABLE_SILENT_VIDEO # environment variable: PICTRS__MEDIA__ENABLE_SILENT_VIDEO
# default: true # default: true

View file

@ -53,6 +53,7 @@ impl Args {
media_max_area, media_max_area,
media_max_file_size, media_max_file_size,
media_max_frame_count, media_max_frame_count,
media_process_timeout,
media_gif_max_width, media_gif_max_width,
media_gif_max_height, media_gif_max_height,
media_gif_max_area, media_gif_max_area,
@ -90,6 +91,7 @@ impl Args {
max_area: media_max_area, max_area: media_max_area,
max_file_size: media_max_file_size, max_file_size: media_max_file_size,
max_frame_count: media_max_frame_count, max_frame_count: media_max_frame_count,
process_timeout: media_process_timeout,
gif, gif,
enable_silent_video: media_enable_silent_video, enable_silent_video: media_enable_silent_video,
enable_full_video: media_enable_full_video, enable_full_video: media_enable_full_video,
@ -347,6 +349,8 @@ struct Media {
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
max_frame_count: Option<usize>, max_frame_count: Option<usize>,
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
process_timeout: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
gif: Option<Gif>, gif: Option<Gif>,
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
enable_silent_video: Option<bool>, enable_silent_video: Option<bool>,
@ -474,6 +478,9 @@ struct Run {
/// The maximum number of frames allowed for uploaded GIF and MP4s. /// The maximum number of frames allowed for uploaded GIF and MP4s.
#[arg(long)] #[arg(long)]
media_max_frame_count: Option<usize>, media_max_frame_count: Option<usize>,
/// How long to allow any media processing operation to last before giving up
#[arg(long)]
media_process_timeout: Option<u64>,
/// Maximum width allowed for gif uploads. /// Maximum width allowed for gif uploads.
/// ///
/// If an upload exceeds this value, it will be transcoded to a video format or rejected, /// If an upload exceeds this value, it will be transcoded to a video format or rejected,

View file

@ -67,6 +67,7 @@ struct MediaDefaults {
max_area: usize, max_area: usize,
max_file_size: usize, max_file_size: usize,
max_frame_count: usize, max_frame_count: usize,
process_timeout: u64,
gif: GifDefaults, gif: GifDefaults,
enable_silent_video: bool, enable_silent_video: bool,
enable_full_video: bool, enable_full_video: bool,
@ -180,6 +181,7 @@ impl Default for MediaDefaults {
max_area: 40_000_000, max_area: 40_000_000,
max_file_size: 40, max_file_size: 40,
max_frame_count: 900, max_frame_count: 900,
process_timeout: 30,
gif: Default::default(), gif: Default::default(),
enable_silent_video: true, enable_silent_video: true,
enable_full_video: false, enable_full_video: false,

View file

@ -156,6 +156,8 @@ pub(crate) struct Media {
pub(crate) max_frame_count: usize, pub(crate) max_frame_count: usize,
pub(crate) process_timeout: u64,
pub(crate) gif: Gif, pub(crate) gif: Gif,
pub(crate) enable_silent_video: bool, pub(crate) enable_silent_video: bool,

View file

@ -30,9 +30,13 @@ impl Details {
|| self.content_type.type_() == "image" && self.content_type.subtype() == "gif" || self.content_type.type_() == "image" && self.content_type.subtype() == "gif"
} }
pub(crate) async fn from_bytes(input: web::Bytes, hint: ValidInputType) -> Result<Self, Error> { pub(crate) async fn from_bytes(
input: web::Bytes,
hint: ValidInputType,
timeout: u64,
) -> Result<Self, Error> {
let details = if hint.is_video() { let details = if hint.is_video() {
crate::ffmpeg::details_bytes(input.clone()).await? crate::ffmpeg::details_bytes(input.clone(), timeout).await?
} else { } else {
None None
}; };
@ -40,7 +44,7 @@ impl Details {
let details = if let Some(details) = details { let details = if let Some(details) = details {
details details
} else { } else {
crate::magick::details_bytes(input, Some(hint)).await? crate::magick::details_bytes(input, Some(hint), timeout).await?
}; };
Ok(Details::now( Ok(Details::now(
@ -55,9 +59,10 @@ impl Details {
store: S, store: S,
identifier: S::Identifier, identifier: S::Identifier,
expected_format: Option<ValidInputType>, expected_format: Option<ValidInputType>,
timeout: u64,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
let details = if expected_format.map(|t| t.is_video()).unwrap_or(true) { let details = if expected_format.map(|t| t.is_video()).unwrap_or(true) {
crate::ffmpeg::details_store(&store, &identifier).await? crate::ffmpeg::details_store(&store, &identifier, timeout).await?
} else { } else {
None None
}; };
@ -65,7 +70,7 @@ impl Details {
let details = if let Some(details) = details { let details = if let Some(details) = details {
details details
} else { } else {
crate::magick::details_store(store, identifier, expected_format).await? crate::magick::details_store(store, identifier, expected_format, timeout).await?
}; };
Ok(Details::now( Ok(Details::now(

View file

@ -19,9 +19,9 @@ impl ExifError {
} }
#[tracing::instrument(level = "trace", skip(input))] #[tracing::instrument(level = "trace", skip(input))]
pub(crate) async fn needs_reorienting(input: Bytes) -> Result<bool, ExifError> { pub(crate) async fn needs_reorienting(input: Bytes, timeout: u64) -> Result<bool, ExifError> {
let process = let process = Process::run("exiftool", &["-n", "-Orientation", "-"], timeout)
Process::run("exiftool", &["-n", "-Orientation", "-"]).map_err(ExifError::Process)?; .map_err(ExifError::Process)?;
let mut reader = process.bytes_read(input); let mut reader = process.bytes_read(input);
let mut buf = String::new(); let mut buf = String::new();
@ -34,9 +34,12 @@ pub(crate) async fn needs_reorienting(input: Bytes) -> Result<bool, ExifError> {
} }
#[tracing::instrument(level = "trace", skip(input))] #[tracing::instrument(level = "trace", skip(input))]
pub(crate) fn clear_metadata_bytes_read(input: Bytes) -> Result<impl AsyncRead + Unpin, ExifError> { pub(crate) fn clear_metadata_bytes_read(
let process = input: Bytes,
Process::run("exiftool", &["-all=", "-", "-out", "-"]).map_err(ExifError::Process)?; timeout: u64,
) -> Result<impl AsyncRead + Unpin, ExifError> {
let process = Process::run("exiftool", &["-all=", "-", "-out", "-"], timeout)
.map_err(ExifError::Process)?;
Ok(process.bytes_read(input)) Ok(process.bytes_read(input))
} }

View file

@ -203,6 +203,7 @@ impl TranscodeOptions {
input_path: &str, input_path: &str,
output_path: &str, output_path: &str,
alpha: bool, alpha: bool,
timeout: u64,
) -> Result<Process, ProcessError> { ) -> Result<Process, ProcessError> {
match self.output { match self.output {
TranscodeOutputOptions::Gif => Process::run("ffmpeg", &[ TranscodeOutputOptions::Gif => Process::run("ffmpeg", &[
@ -217,7 +218,7 @@ impl TranscodeOptions {
"-f", "-f",
self.output_ffmpeg_format(), self.output_ffmpeg_format(),
output_path output_path
]), ], timeout),
TranscodeOutputOptions::Video { TranscodeOutputOptions::Video {
video_codec, video_codec,
audio_codec: None, audio_codec: None,
@ -240,6 +241,7 @@ impl TranscodeOptions {
self.output_ffmpeg_format(), self.output_ffmpeg_format(),
output_path, output_path,
], ],
timeout,
), ),
TranscodeOutputOptions::Video { TranscodeOutputOptions::Video {
video_codec, video_codec,
@ -264,6 +266,7 @@ impl TranscodeOptions {
self.output_ffmpeg_format(), self.output_ffmpeg_format(),
output_path, output_path,
], ],
timeout,
), ),
} }
} }
@ -429,8 +432,9 @@ const FORMAT_MAPPINGS: &[(&str, VideoFormat)] = &[
pub(crate) async fn input_type_bytes( pub(crate) async fn input_type_bytes(
input: Bytes, input: Bytes,
timeout: u64,
) -> Result<Option<(Details, ValidInputType)>, FfMpegError> { ) -> Result<Option<(Details, ValidInputType)>, FfMpegError> {
if let Some(details) = details_bytes(input).await? { if let Some(details) = details_bytes(input, timeout).await? {
let input_type = details let input_type = details
.validate_input() .validate_input()
.map_err(FfMpegError::ValidateDetails)?; .map_err(FfMpegError::ValidateDetails)?;
@ -444,33 +448,43 @@ pub(crate) async fn input_type_bytes(
pub(crate) async fn details_store<S: Store>( pub(crate) async fn details_store<S: Store>(
store: &S, store: &S,
identifier: &S::Identifier, identifier: &S::Identifier,
timeout: u64,
) -> Result<Option<Details>, FfMpegError> { ) -> Result<Option<Details>, FfMpegError> {
details_file(move |mut tmp_one| async move { details_file(
let stream = store move |mut tmp_one| async move {
.to_stream(identifier, None, None) let stream = store
.await .to_stream(identifier, None, None)
.map_err(FfMpegError::Store)?; .await
tmp_one .map_err(FfMpegError::Store)?;
.write_from_stream(stream) tmp_one
.await .write_from_stream(stream)
.map_err(FfMpegError::Write)?; .await
Ok(tmp_one) .map_err(FfMpegError::Write)?;
}) Ok(tmp_one)
},
timeout,
)
.await .await
} }
pub(crate) async fn details_bytes(input: Bytes) -> Result<Option<Details>, FfMpegError> { pub(crate) async fn details_bytes(
details_file(move |mut tmp_one| async move { input: Bytes,
tmp_one timeout: u64,
.write_from_bytes(input) ) -> Result<Option<Details>, FfMpegError> {
.await details_file(
.map_err(FfMpegError::Write)?; move |mut tmp_one| async move {
Ok(tmp_one) tmp_one
}) .write_from_bytes(input)
.await
.map_err(FfMpegError::Write)?;
Ok(tmp_one)
},
timeout,
)
.await .await
} }
async fn alpha_pixel_formats() -> Result<HashSet<String>, FfMpegError> { async fn alpha_pixel_formats(timeout: u64) -> Result<HashSet<String>, FfMpegError> {
let process = Process::run( let process = Process::run(
"ffprobe", "ffprobe",
&[ &[
@ -483,6 +497,7 @@ async fn alpha_pixel_formats() -> Result<HashSet<String>, FfMpegError> {
"-print_format", "-print_format",
"json", "json",
], ],
timeout,
) )
.map_err(FfMpegError::Process)?; .map_err(FfMpegError::Process)?;
@ -535,7 +550,7 @@ struct Format {
} }
#[tracing::instrument(skip(f))] #[tracing::instrument(skip(f))]
async fn details_file<F, Fut>(f: F) -> Result<Option<Details>, FfMpegError> async fn details_file<F, Fut>(f: F, timeout: u64) -> Result<Option<Details>, FfMpegError>
where where
F: FnOnce(crate::file::File) -> Fut, F: FnOnce(crate::file::File) -> Fut,
Fut: std::future::Future<Output = Result<crate::file::File, FfMpegError>>, Fut: std::future::Future<Output = Result<crate::file::File, FfMpegError>>,
@ -568,6 +583,7 @@ where
"json", "json",
input_file_str, input_file_str,
], ],
timeout,
) )
.map_err(FfMpegError::Process)?; .map_err(FfMpegError::Process)?;
@ -640,7 +656,7 @@ fn parse_details_inner(
})) }))
} }
async fn pixel_format(input_file: &str) -> Result<String, FfMpegError> { async fn pixel_format(input_file: &str, timeout: u64) -> Result<String, FfMpegError> {
let process = Process::run( let process = Process::run(
"ffprobe", "ffprobe",
&[ &[
@ -654,6 +670,7 @@ async fn pixel_format(input_file: &str) -> Result<String, FfMpegError> {
"compact=p=0:nk=1", "compact=p=0:nk=1",
input_file, input_file,
], ],
timeout,
) )
.map_err(FfMpegError::Process)?; .map_err(FfMpegError::Process)?;
@ -675,6 +692,7 @@ async fn pixel_format(input_file: &str) -> Result<String, FfMpegError> {
pub(crate) async fn transcode_bytes( pub(crate) async fn transcode_bytes(
input: Bytes, input: Bytes,
transcode_options: TranscodeOptions, transcode_options: TranscodeOptions,
timeout: u64,
) -> Result<impl AsyncRead + Unpin, FfMpegError> { ) -> Result<impl AsyncRead + Unpin, FfMpegError> {
let input_file = crate::tmp_file::tmp_file(Some(transcode_options.input_file_extension())); let input_file = crate::tmp_file::tmp_file(Some(transcode_options.input_file_extension()));
let input_file_str = input_file.to_str().ok_or(FfMpegError::Path)?; let input_file_str = input_file.to_str().ok_or(FfMpegError::Path)?;
@ -700,12 +718,12 @@ pub(crate) async fn transcode_bytes(
let alpha = if transcode_options.supports_alpha() { let alpha = if transcode_options.supports_alpha() {
static ALPHA_PIXEL_FORMATS: OnceCell<HashSet<String>> = OnceCell::new(); static ALPHA_PIXEL_FORMATS: OnceCell<HashSet<String>> = OnceCell::new();
let format = pixel_format(input_file_str).await?; let format = pixel_format(input_file_str, timeout).await?;
match ALPHA_PIXEL_FORMATS.get() { match ALPHA_PIXEL_FORMATS.get() {
Some(alpha_pixel_formats) => alpha_pixel_formats.contains(&format), Some(alpha_pixel_formats) => alpha_pixel_formats.contains(&format),
None => { None => {
let pixel_formats = alpha_pixel_formats().await?; let pixel_formats = alpha_pixel_formats(timeout).await?;
let alpha = pixel_formats.contains(&format); let alpha = pixel_formats.contains(&format);
let _ = ALPHA_PIXEL_FORMATS.set(pixel_formats); let _ = ALPHA_PIXEL_FORMATS.set(pixel_formats);
alpha alpha
@ -716,7 +734,7 @@ pub(crate) async fn transcode_bytes(
}; };
let process = transcode_options let process = transcode_options
.execute(input_file_str, output_file_str, alpha) .execute(input_file_str, output_file_str, alpha, timeout)
.map_err(FfMpegError::Process)?; .map_err(FfMpegError::Process)?;
process.wait().await.map_err(FfMpegError::Process)?; process.wait().await.map_err(FfMpegError::Process)?;
@ -743,6 +761,7 @@ pub(crate) async fn thumbnail<S: Store>(
from: S::Identifier, from: S::Identifier,
input_format: VideoFormat, input_format: VideoFormat,
format: ThumbnailFormat, format: ThumbnailFormat,
timeout: u64,
) -> Result<impl AsyncRead + Unpin, FfMpegError> { ) -> Result<impl AsyncRead + Unpin, FfMpegError> {
let input_file = crate::tmp_file::tmp_file(Some(input_format.to_file_extension())); let input_file = crate::tmp_file::tmp_file(Some(input_format.to_file_extension()));
let input_file_str = input_file.to_str().ok_or(FfMpegError::Path)?; let input_file_str = input_file.to_str().ok_or(FfMpegError::Path)?;
@ -785,6 +804,7 @@ pub(crate) async fn thumbnail<S: Store>(
format.as_ffmpeg_format(), format.as_ffmpeg_format(),
output_file_str, output_file_str,
], ],
timeout,
) )
.map_err(FfMpegError::Process)?; .map_err(FfMpegError::Process)?;

View file

@ -23,6 +23,7 @@ pub(crate) async fn generate<R: FullRepo, S: Store + 'static>(
thumbnail_args: Vec<String>, thumbnail_args: Vec<String>,
input_format: Option<VideoFormat>, input_format: Option<VideoFormat>,
thumbnail_format: Option<ThumbnailFormat>, thumbnail_format: Option<ThumbnailFormat>,
timeout: u64,
hash: R::Bytes, hash: R::Bytes,
) -> Result<(Details, Bytes), Error> { ) -> Result<(Details, Bytes), Error> {
let process_fut = process( let process_fut = process(
@ -34,6 +35,7 @@ pub(crate) async fn generate<R: FullRepo, S: Store + 'static>(
thumbnail_args, thumbnail_args,
input_format, input_format,
thumbnail_format, thumbnail_format,
timeout,
hash.clone(), hash.clone(),
); );
@ -54,6 +56,7 @@ async fn process<R: FullRepo, S: Store + 'static>(
thumbnail_args: Vec<String>, thumbnail_args: Vec<String>,
input_format: Option<VideoFormat>, input_format: Option<VideoFormat>,
thumbnail_format: Option<ThumbnailFormat>, thumbnail_format: Option<ThumbnailFormat>,
timeout: u64,
hash: R::Bytes, hash: R::Bytes,
) -> Result<(Details, Bytes), Error> { ) -> Result<(Details, Bytes), Error> {
let permit = crate::PROCESS_SEMAPHORE.acquire().await; let permit = crate::PROCESS_SEMAPHORE.acquire().await;
@ -75,6 +78,7 @@ async fn process<R: FullRepo, S: Store + 'static>(
identifier, identifier,
input_format.unwrap_or(VideoFormat::Mp4), input_format.unwrap_or(VideoFormat::Mp4),
thumbnail_format, thumbnail_format,
timeout,
) )
.await?; .await?;
let motion_identifier = store let motion_identifier = store
@ -87,8 +91,13 @@ async fn process<R: FullRepo, S: Store + 'static>(
motion_identifier motion_identifier
}; };
let mut processed_reader = let mut processed_reader = crate::magick::process_image_store_read(
crate::magick::process_image_store_read(store.clone(), identifier, thumbnail_args, format)?; store.clone(),
identifier,
thumbnail_args,
format,
timeout,
)?;
let mut vec = Vec::new(); let mut vec = Vec::new();
processed_reader processed_reader
@ -99,7 +108,7 @@ async fn process<R: FullRepo, S: Store + 'static>(
drop(permit); drop(permit);
let details = Details::from_bytes(bytes.clone(), format.as_hint()).await?; let details = Details::from_bytes(bytes.clone(), format.as_hint(), timeout).await?;
let identifier = store let identifier = store
.save_bytes(bytes.clone(), details.content_type()) .save_bytes(bytes.clone(), details.content_type())

View file

@ -48,6 +48,7 @@ pub(crate) async fn ingest<R, S>(
stream: impl Stream<Item = Result<Bytes, Error>> + Unpin + 'static, stream: impl Stream<Item = Result<Bytes, Error>> + Unpin + 'static,
declared_alias: Option<Alias>, declared_alias: Option<Alias>,
should_validate: bool, should_validate: bool,
timeout: u64,
) -> Result<Session<R, S>, Error> ) -> Result<Session<R, S>, Error>
where where
R: FullRepo + 'static, R: FullRepo + 'static,
@ -65,8 +66,12 @@ where
if let Some(format) = input_type.to_format() { if let Some(format) = input_type.to_format() {
let (_, magick_args) = crate::processor::build_chain(operations, format.as_ext())?; let (_, magick_args) = crate::processor::build_chain(operations, format.as_ext())?;
let processed_reader = let processed_reader = crate::magick::process_image_async_read(
crate::magick::process_image_async_read(validated_reader, magick_args, format)?; validated_reader,
magick_args,
format,
timeout,
)?;
Either::left(processed_reader) Either::left(processed_reader)
} else { } else {

View file

@ -122,7 +122,13 @@ async fn ensure_details<R: FullRepo, S: Store + 'static>(
} else { } else {
tracing::debug!("generating new details from {:?}", identifier); tracing::debug!("generating new details from {:?}", identifier);
let hint = details_hint(alias); let hint = details_hint(alias);
let new_details = Details::from_store(store.clone(), identifier.clone(), hint).await?; let new_details = Details::from_store(
store.clone(),
identifier.clone(),
hint,
CONFIG.media.process_timeout,
)
.await?;
tracing::debug!("storing details for {:?}", identifier); tracing::debug!("storing details for {:?}", identifier);
repo.relate_details(&identifier, &new_details).await?; repo.relate_details(&identifier, &new_details).await?;
tracing::debug!("stored"); tracing::debug!("stored");
@ -164,8 +170,18 @@ impl<R: FullRepo, S: Store + 'static> FormData for Upload<R, S> {
let stream = stream.map_err(Error::from); let stream = stream.map_err(Error::from);
Box::pin( Box::pin(
async move { ingest::ingest(&**repo, &**store, stream, None, true).await } async move {
.instrument(span), ingest::ingest(
&**repo,
&**store,
stream,
None,
true,
CONFIG.media.process_timeout,
)
.await
}
.instrument(span),
) )
})), })),
) )
@ -217,6 +233,7 @@ impl<R: FullRepo, S: Store + 'static> FormData for Import<R, S> {
stream, stream,
Some(Alias::from_existing(&filename)), Some(Alias::from_existing(&filename)),
!CONFIG.media.skip_validate_imports, !CONFIG.media.skip_validate_imports,
CONFIG.media.process_timeout,
) )
.await .await
} }
@ -472,7 +489,15 @@ async fn do_download_inline<R: FullRepo + 'static, S: Store + 'static>(
repo: web::Data<R>, repo: web::Data<R>,
store: web::Data<S>, store: web::Data<S>,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
let mut session = ingest::ingest(&repo, &store, stream, None, true).await?; let mut session = ingest::ingest(
&repo,
&store,
stream,
None,
true,
CONFIG.media.process_timeout,
)
.await?;
let alias = session.alias().expect("alias should exist").to_owned(); let alias = session.alias().expect("alias should exist").to_owned();
let delete_token = session.delete_token().await?; let delete_token = session.delete_token().await?;
@ -658,6 +683,7 @@ async fn process<R: FullRepo, S: Store + 'static>(
(**store).clone(), (**store).clone(),
identifier.clone(), identifier.clone(),
Some(ValidInputType::from_format(format)), Some(ValidInputType::from_format(format)),
CONFIG.media.process_timeout,
) )
.await?; .await?;
tracing::debug!("storing details for {:?}", identifier); tracing::debug!("storing details for {:?}", identifier);
@ -680,6 +706,7 @@ async fn process<R: FullRepo, S: Store + 'static>(
thumbnail_args, thumbnail_args,
original_details.to_input_format(), original_details.to_input_format(),
None, None,
CONFIG.media.process_timeout,
hash, hash,
) )
.await?; .await?;
@ -753,6 +780,7 @@ async fn process_head<R: FullRepo, S: Store + 'static>(
(**store).clone(), (**store).clone(),
identifier.clone(), identifier.clone(),
Some(ValidInputType::from_format(format)), Some(ValidInputType::from_format(format)),
CONFIG.media.process_timeout,
) )
.await?; .await?;
tracing::debug!("storing details for {:?}", identifier); tracing::debug!("storing details for {:?}", identifier);

View file

@ -206,6 +206,7 @@ pub(crate) struct Details {
pub(crate) fn convert_bytes_read( pub(crate) fn convert_bytes_read(
input: Bytes, input: Bytes,
format: ImageFormat, format: ImageFormat,
timeout: u64,
) -> Result<impl AsyncRead + Unpin, MagickError> { ) -> Result<impl AsyncRead + Unpin, MagickError> {
let process = Process::run( let process = Process::run(
"magick", "magick",
@ -216,6 +217,7 @@ pub(crate) fn convert_bytes_read(
"-strip", "-strip",
format!("{}:-", format.as_magick_format()).as_str(), format!("{}:-", format.as_magick_format()).as_str(),
], ],
timeout,
) )
.map_err(MagickError::Process)?; .map_err(MagickError::Process)?;
@ -226,6 +228,7 @@ pub(crate) fn convert_bytes_read(
pub(crate) async fn details_bytes( pub(crate) async fn details_bytes(
input: Bytes, input: Bytes,
hint: Option<ValidInputType>, hint: Option<ValidInputType>,
timeout: u64,
) -> Result<Details, MagickError> { ) -> Result<Details, MagickError> {
if let Some(hint) = hint.and_then(|hint| hint.video_hint()) { if let Some(hint) = hint.and_then(|hint| hint.video_hint()) {
let input_file = crate::tmp_file::tmp_file(Some(hint)); let input_file = crate::tmp_file::tmp_file(Some(hint));
@ -243,7 +246,7 @@ pub(crate) async fn details_bytes(
.map_err(MagickError::Write)?; .map_err(MagickError::Write)?;
tmp_one.close().await.map_err(MagickError::CloseFile)?; tmp_one.close().await.map_err(MagickError::CloseFile)?;
return details_file(input_file_str).await; return details_file(input_file_str, timeout).await;
} }
let last_arg = if let Some(expected_format) = hint { let last_arg = if let Some(expected_format) = hint {
@ -252,7 +255,7 @@ pub(crate) async fn details_bytes(
"-".to_owned() "-".to_owned()
}; };
let process = Process::run("magick", &["convert", "-ping", &last_arg, "JSON:"]) let process = Process::run("magick", &["convert", "-ping", &last_arg, "JSON:"], timeout)
.map_err(MagickError::Process)?; .map_err(MagickError::Process)?;
let mut reader = process.bytes_read(input); let mut reader = process.bytes_read(input);
@ -295,6 +298,7 @@ pub(crate) async fn details_store<S: Store + 'static>(
store: S, store: S,
identifier: S::Identifier, identifier: S::Identifier,
hint: Option<ValidInputType>, hint: Option<ValidInputType>,
timeout: u64,
) -> Result<Details, MagickError> { ) -> Result<Details, MagickError> {
if let Some(hint) = hint.and_then(|hint| hint.video_hint()) { if let Some(hint) = hint.and_then(|hint| hint.video_hint()) {
let input_file = crate::tmp_file::tmp_file(Some(hint)); let input_file = crate::tmp_file::tmp_file(Some(hint));
@ -316,7 +320,7 @@ pub(crate) async fn details_store<S: Store + 'static>(
.map_err(MagickError::Write)?; .map_err(MagickError::Write)?;
tmp_one.close().await.map_err(MagickError::CloseFile)?; tmp_one.close().await.map_err(MagickError::CloseFile)?;
return details_file(input_file_str).await; return details_file(input_file_str, timeout).await;
} }
let last_arg = if let Some(expected_format) = hint { let last_arg = if let Some(expected_format) = hint {
@ -325,7 +329,7 @@ pub(crate) async fn details_store<S: Store + 'static>(
"-".to_owned() "-".to_owned()
}; };
let process = Process::run("magick", &["convert", "-ping", &last_arg, "JSON:"]) let process = Process::run("magick", &["convert", "-ping", &last_arg, "JSON:"], timeout)
.map_err(MagickError::Process)?; .map_err(MagickError::Process)?;
let mut reader = process.store_read(store, identifier); let mut reader = process.store_read(store, identifier);
@ -347,8 +351,8 @@ pub(crate) async fn details_store<S: Store + 'static>(
} }
#[tracing::instrument] #[tracing::instrument]
pub(crate) async fn details_file(path_str: &str) -> Result<Details, MagickError> { pub(crate) async fn details_file(path_str: &str, timeout: u64) -> Result<Details, MagickError> {
let process = Process::run("magick", &["convert", "-ping", path_str, "JSON:"]) let process = Process::run("magick", &["convert", "-ping", path_str, "JSON:"], timeout)
.map_err(MagickError::Process)?; .map_err(MagickError::Process)?;
let mut reader = process.read(); let mut reader = process.read();
@ -438,15 +442,20 @@ fn parse_details(details_output: Vec<DetailsOutput>) -> Result<Details, ParseDet
pub(crate) async fn input_type_bytes( pub(crate) async fn input_type_bytes(
input: Bytes, input: Bytes,
timeout: u64,
) -> Result<(Details, ValidInputType), MagickError> { ) -> Result<(Details, ValidInputType), MagickError> {
let details = details_bytes(input, None).await?; let details = details_bytes(input, None, timeout).await?;
let input_type = details let input_type = details
.validate_input() .validate_input()
.map_err(MagickError::ValidateDetails)?; .map_err(MagickError::ValidateDetails)?;
Ok((details, input_type)) Ok((details, input_type))
} }
fn process_image(process_args: Vec<String>, format: ImageFormat) -> Result<Process, ProcessError> { fn process_image(
process_args: Vec<String>,
format: ImageFormat,
timeout: u64,
) -> Result<Process, ProcessError> {
let command = "magick"; let command = "magick";
let convert_args = ["convert", "-"]; let convert_args = ["convert", "-"];
let last_arg = format!("{}:-", format.as_magick_format()); let last_arg = format!("{}:-", format.as_magick_format());
@ -456,7 +465,7 @@ fn process_image(process_args: Vec<String>, format: ImageFormat) -> Result<Proce
args.extend(process_args.iter().map(|s| s.as_str())); args.extend(process_args.iter().map(|s| s.as_str()));
args.push(&last_arg); args.push(&last_arg);
Process::run(command, &args) Process::run(command, &args, timeout)
} }
pub(crate) fn process_image_store_read<S: Store + 'static>( pub(crate) fn process_image_store_read<S: Store + 'static>(
@ -464,8 +473,9 @@ pub(crate) fn process_image_store_read<S: Store + 'static>(
identifier: S::Identifier, identifier: S::Identifier,
args: Vec<String>, args: Vec<String>,
format: ImageFormat, format: ImageFormat,
timeout: u64,
) -> Result<impl AsyncRead + Unpin, MagickError> { ) -> Result<impl AsyncRead + Unpin, MagickError> {
Ok(process_image(args, format) Ok(process_image(args, format, timeout)
.map_err(MagickError::Process)? .map_err(MagickError::Process)?
.store_read(store, identifier)) .store_read(store, identifier))
} }
@ -474,8 +484,9 @@ pub(crate) fn process_image_async_read<A: AsyncRead + Unpin + 'static>(
async_read: A, async_read: A,
args: Vec<String>, args: Vec<String>,
format: ImageFormat, format: ImageFormat,
timeout: u64,
) -> Result<impl AsyncRead + Unpin, MagickError> { ) -> Result<impl AsyncRead + Unpin, MagickError> {
Ok(process_image(args, format) Ok(process_image(args, format, timeout)
.map_err(MagickError::Process)? .map_err(MagickError::Process)?
.pipe_async_read(async_read)) .pipe_async_read(async_read))
} }

View file

@ -6,10 +6,11 @@ use std::{
pin::Pin, pin::Pin,
process::{ExitStatus, Stdio}, process::{ExitStatus, Stdio},
task::{Context, Poll}, task::{Context, Poll},
time::Duration,
}; };
use tokio::{ use tokio::{
io::{AsyncRead, AsyncWriteExt, ReadBuf}, io::{AsyncRead, AsyncWriteExt, ReadBuf},
process::{Child, ChildStdin, Command}, process::{Child, ChildStdin, ChildStdout, Command},
sync::oneshot::{channel, Receiver}, sync::oneshot::{channel, Receiver},
}; };
use tracing::{Instrument, Span}; use tracing::{Instrument, Span};
@ -19,6 +20,7 @@ struct StatusError(ExitStatus);
pub(crate) struct Process { pub(crate) struct Process {
child: Child, child: Child,
timeout: Duration,
} }
impl std::fmt::Debug for Process { impl std::fmt::Debug for Process {
@ -31,15 +33,14 @@ struct DropHandle {
inner: JoinHandle<()>, inner: JoinHandle<()>,
} }
pin_project_lite::pin_project! { pub(crate) struct ProcessRead<I> {
struct ProcessRead<I> { inner: I,
#[pin] err_recv: Receiver<std::io::Error>,
inner: I, err_closed: bool,
err_recv: Receiver<std::io::Error>, #[allow(dead_code)]
err_closed: bool, handle: DropHandle,
handle: DropHandle, eof: bool,
eof: bool, sleep: Pin<Box<actix_rt::time::Sleep>>,
}
} }
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
@ -53,6 +54,9 @@ pub(crate) enum ProcessError {
#[error("Reached process spawn limit")] #[error("Reached process spawn limit")]
LimitReached, LimitReached,
#[error("Process timed out")]
Timeout,
#[error("Failed with status {0}")] #[error("Failed with status {0}")]
Status(ExitStatus), Status(ExitStatus),
@ -61,9 +65,9 @@ pub(crate) enum ProcessError {
} }
impl Process { impl Process {
pub(crate) fn run(command: &str, args: &[&str]) -> Result<Self, ProcessError> { pub(crate) fn run(command: &str, args: &[&str], timeout: u64) -> Result<Self, ProcessError> {
let res = tracing::trace_span!(parent: None, "Create command") let res = tracing::trace_span!(parent: None, "Create command")
.in_scope(|| Self::spawn(Command::new(command).args(args))); .in_scope(|| Self::spawn(Command::new(command).args(args), timeout));
match res { match res {
Ok(this) => Ok(this), Ok(this) => Ok(this),
@ -78,43 +82,49 @@ impl Process {
} }
} }
fn spawn(cmd: &mut Command) -> std::io::Result<Self> { fn spawn(cmd: &mut Command, timeout: u64) -> std::io::Result<Self> {
let timeout = Duration::from_secs(timeout);
tracing::trace_span!(parent: None, "Spawn command").in_scope(|| { tracing::trace_span!(parent: None, "Spawn command").in_scope(|| {
let cmd = cmd let cmd = cmd
.stdin(Stdio::piped()) .stdin(Stdio::piped())
.stdout(Stdio::piped()) .stdout(Stdio::piped())
.kill_on_drop(true); .kill_on_drop(true);
cmd.spawn().map(|child| Process { child }) cmd.spawn().map(|child| Process { child, timeout })
}) })
} }
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self))]
pub(crate) async fn wait(mut self) -> Result<(), ProcessError> { pub(crate) async fn wait(self) -> Result<(), ProcessError> {
let res = self.child.wait().await; let Process { mut child, timeout } = self;
match res { match actix_rt::time::timeout(timeout, child.wait()).await {
Ok(status) if status.success() => Ok(()), Ok(Ok(status)) if status.success() => Ok(()),
Ok(status) => Err(ProcessError::Status(status)), Ok(Ok(status)) => Err(ProcessError::Status(status)),
Err(e) => Err(ProcessError::Other(e)), Ok(Err(e)) => Err(ProcessError::Other(e)),
Err(_) => {
child.kill().await.map_err(ProcessError::Other)?;
return Err(ProcessError::Timeout);
}
} }
} }
pub(crate) fn bytes_read(self, input: Bytes) -> impl AsyncRead + Unpin { pub(crate) fn bytes_read(self, input: Bytes) -> ProcessRead<ChildStdout> {
self.spawn_fn(move |mut stdin| { self.spawn_fn(move |mut stdin| {
let mut input = input; let mut input = input;
async move { stdin.write_all_buf(&mut input).await } async move { stdin.write_all_buf(&mut input).await }
}) })
} }
pub(crate) fn read(self) -> impl AsyncRead + Unpin { pub(crate) fn read(self) -> ProcessRead<ChildStdout> {
self.spawn_fn(|_| async { Ok(()) }) self.spawn_fn(|_| async { Ok(()) })
} }
pub(crate) fn pipe_async_read<A: AsyncRead + Unpin + 'static>( pub(crate) fn pipe_async_read<A: AsyncRead + Unpin + 'static>(
self, self,
mut async_read: A, mut async_read: A,
) -> impl AsyncRead + Unpin { ) -> ProcessRead<ChildStdout> {
self.spawn_fn(move |mut stdin| async move { self.spawn_fn(move |mut stdin| async move {
tokio::io::copy(&mut async_read, &mut stdin) tokio::io::copy(&mut async_read, &mut stdin)
.await .await
@ -126,7 +136,7 @@ impl Process {
self, self,
store: S, store: S,
identifier: S::Identifier, identifier: S::Identifier,
) -> impl AsyncRead + Unpin { ) -> ProcessRead<ChildStdout> {
self.spawn_fn(move |mut stdin| { self.spawn_fn(move |mut stdin| {
let store = store; let store = store;
let identifier = identifier; let identifier = identifier;
@ -138,13 +148,15 @@ impl Process {
#[allow(unknown_lints)] #[allow(unknown_lints)]
#[allow(clippy::let_with_type_underscore)] #[allow(clippy::let_with_type_underscore)]
#[tracing::instrument(level = "trace", skip_all)] #[tracing::instrument(level = "trace", skip_all)]
fn spawn_fn<F, Fut>(mut self, f: F) -> impl AsyncRead + Unpin fn spawn_fn<F, Fut>(self, f: F) -> ProcessRead<ChildStdout>
where where
F: FnOnce(ChildStdin) -> Fut + 'static, F: FnOnce(ChildStdin) -> Fut + 'static,
Fut: Future<Output = std::io::Result<()>>, Fut: Future<Output = std::io::Result<()>>,
{ {
let stdin = self.child.stdin.take().expect("stdin exists"); let Process { mut child, timeout } = self;
let stdout = self.child.stdout.take().expect("stdout exists");
let stdin = child.stdin.take().expect("stdin exists");
let stdout = child.stdout.take().expect("stdout exists");
let (tx, rx) = tracing::trace_span!(parent: None, "Create channel") let (tx, rx) = tracing::trace_span!(parent: None, "Create channel")
.in_scope(channel::<std::io::Error>); .in_scope(channel::<std::io::Error>);
@ -152,82 +164,84 @@ impl Process {
let span = tracing::info_span!(parent: None, "Background process task"); let span = tracing::info_span!(parent: None, "Background process task");
span.follows_from(Span::current()); span.follows_from(Span::current());
let mut child = self.child;
let handle = tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { let handle = tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
actix_rt::spawn( actix_rt::spawn(
async move { async move {
if let Err(e) = (f)(stdin).await { let child_fut = async {
let _ = tx.send(e); (f)(stdin).await?;
return;
}
match child.wait().await { child.wait().await
Ok(status) => { };
if !status.success() {
let _ = tx.send(std::io::Error::new( let err = match actix_rt::time::timeout(timeout, child_fut).await {
std::io::ErrorKind::Other, Ok(Ok(status)) if status.success() => return,
StatusError(status), Ok(Ok(status)) => std::io::Error::new(
)); std::io::ErrorKind::Other,
} ProcessError::Status(status),
} ),
Err(e) => { Ok(Err(e)) => e,
let _ = tx.send(e); Err(_) => std::io::ErrorKind::TimedOut.into(),
} };
}
let _ = tx.send(err);
let _ = child.kill().await;
} }
.instrument(span), .instrument(span),
) )
}); });
let sleep = Box::pin(actix_rt::time::sleep(timeout));
ProcessRead { ProcessRead {
inner: stdout, inner: stdout,
err_recv: rx, err_recv: rx,
err_closed: false, err_closed: false,
handle: DropHandle { inner: handle }, handle: DropHandle { inner: handle },
eof: false, eof: false,
sleep,
} }
} }
} }
impl<I> AsyncRead for ProcessRead<I> impl<I> AsyncRead for ProcessRead<I>
where where
I: AsyncRead, I: AsyncRead + Unpin,
{ {
fn poll_read( fn poll_read(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>, buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> { ) -> Poll<std::io::Result<()>> {
let this = self.as_mut().project(); if !self.err_closed {
if let Poll::Ready(res) = Pin::new(&mut self.err_recv).poll(cx) {
let err_recv = this.err_recv; self.err_closed = true;
let err_closed = this.err_closed;
let eof = this.eof;
let inner = this.inner;
if !*err_closed {
if let Poll::Ready(res) = Pin::new(err_recv).poll(cx) {
*err_closed = true;
if let Ok(err) = res { if let Ok(err) = res {
return Poll::Ready(Err(err)); return Poll::Ready(Err(err));
} }
if *eof { if self.eof {
return Poll::Ready(Ok(())); return Poll::Ready(Ok(()));
} }
} }
if let Poll::Ready(()) = self.sleep.as_mut().poll(cx) {
self.err_closed = true;
return Poll::Ready(Err(std::io::ErrorKind::TimedOut.into()));
}
} }
if !*eof { if !self.eof {
let before_size = buf.filled().len(); let before_size = buf.filled().len();
return match inner.poll_read(cx, buf) { return match Pin::new(&mut self.inner).poll_read(cx, buf) {
Poll::Ready(Ok(())) => { Poll::Ready(Ok(())) => {
if buf.filled().len() == before_size { if buf.filled().len() == before_size {
*eof = true; self.eof = true;
if !*err_closed { if !self.err_closed {
// reached end of stream & haven't received process signal // reached end of stream & haven't received process signal
return Poll::Pending; return Poll::Pending;
} }
@ -236,7 +250,7 @@ where
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
} }
Poll::Ready(Err(e)) => { Poll::Ready(Err(e)) => {
*eof = true; self.eof = true;
Poll::Ready(Err(e)) Poll::Ready(Err(e))
} }
@ -244,7 +258,7 @@ where
}; };
} }
if *err_closed && *eof { if self.err_closed && self.eof {
return Poll::Ready(Ok(())); return Poll::Ready(Ok(()));
} }

View file

@ -35,6 +35,7 @@ where
Serde::into_inner(upload_id), Serde::into_inner(upload_id),
declared_alias.map(Serde::into_inner), declared_alias.map(Serde::into_inner),
should_validate, should_validate,
crate::CONFIG.media.process_timeout,
) )
.await? .await?
} }
@ -51,6 +52,7 @@ where
Serde::into_inner(source), Serde::into_inner(source),
process_path, process_path,
process_args, process_args,
crate::CONFIG.media.process_timeout,
) )
.await? .await?
} }
@ -72,6 +74,7 @@ async fn process_ingest<R, S>(
upload_id: UploadId, upload_id: UploadId,
declared_alias: Option<Alias>, declared_alias: Option<Alias>,
should_validate: bool, should_validate: bool,
timeout: u64,
) -> Result<(), Error> ) -> Result<(), Error>
where where
R: FullRepo + 'static, R: FullRepo + 'static,
@ -90,9 +93,15 @@ where
.await? .await?
.map_err(Error::from); .map_err(Error::from);
let session = let session = crate::ingest::ingest(
crate::ingest::ingest(&repo, &store2, stream, declared_alias, should_validate) &repo,
.await?; &store2,
stream,
declared_alias,
should_validate,
timeout,
)
.await?;
let token = session.delete_token().await?; let token = session.delete_token().await?;
@ -134,6 +143,7 @@ async fn generate<R: FullRepo, S: Store + 'static>(
source: Alias, source: Alias,
process_path: PathBuf, process_path: PathBuf,
process_args: Vec<String>, process_args: Vec<String>,
timeout: u64,
) -> Result<(), Error> { ) -> Result<(), Error> {
let Some(hash) = repo.hash(&source).await? else { let Some(hash) = repo.hash(&source).await? else {
// Nothing to do // Nothing to do
@ -160,6 +170,7 @@ async fn generate<R: FullRepo, S: Store + 'static>(
process_args, process_args,
original_details.to_input_format(), original_details.to_input_format(),
None, None,
timeout,
hash, hash,
) )
.await?; .await?;

View file

@ -41,12 +41,13 @@ pub(crate) async fn validate_bytes(
media: &MediaConfiguration, media: &MediaConfiguration,
validate: bool, validate: bool,
) -> Result<(ValidInputType, impl AsyncRead + Unpin), Error> { ) -> Result<(ValidInputType, impl AsyncRead + Unpin), Error> {
let (details, input_type) = let (details, input_type) = if let Some(tup) =
if let Some(tup) = crate::ffmpeg::input_type_bytes(bytes.clone()).await? { crate::ffmpeg::input_type_bytes(bytes.clone(), media.process_timeout).await?
tup {
} else { tup
crate::magick::input_type_bytes(bytes.clone()).await? } else {
}; crate::magick::input_type_bytes(bytes.clone(), media.process_timeout).await?
};
if !validate { if !validate {
return Ok((input_type, Either::left(UnvalidatedBytes::new(bytes)))); return Ok((input_type, Either::left(UnvalidatedBytes::new(bytes))));
@ -63,7 +64,12 @@ pub(crate) async fn validate_bytes(
Ok(( Ok((
transcode_options.output_type(), transcode_options.output_type(),
Either::right(Either::left(Either::left( Either::right(Either::left(Either::left(
crate::ffmpeg::transcode_bytes(bytes, transcode_options).await?, crate::ffmpeg::transcode_bytes(
bytes,
transcode_options,
media.process_timeout,
)
.await?,
))), ))),
)) ))
} else { } else {
@ -71,6 +77,7 @@ pub(crate) async fn validate_bytes(
transcode_options.output_type(), transcode_options.output_type(),
Either::right(Either::right(crate::exiftool::clear_metadata_bytes_read( Either::right(Either::right(crate::exiftool::clear_metadata_bytes_read(
bytes, bytes,
media.process_timeout,
)?)), )?)),
)) ))
} }
@ -78,21 +85,25 @@ pub(crate) async fn validate_bytes(
(FileFormat::Image(image_format), Some(format)) if image_format != format => Ok(( (FileFormat::Image(image_format), Some(format)) if image_format != format => Ok((
ValidInputType::from_format(format), ValidInputType::from_format(format),
Either::right(Either::left(Either::right( Either::right(Either::left(Either::right(
crate::magick::convert_bytes_read(bytes, format)?, crate::magick::convert_bytes_read(bytes, format, media.process_timeout)?,
))), ))),
)), )),
(FileFormat::Image(ImageFormat::Webp), _) => Ok(( (FileFormat::Image(ImageFormat::Webp), _) => Ok((
ValidInputType::Webp, ValidInputType::Webp,
Either::right(Either::left(Either::right( Either::right(Either::left(Either::right(
crate::magick::convert_bytes_read(bytes, ImageFormat::Webp)?, crate::magick::convert_bytes_read(bytes, ImageFormat::Webp, media.process_timeout)?,
))), ))),
)), )),
(FileFormat::Image(image_format), _) => { (FileFormat::Image(image_format), _) => {
if crate::exiftool::needs_reorienting(bytes.clone()).await? { if crate::exiftool::needs_reorienting(bytes.clone(), media.process_timeout).await? {
Ok(( Ok((
ValidInputType::from_format(image_format), ValidInputType::from_format(image_format),
Either::right(Either::left(Either::right( Either::right(Either::left(Either::right(
crate::magick::convert_bytes_read(bytes, image_format)?, crate::magick::convert_bytes_read(
bytes,
image_format,
media.process_timeout,
)?,
))), ))),
)) ))
} else { } else {
@ -100,6 +111,7 @@ pub(crate) async fn validate_bytes(
ValidInputType::from_format(image_format), ValidInputType::from_format(image_format),
Either::right(Either::right(crate::exiftool::clear_metadata_bytes_read( Either::right(Either::right(crate::exiftool::clear_metadata_bytes_read(
bytes, bytes,
media.process_timeout,
)?)), )?)),
)) ))
} }