From 5624671cbf8076bf2011fc9516a3c567949b1087 Mon Sep 17 00:00:00 2001 From: asonix Date: Fri, 22 Dec 2023 20:52:58 -0600 Subject: [PATCH] Rework ProcessRead to require a closure for using the internal AsyncRead --- src/discover/exiftool.rs | 12 +- src/discover/ffmpeg.rs | 28 ++--- src/discover/magick.rs | 29 ++--- src/error.rs | 5 + src/exiftool.rs | 16 +-- src/generate.rs | 14 +-- src/generate/ffmpeg.rs | 22 +++- src/generate/magick.rs | 16 +-- src/ingest.rs | 27 ++-- src/magick.rs | 50 ++++++-- src/process.rs | 265 ++++++++++++++------------------------- src/validate.rs | 36 +++--- src/validate/exiftool.rs | 12 +- src/validate/ffmpeg.rs | 17 ++- src/validate/magick.rs | 15 ++- 15 files changed, 265 insertions(+), 299 deletions(-) diff --git a/src/discover/exiftool.rs b/src/discover/exiftool.rs index 1e7a514..0ef2d72 100644 --- a/src/discover/exiftool.rs +++ b/src/discover/exiftool.rs @@ -42,14 +42,10 @@ pub(super) async fn check_reorient( #[tracing::instrument(level = "trace", skip_all)] async fn needs_reorienting(input: Bytes, timeout: u64) -> Result { - let process = Process::run("exiftool", &["-n", "-Orientation", "-"], &[], timeout)?; - let mut reader = process.bytes_read(input); - - let mut buf = String::new(); - reader - .read_to_string(&mut buf) - .await - .map_err(ExifError::Read)?; + let buf = Process::run("exiftool", &["-n", "-Orientation", "-"], &[], timeout)? + .bytes_read(input) + .to_string() + .await?; Ok(!buf.is_empty()) } diff --git a/src/discover/ffmpeg.rs b/src/discover/ffmpeg.rs index 54c9676..df4f237 100644 --- a/src/discover/ffmpeg.rs +++ b/src/discover/ffmpeg.rs @@ -212,7 +212,7 @@ where let tmp_one = (f)(tmp_one).await?; tmp_one.close().await.map_err(FfMpegError::CloseFile)?; - let process = Process::run( + let output = Process::run( "ffprobe", &[ "-v".as_ref(), @@ -228,14 +228,10 @@ where ], &[], timeout, - )?; - - let mut output = Vec::new(); - process - .read() - .read_to_end(&mut output) - .await - .map_err(FfMpegError::Read)?; + )? + .read() + .to_vec() + .await?; drop(input_file); @@ -262,7 +258,7 @@ where #[tracing::instrument(level = "debug", skip_all)] async fn alpha_pixel_formats(timeout: u64) -> Result, FfMpegError> { - let process = Process::run( + let output = Process::run( "ffprobe", &[ "-v", @@ -276,14 +272,10 @@ async fn alpha_pixel_formats(timeout: u64) -> Result, FfMpegErro ], &[], timeout, - )?; - - let mut output = Vec::new(); - process - .read() - .read_to_end(&mut output) - .await - .map_err(FfMpegError::Read)?; + )? + .read() + .to_vec() + .await?; let formats: PixelFormatOutput = serde_json::from_slice(&output).map_err(FfMpegError::Json)?; diff --git a/src/discover/magick.rs b/src/discover/magick.rs index accc09e..221777f 100644 --- a/src/discover/magick.rs +++ b/src/discover/magick.rs @@ -118,7 +118,7 @@ where let envs = [(MAGICK_TEMPORARY_PATH, temporary_path.as_os_str())]; - let process = Process::run( + let output = Process::run( "magick", &[ "convert".as_ref(), @@ -128,14 +128,10 @@ where ], &envs, timeout, - )?; - - let mut output = String::new(); - process - .read() - .read_to_string(&mut output) - .await - .map_err(MagickError::Read)?; + )? + .read() + .to_string() + .await?; drop(input_file); drop(temporary_path); @@ -185,7 +181,7 @@ where let envs = [(MAGICK_TEMPORARY_PATH, temporary_path.as_os_str())]; - let process = Process::run( + let output = Process::run( "magick", &[ "convert".as_ref(), @@ -195,16 +191,13 @@ where ], &envs, timeout, - )?; - - let mut output = Vec::new(); - process - .read() - .read_to_end(&mut output) - .await - .map_err(MagickError::Read)?; + )? + .read() + .to_vec() + .await?; drop(input_file); + drop(temporary_path); if output.is_empty() { return Err(MagickError::Empty); diff --git a/src/error.rs b/src/error.rs index b7cae65..7a77f5b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -96,6 +96,9 @@ pub(crate) enum UploadError { #[error("Error in exiftool")] Exiftool(#[from] crate::exiftool::ExifError), + #[error("Error in process")] + Process(#[from] crate::process::ProcessError), + #[error("Error building reqwest client")] BuildClient(#[source] reqwest::Error), @@ -172,6 +175,7 @@ impl UploadError { Self::Ffmpeg(e) => e.error_code(), Self::Magick(e) => e.error_code(), Self::Exiftool(e) => e.error_code(), + Self::Process(e) => e.error_code(), Self::BuildClient(_) | Self::RequestMiddleware(_) | Self::Request(_) => { ErrorCode::HTTP_CLIENT_ERROR } @@ -246,6 +250,7 @@ impl ResponseError for Error { Some(UploadError::Magick(e)) if e.is_client_error() => StatusCode::BAD_REQUEST, Some(UploadError::Ffmpeg(e)) if e.is_client_error() => StatusCode::BAD_REQUEST, Some(UploadError::Exiftool(e)) if e.is_client_error() => StatusCode::BAD_REQUEST, + Some(UploadError::Process(e)) if e.is_client_error() => StatusCode::BAD_REQUEST, Some(UploadError::MissingAlias) => StatusCode::NOT_FOUND, Some(UploadError::Ffmpeg(e)) if e.is_not_found() => StatusCode::NOT_FOUND, Some(UploadError::InvalidToken) => StatusCode::FORBIDDEN, diff --git a/src/exiftool.rs b/src/exiftool.rs index 68f2789..a2d8ebd 100644 --- a/src/exiftool.rs +++ b/src/exiftool.rs @@ -1,6 +1,6 @@ use crate::{ error_code::ErrorCode, - process::{Process, ProcessError}, + process::{Process, ProcessError, ProcessRead}, }; use actix_web::web::Bytes; use tokio::io::{AsyncRead, AsyncReadExt}; @@ -45,14 +45,10 @@ impl ExifError { #[tracing::instrument(level = "trace", skip(input))] pub(crate) async fn needs_reorienting(timeout: u64, input: Bytes) -> Result { - let process = Process::run("exiftool", &["-n", "-Orientation", "-"], &[], timeout)?; - let mut reader = process.bytes_read(input); - - let mut buf = String::new(); - reader - .read_to_string(&mut buf) - .await - .map_err(ExifError::Read)?; + let buf = Process::run("exiftool", &["-n", "-Orientation", "-"], &[], timeout)? + .bytes_read(input) + .to_string() + .await?; Ok(!buf.is_empty()) } @@ -61,7 +57,7 @@ pub(crate) async fn needs_reorienting(timeout: u64, input: Bytes) -> Result Result { +) -> Result { let process = Process::run("exiftool", &["-all=", "-", "-out", "-"], &[], timeout)?; Ok(process.bytes_read(input)) diff --git a/src/generate.rs b/src/generate.rs index 27cb854..0d83862 100644 --- a/src/generate.rs +++ b/src/generate.rs @@ -135,7 +135,7 @@ async fn process( ProcessableFormat::Animation(format) => config.media.animation.quality_for(format), }; - let mut processed_reader = crate::magick::process_image_store_read( + let vec = crate::magick::process_image_store_read( tmp_dir, store, &identifier, @@ -145,13 +145,11 @@ async fn process( quality, config.media.process_timeout, ) + .await? + .to_vec() + .instrument(tracing::info_span!("Reading processed image to vec")) .await?; - let mut vec = Vec::new(); - processed_reader - .read_to_end(&mut vec) - .instrument(tracing::info_span!("Reading processed image to vec")) - .await?; let bytes = Bytes::from(vec); drop(permit); @@ -254,7 +252,9 @@ where (reader, thumbnail_format.media_type()) }; - let motion_identifier = store.save_async_read(reader, media_type).await?; + let motion_identifier = reader + .with_stdout(|mut stdout| async { store.save_async_read(stdout, media_type).await }) + .await??; repo.relate_motion_identifier(hash, &motion_identifier) .await?; diff --git a/src/generate/ffmpeg.rs b/src/generate/ffmpeg.rs index 4c0fc9b..8765d60 100644 --- a/src/generate/ffmpeg.rs +++ b/src/generate/ffmpeg.rs @@ -1,8 +1,14 @@ use std::sync::Arc; +use uuid::Uuid; + use crate::{ - ffmpeg::FfMpegError, formats::InternalVideoFormat, process::Process, read::BoxRead, - store::Store, tmp_file::TmpDir, + ffmpeg::FfMpegError, + formats::InternalVideoFormat, + process::{Process, ProcessRead}, + read::BoxRead, + store::Store, + tmp_file::TmpDir, }; #[derive(Clone, Copy, Debug)] @@ -53,7 +59,7 @@ pub(super) async fn thumbnail( input_format: InternalVideoFormat, format: ThumbnailFormat, timeout: u64, -) -> Result, FfMpegError> { +) -> Result { let input_file = tmp_dir.tmp_file(Some(input_format.file_extension())); crate::store::file_store::safe_create_parent(&input_file) .await @@ -108,7 +114,13 @@ pub(super) async fn thumbnail( .await .map_err(FfMpegError::ReadFile)?; let reader = tokio_util::io::StreamReader::new(stream); - let clean_reader = output_file.reader(reader); - Ok(Box::pin(clean_reader)) + let reader = ProcessRead::new( + Box::pin(reader), + Arc::from(String::from("ffmpeg")), + Uuid::now_v7(), + ) + .add_extras(output_file); + + Ok(reader) } diff --git a/src/generate/magick.rs b/src/generate/magick.rs index e0a3236..e5aecf1 100644 --- a/src/generate/magick.rs +++ b/src/generate/magick.rs @@ -3,7 +3,7 @@ use std::{ffi::OsStr, sync::Arc}; use crate::{ formats::ProcessableFormat, magick::{MagickError, MAGICK_TEMPORARY_PATH}, - process::Process, + process::{Process, ProcessRead}, read::BoxRead, store::Store, tmp_file::TmpDir, @@ -16,7 +16,7 @@ async fn thumbnail_animation( quality: Option, timeout: u64, write_file: F, -) -> Result, MagickError> +) -> Result where F: FnOnce(crate::file::File) -> Fut, Fut: std::future::Future>, @@ -60,12 +60,12 @@ where let envs = [(MAGICK_TEMPORARY_PATH, temporary_path.as_os_str())]; - let reader = Process::run("magick", &args, &envs, timeout)?.read(); + let reader = Process::run("magick", &args, &envs, timeout)? + .read() + .add_extras(input_file) + .add_extras(temporary_path); - let clean_reader = input_file.reader(reader); - let clean_reader = temporary_path.reader(clean_reader); - - Ok(Box::pin(clean_reader)) + Ok(reader) } pub(super) async fn thumbnail( @@ -76,7 +76,7 @@ pub(super) async fn thumbnail( format: ProcessableFormat, quality: Option, timeout: u64, -) -> Result, MagickError> { +) -> Result { let stream = store .to_stream(identifier, None, None) .await diff --git a/src/ingest.rs b/src/ingest.rs index 670447e..c67c4ed 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -68,10 +68,10 @@ where }; tracing::trace!("Validating bytes"); - let (input_type, validated_reader) = + let (input_type, process_read) = crate::validate::validate_bytes(tmp_dir, bytes, prescribed, media.process_timeout).await?; - let processed_reader = if let Some(operations) = media.preprocess_steps() { + let process_read = if let Some(operations) = media.preprocess_steps() { if let Some(format) = input_type.processable_format() { let (_, magick_args) = crate::processor::build_chain(operations, format.file_extension())?; @@ -83,9 +83,9 @@ where } }; - crate::magick::process_image_async_read( + crate::magick::process_image_process_read( tmp_dir, - validated_reader, + process_read, magick_args, format, format, @@ -94,18 +94,23 @@ where ) .await? } else { - validated_reader + process_read } } else { - validated_reader + process_read }; - let hasher_reader = Hasher::new(processed_reader); - let state = hasher_reader.state(); + let (state, identifier) = process_read + .with_stdout(|stdout| async move { + let hasher_reader = Hasher::new(stdout); + let state = hasher_reader.state(); - let identifier = store - .save_async_read(hasher_reader, input_type.media_type()) - .await?; + store + .save_async_read(hasher_reader, input_type.media_type()) + .await + .map(move |identifier| (state, identifier)) + }) + .await??; let bytes_stream = store.to_bytes(&identifier, None, None).await?; let details = diff --git a/src/magick.rs b/src/magick.rs index 3f32d76..8fca68a 100644 --- a/src/magick.rs +++ b/src/magick.rs @@ -3,7 +3,7 @@ use std::{ffi::OsStr, sync::Arc}; use crate::{ error_code::ErrorCode, formats::ProcessableFormat, - process::{Process, ProcessError}, + process::{Process, ProcessError, ProcessRead}, read::BoxRead, store::Store, tmp_file::TmpDir, @@ -96,7 +96,7 @@ async fn process_image( quality: Option, timeout: u64, write_file: F, -) -> Result, MagickError> +) -> Result where F: FnOnce(crate::file::File) -> Fut, Fut: std::future::Future>, @@ -144,12 +144,12 @@ where let envs = [(MAGICK_TEMPORARY_PATH, temporary_path.as_os_str())]; - let reader = Process::run("magick", &args, &envs, timeout)?.read(); + let reader = Process::run("magick", &args, &envs, timeout)? + .read() + .add_extras(input_file) + .add_extras(temporary_path); - let clean_reader = input_file.reader(reader); - let clean_reader = temporary_path.reader(clean_reader); - - Ok(Box::pin(clean_reader)) + Ok(reader) } #[allow(clippy::too_many_arguments)] @@ -162,7 +162,7 @@ pub(crate) async fn process_image_store_read( format: ProcessableFormat, quality: Option, timeout: u64, -) -> Result, MagickError> { +) -> Result { let stream = store .to_stream(identifier, None, None) .await @@ -194,7 +194,7 @@ pub(crate) async fn process_image_async_read( format: ProcessableFormat, quality: Option, timeout: u64, -) -> Result, MagickError> { +) -> Result { process_image( tmp_dir, args, @@ -212,3 +212,35 @@ pub(crate) async fn process_image_async_read( ) .await } + +pub(crate) async fn process_image_process_read( + tmp_dir: &TmpDir, + process_read: ProcessRead, + args: Vec, + input_format: ProcessableFormat, + format: ProcessableFormat, + quality: Option, + timeout: u64, +) -> Result { + process_image( + tmp_dir, + args, + input_format, + format, + quality, + timeout, + |mut tmp_file| async move { + process_read + .with_stdout(|stdout| async { + tmp_file + .write_from_async_read(stdout) + .await + .map_err(MagickError::Write) + }) + .await??; + + Ok(tmp_file) + }, + ) + .await +} diff --git a/src/process.rs b/src/process.rs index 3f2f6d0..f596023 100644 --- a/src/process.rs +++ b/src/process.rs @@ -1,5 +1,6 @@ use actix_web::web::Bytes; use std::{ + any::Any, ffi::OsStr, future::Future, pin::Pin, @@ -12,15 +13,17 @@ use std::{ time::{Duration, Instant}, }; use tokio::{ - io::{AsyncRead, AsyncWriteExt, ReadBuf}, + io::{AsyncRead, AsyncReadExt, AsyncWriteExt, ReadBuf}, process::{Child, ChildStdin, ChildStdout, Command}, }; -use tracing::Span; +use tracing::{Instrument, Span}; use uuid::Uuid; use crate::{ + error::Error, error_code::ErrorCode, future::{LocalBoxFuture, WithTimeout}, + read::BoxRead, }; struct MetricsGuard { @@ -75,24 +78,12 @@ impl std::fmt::Debug for Process { } } -struct ProcessReadState { - flags: AtomicU8, - parent: Mutex>, -} - -struct ProcessReadWaker { - state: Arc, - flag: u8, -} - pub(crate) struct ProcessRead { - stdout: ChildStdout, - handle: LocalBoxFuture<'static, std::io::Result<()>>, - closed: bool, - state: Arc, - span: Option, + reader: BoxRead<'static>, + handle: LocalBoxFuture<'static, Result<(), ProcessError>>, command: Arc, id: Uuid, + extras: Box, } #[derive(Debug, thiserror::Error)] @@ -112,6 +103,9 @@ pub(crate) enum ProcessError { #[error("{0} Failed with {1}")] Status(Arc, ExitStatus), + #[error("Failed to read stdout for {0}")] + Read(Arc, #[source] std::io::Error), + #[error("Unknown process error")] Other(#[source] std::io::Error), } @@ -121,11 +115,15 @@ impl ProcessError { match self { Self::NotFound(_) => ErrorCode::COMMAND_NOT_FOUND, Self::PermissionDenied(_) => ErrorCode::COMMAND_PERMISSION_DENIED, - Self::LimitReached | Self::Other(_) => ErrorCode::COMMAND_ERROR, + Self::LimitReached | Self::Read(_, _) | Self::Other(_) => ErrorCode::COMMAND_ERROR, Self::Timeout(_) => ErrorCode::COMMAND_TIMEOUT, Self::Status(_, _) => ErrorCode::COMMAND_FAILURE, } } + + pub(crate) fn is_client_error(&self) -> bool { + matches!(self, Self::Timeout(_)) + } } impl Process { @@ -240,6 +238,7 @@ impl Process { let stdin = child.stdin.take().expect("stdin exists"); let stdout = child.stdout.take().expect("stdout exists"); + let command2 = command.clone(); let handle = Box::pin(async move { let child_fut = async { (f)(stdin).await?; @@ -252,184 +251,108 @@ impl Process { guard.disarm(); return Ok(()); } - Ok(Ok(status)) => { - std::io::Error::new(std::io::ErrorKind::Other, StatusError(status)) - } - Ok(Err(e)) => e, - Err(_) => std::io::ErrorKind::TimedOut.into(), + Ok(Ok(status)) => ProcessError::Status(command2, status), + Ok(Err(e)) => ProcessError::Other(e), + Err(_) => ProcessError::Timeout(command2), }; - child.kill().await?; + child.kill().await.map_err(ProcessError::Other)?; Err(error) }); ProcessRead { - stdout, + reader: Box::pin(stdout), handle, - closed: false, - state: ProcessReadState::new_woken(), - span: None, command, id, + extras: Box::new(()), } } } -impl ProcessReadState { - fn new_woken() -> Arc { - Arc::new(Self { - flags: AtomicU8::new(0xff), - parent: Mutex::new(None), - }) - } - - fn clone_parent(&self) -> Option { - let guard = self.parent.lock().unwrap(); - guard.as_ref().cloned() - } - - fn into_parts(self) -> (AtomicU8, Option) { - let ProcessReadState { flags, parent } = self; - - let parent = parent.lock().unwrap().take(); - - (flags, parent) - } -} - impl ProcessRead { - fn get_waker(&self, flag: u8) -> Option { - let mask = 0xff ^ flag; - let previous = self.state.flags.fetch_and(mask, Ordering::AcqRel); - let active = previous & flag; - - if active == flag { - Some( - Arc::new(ProcessReadWaker { - state: self.state.clone(), - flag, - }) - .into(), - ) - } else { - None + pub(crate) fn new(reader: BoxRead<'static>, command: Arc, id: Uuid) -> Self { + Self { + reader, + handle: Box::pin(async { Ok(()) }), + command, + id, + extras: Box::new(()), } } - fn set_parent_waker(&self, parent: &Waker) -> bool { - let mut guard = self.state.parent.lock().unwrap(); - if let Some(waker) = guard.as_mut() { - if !waker.will_wake(parent) { - *waker = parent.clone(); - true - } else { - false - } - } else { - *guard = Some(parent.clone()); - true - } + pub(crate) async fn to_vec(self) -> Result, ProcessError> { + let cmd = self.command.clone(); + + self.with_stdout(move |mut stdout| async move { + let mut vec = Vec::new(); + + stdout + .read_to_end(&mut vec) + .await + .map_err(|e| ProcessError::Read(cmd, e)) + .map(move |_| vec) + }) + .await? } - fn mark_all_woken(&self) { - self.state.flags.store(0xff, Ordering::Release); - } -} + pub(crate) async fn to_string(self) -> Result { + let cmd = self.command.clone(); -const HANDLE_WAKER: u8 = 0b_0100; + self.with_stdout(move |mut stdout| async move { + let mut s = String::new(); -impl AsyncRead for ProcessRead { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - let command = self.command.clone(); - let id = self.id; - let span = self - .span - .get_or_insert_with(|| tracing::info_span!("process task", %command, %id)) - .clone(); - let guard = span.enter(); - - let value = loop { - // always poll for bytes when poll_read is called - let before_size = buf.filled().len(); - - if let Poll::Ready(res) = Pin::new(&mut self.stdout).poll_read(cx, buf) { - if let Err(e) = res { - self.closed = true; - - break Poll::Ready(Err(e)); - } else if buf.filled().len() == before_size { - self.closed = true; - - break Poll::Ready(Ok(())); - } else { - break Poll::Ready(Ok(())); - } - } else if self.closed { - // Stop if we're closed - break Poll::Ready(Ok(())); - } else if let Some(waker) = self.get_waker(HANDLE_WAKER) { - // only poll handle if we've been explicitly woken - let mut handle_cx = Context::from_waker(&waker); - - if let Poll::Ready(res) = Pin::new(&mut self.handle).poll(&mut handle_cx) { - self.closed = true; - - if let Err(e) = res { - break Poll::Ready(Err(e)); - } - } - } else if self.set_parent_waker(cx.waker()) { - // if we updated the stored waker, mark all as woken an try polling again - // This doesn't actually "wake" the waker, it just allows the handle to be polled - // again next iteration - self.mark_all_woken(); - } else { - // if the waker hasn't changed and nothing polled ready, return pending - break Poll::Pending; - } - }; - - drop(guard); - - value - } -} - -impl Wake for ProcessReadWaker { - fn wake(self: Arc) { - match Arc::try_unwrap(self) { - Ok(ProcessReadWaker { state, flag }) => match Arc::try_unwrap(state) { - Ok(state) => { - let (flags, parent) = state.into_parts(); - - flags.fetch_and(flag, Ordering::AcqRel); - - if let Some(parent) = parent { - parent.wake(); - } - } - Err(state) => { - state.flags.fetch_or(flag, Ordering::AcqRel); - - if let Some(waker) = state.clone_parent() { - waker.wake(); - } - } - }, - Err(this) => this.wake_by_ref(), - } + stdout + .read_to_string(&mut s) + .await + .map_err(|e| ProcessError::Read(cmd, e)) + .map(move |_| s) + }) + .await? } - fn wake_by_ref(self: &Arc) { - self.state.flags.fetch_or(self.flag, Ordering::AcqRel); + pub(crate) async fn with_stdout( + self, + f: impl FnOnce(BoxRead<'static>) -> Fut, + ) -> Result + where + Fut: Future, + { + let Self { + reader, + handle, + command, + id, + extras, + } = self; - if let Some(parent) = self.state.clone_parent() { - parent.wake(); + let (out, res) = tokio::join!( + (f)(reader).instrument(tracing::info_span!("cmd-reader", %command, %id)), + handle.instrument(tracing::info_span!("cmd-handle", %command, %id)) + ); + res?; + + drop(extras); + + Ok(out) + } + + pub(crate) fn add_extras(self, more_extras: Extras) -> ProcessRead { + let Self { + reader, + handle, + command, + id, + extras, + } = self; + + Self { + reader, + handle, + command, + id, + extras: Box::new((extras, more_extras)), } } } diff --git a/src/validate.rs b/src/validate.rs index a53ccba..2ff57ca 100644 --- a/src/validate.rs +++ b/src/validate.rs @@ -10,6 +10,7 @@ use crate::{ AnimationFormat, AnimationOutput, ImageInput, ImageOutput, InputFile, InputVideoFormat, InternalFormat, Validations, }, + process::ProcessRead, read::BoxRead, tmp_file::TmpDir, }; @@ -61,7 +62,7 @@ pub(crate) async fn validate_bytes( bytes: Bytes, validations: Validations<'_>, timeout: u64, -) -> Result<(InternalFormat, BoxRead<'static>), Error> { +) -> Result<(InternalFormat, ProcessRead), Error> { if bytes.is_empty() { return Err(ValidationError::Empty.into()); } @@ -75,7 +76,7 @@ pub(crate) async fn validate_bytes( match &input { InputFile::Image(input) => { - let (format, read) = process_image( + let (format, process_read) = process_image( tmp_dir, bytes, *input, @@ -86,10 +87,10 @@ pub(crate) async fn validate_bytes( ) .await?; - Ok((format, read)) + Ok((format, process_read)) } InputFile::Animation(input) => { - let (format, read) = process_animation( + let (format, process_read) = process_animation( tmp_dir, bytes, *input, @@ -101,10 +102,10 @@ pub(crate) async fn validate_bytes( ) .await?; - Ok((format, read)) + Ok((format, process_read)) } InputFile::Video(input) => { - let (format, read) = process_video( + let (format, process_read) = process_video( tmp_dir, bytes, *input, @@ -116,7 +117,7 @@ pub(crate) async fn validate_bytes( ) .await?; - Ok((format, read)) + Ok((format, process_read)) } } } @@ -130,7 +131,7 @@ async fn process_image( height: u16, validations: &crate::config::Image, timeout: u64, -) -> Result<(InternalFormat, BoxRead<'static>), Error> { +) -> Result<(InternalFormat, ProcessRead), Error> { if width > validations.max_width { return Err(ValidationError::Width.into()); } @@ -149,7 +150,7 @@ async fn process_image( needs_transcode, } = input.build_output(validations.format); - let read = if needs_transcode { + let process_read = if needs_transcode { let quality = validations.quality_for(format); magick::convert_image(tmp_dir, input.format, format, quality, timeout, bytes).await? @@ -157,7 +158,7 @@ async fn process_image( exiftool::clear_metadata_bytes_read(bytes, timeout)? }; - Ok((InternalFormat::Image(format), read)) + Ok((InternalFormat::Image(format), process_read)) } fn validate_animation( @@ -197,7 +198,7 @@ async fn process_animation( frames: u32, validations: &crate::config::Animation, timeout: u64, -) -> Result<(InternalFormat, BoxRead<'static>), Error> { +) -> Result<(InternalFormat, ProcessRead), Error> { validate_animation(bytes.len(), width, height, frames, validations)?; let AnimationOutput { @@ -205,7 +206,7 @@ async fn process_animation( needs_transcode, } = input.build_output(validations.format); - let read = if needs_transcode { + let process_read = if needs_transcode { let quality = validations.quality_for(format); magick::convert_animation(tmp_dir, input, format, quality, timeout, bytes).await? @@ -213,7 +214,7 @@ async fn process_animation( exiftool::clear_metadata_bytes_read(bytes, timeout)? }; - Ok((InternalFormat::Animation(format), read)) + Ok((InternalFormat::Animation(format), process_read)) } fn validate_video( @@ -256,7 +257,7 @@ async fn process_video( frames: u32, validations: &crate::config::Video, timeout: u64, -) -> Result<(InternalFormat, BoxRead<'static>), Error> { +) -> Result<(InternalFormat, ProcessRead), Error> { validate_video(bytes.len(), width, height, frames, validations)?; let output = input.build_output( @@ -267,7 +268,10 @@ async fn process_video( let crf = validations.crf_for(width, height); - let read = ffmpeg::transcode_bytes(tmp_dir, input, output, crf, timeout, bytes).await?; + let process_read = ffmpeg::transcode_bytes(tmp_dir, input, output, crf, timeout, bytes).await?; - Ok((InternalFormat::Video(output.format.internal_format()), read)) + Ok(( + InternalFormat::Video(output.format.internal_format()), + process_read, + )) } diff --git a/src/validate/exiftool.rs b/src/validate/exiftool.rs index 7d7dba5..7c468cf 100644 --- a/src/validate/exiftool.rs +++ b/src/validate/exiftool.rs @@ -1,13 +1,15 @@ use actix_web::web::Bytes; -use crate::{exiftool::ExifError, process::Process, read::BoxRead}; +use crate::{ + exiftool::ExifError, + process::{Process, ProcessRead}, + read::BoxRead, +}; #[tracing::instrument(level = "trace", skip_all)] pub(crate) fn clear_metadata_bytes_read( input: Bytes, timeout: u64, -) -> Result, ExifError> { - let process = Process::run("exiftool", &["-all=", "-", "-out", "-"], &[], timeout)?; - - Ok(Box::pin(process.bytes_read(input))) +) -> Result { + Ok(Process::run("exiftool", &["-all=", "-", "-out", "-"], &[], timeout)?.bytes_read(input)) } diff --git a/src/validate/ffmpeg.rs b/src/validate/ffmpeg.rs index 2034e8a..ed0ac47 100644 --- a/src/validate/ffmpeg.rs +++ b/src/validate/ffmpeg.rs @@ -1,11 +1,12 @@ -use std::ffi::OsStr; +use std::{ffi::OsStr, sync::Arc}; use actix_web::web::Bytes; +use uuid::Uuid; use crate::{ ffmpeg::FfMpegError, formats::{InputVideoFormat, OutputVideo}, - process::Process, + process::{Process, ProcessRead}, read::BoxRead, tmp_file::TmpDir, }; @@ -17,7 +18,7 @@ pub(super) async fn transcode_bytes( crf: u8, timeout: u64, bytes: Bytes, -) -> Result, FfMpegError> { +) -> Result { let input_file = tmp_dir.tmp_file(None); crate::store::file_store::safe_create_parent(&input_file) .await @@ -52,9 +53,15 @@ pub(super) async fn transcode_bytes( .await .map_err(FfMpegError::ReadFile)?; let reader = tokio_util::io::StreamReader::new(stream); - let clean_reader = output_file.reader(reader); - Ok(Box::pin(clean_reader)) + let process_read = ProcessRead::new( + Box::pin(reader), + Arc::from(String::from("ffmpeg")), + Uuid::now_v7(), + ) + .add_extras(output_file); + + Ok(process_read) } async fn transcode_files( diff --git a/src/validate/magick.rs b/src/validate/magick.rs index 249fc6d..c6201fb 100644 --- a/src/validate/magick.rs +++ b/src/validate/magick.rs @@ -5,9 +5,9 @@ use actix_web::web::Bytes; use crate::{ formats::{AnimationFormat, ImageFormat}, magick::{MagickError, MAGICK_TEMPORARY_PATH}, - process::Process, + process::{Process, ProcessRead}, read::BoxRead, - tmp_file::TmpDir, + tmp_file::{TmpDir, TmpFile, TmpFolder}, }; pub(super) async fn convert_image( @@ -17,7 +17,7 @@ pub(super) async fn convert_image( quality: Option, timeout: u64, bytes: Bytes, -) -> Result, MagickError> { +) -> Result { convert( tmp_dir, input.magick_format(), @@ -37,7 +37,7 @@ pub(super) async fn convert_animation( quality: Option, timeout: u64, bytes: Bytes, -) -> Result, MagickError> { +) -> Result { convert( tmp_dir, input.magick_format(), @@ -58,7 +58,7 @@ async fn convert( quality: Option, timeout: u64, bytes: Bytes, -) -> Result, MagickError> { +) -> Result { let temporary_path = tmp_dir .tmp_folder() .await @@ -101,8 +101,7 @@ async fn convert( let reader = Process::run("magick", &args, &envs, timeout)?.read(); - let clean_reader = input_file.reader(reader); - let clean_reader = temporary_path.reader(clean_reader); + let clean_reader = reader.add_extras(input_file).add_extras(temporary_path); - Ok(Box::pin(clean_reader)) + Ok(clean_reader) }