diff --git a/src/blurhash.rs b/src/blurhash.rs index 897e04f..3c22cf9 100644 --- a/src/blurhash.rs +++ b/src/blurhash.rs @@ -1,15 +1,13 @@ use std::ffi::{OsStr, OsString}; -use futures_core::Stream; use tokio::io::AsyncReadExt; -use tokio_util::bytes::Bytes; use crate::{ details::Details, error::{Error, UploadError}, formats::ProcessableFormat, magick::{MagickError, MAGICK_CONFIGURE_PATH, MAGICK_TEMPORARY_PATH}, - process::{Process, ProcessRead}, + process::Process, repo::Alias, state::State, store::Store, @@ -43,47 +41,44 @@ where let stream = state.store.to_stream(&identifier, None, None).await?; - let process = read_rgba( + let blurhash = read_rgba_command( state, input_details .internal_format() .processable_format() .expect("not a video"), - stream, ) - .await?; + .await? + .drive_with_stream(stream) + .with_stdout(|mut stdout| async move { + let mut encoder = blurhash_update::Encoder::auto(blurhash_update::ImageBounds { + width: input_details.width() as _, + height: input_details.height() as _, + }); - let blurhash = process - .with_stdout(|mut stdout| async move { - let mut encoder = blurhash_update::Encoder::auto(blurhash_update::ImageBounds { - width: input_details.width() as _, - height: input_details.height() as _, - }); + let mut buf = [0u8; 1024 * 8]; - let mut buf = [0u8; 1024 * 8]; + loop { + let n = stdout.read(&mut buf).await?; - loop { - let n = stdout.read(&mut buf).await?; - - if n == 0 { - break; - } - - encoder.update(&buf[..n]); + if n == 0 { + break; } - Ok(encoder.finalize()) as std::io::Result - }) - .await??; + encoder.update(&buf[..n]); + } + + Ok(encoder.finalize()) as std::io::Result + }) + .await??; Ok(blurhash) } -async fn read_rgba( +async fn read_rgba_command( state: &State, input_format: ProcessableFormat, - stream: impl Stream> + 'static, -) -> Result { +) -> Result { let temporary_path = state .tmp_dir .tmp_folder() @@ -104,7 +99,6 @@ async fn read_rgba( ]; let process = Process::run("magick", &args, &envs, state.config.media.process_timeout)? - .stream_read(stream) .add_extras(temporary_path); Ok(process) diff --git a/src/discover/exiftool.rs b/src/discover/exiftool.rs index 69b237d..de0b7e3 100644 --- a/src/discover/exiftool.rs +++ b/src/discover/exiftool.rs @@ -41,7 +41,7 @@ pub(super) async fn check_reorient( #[tracing::instrument(level = "trace", skip_all)] async fn needs_reorienting(input: BytesStream, timeout: u64) -> Result { let buf = Process::run("exiftool", &["-n", "-Orientation", "-"], &[], timeout)? - .bytes_stream_read(input) + .drive_with_async_read(input.into_reader()) .into_string() .await?; diff --git a/src/process.rs b/src/process.rs index 4d04ba5..66a6a1d 100644 --- a/src/process.rs +++ b/src/process.rs @@ -9,7 +9,7 @@ use std::{ use futures_core::Stream; use streem::IntoStreamer; use tokio::{ - io::{AsyncReadExt, AsyncWriteExt}, + io::{AsyncRead, AsyncReadExt, AsyncWriteExt}, process::{Child, ChildStdin, Command}, }; use tokio_util::{bytes::Bytes, io::ReaderStream}; @@ -67,6 +67,7 @@ pub(crate) struct Process { child: Child, guard: MetricsGuard, timeout: Duration, + extras: Box, id: Uuid, } @@ -204,11 +205,19 @@ impl Process { command, guard, timeout: Duration::from_secs(timeout), + extras: Box::new(()), id: Uuid::now_v7(), }) }) } + pub(crate) fn add_extras(self, extra: impl Extras + 'static) -> Self { + Self { + extras: Box::new((self.extras, extra)), + ..self + } + } + #[tracing::instrument(skip(self), fields(command = %self.command, id = %self.id))] pub(crate) async fn wait(self) -> Result<(), ProcessError> { let Process { @@ -216,11 +225,17 @@ impl Process { mut child, guard, timeout, + mut extras, id: _, } = self; let res = child.wait().with_timeout(timeout).await; + extras + .consume() + .await + .map_err(|e| ProcessError::Cleanup(command.clone(), e))?; + match res { Ok(Ok(status)) if status.success() => { guard.disarm(); @@ -236,10 +251,12 @@ impl Process { } } - pub(crate) fn bytes_stream_read(self, input: BytesStream) -> ProcessRead { - self.spawn_fn(move |mut stdin| { + pub(crate) fn drive_with_async_read(self, input: impl AsyncRead + 'static) -> ProcessRead { + self.drive(move |mut stdin| { async move { - match tokio::io::copy(&mut input.into_reader(), &mut stdin).await { + let mut input = std::pin::pin!(input); + + match tokio::io::copy(&mut input, &mut stdin).await { Ok(_) => Ok(()), // BrokenPipe means we finished reading from Stdout, so we don't need to write // to stdin. We'll still error out if the command failed so treat this as a @@ -251,11 +268,11 @@ impl Process { }) } - pub(crate) fn stream_read(self, input: S) -> ProcessRead + pub(crate) fn drive_with_stream(self, input: S) -> ProcessRead where S: Stream> + 'static, { - self.spawn_fn(move |mut stdin| async move { + self.drive(move |mut stdin| async move { let stream = std::pin::pin!(input); let mut stream = stream.into_streamer(); @@ -272,13 +289,13 @@ impl Process { } pub(crate) fn read(self) -> ProcessRead { - self.spawn_fn(|_| async { Ok(()) }) + self.drive(|_| async { Ok(()) }) } #[allow(unknown_lints)] #[allow(clippy::let_with_type_underscore)] #[tracing::instrument(level = "trace", skip_all)] - fn spawn_fn(self, f: F) -> ProcessRead + fn drive(self, f: F) -> ProcessRead where F: FnOnce(ChildStdin) -> Fut + 'static, Fut: Future>, @@ -288,6 +305,7 @@ impl Process { mut child, guard, timeout, + extras, id, } = self; @@ -324,7 +342,7 @@ impl Process { handle, command, id, - extras: Box::new(()), + extras, } } } diff --git a/src/validate.rs b/src/validate.rs index 4358491..ef28c10 100644 --- a/src/validate.rs +++ b/src/validate.rs @@ -11,11 +11,10 @@ use crate::{ AnimationFormat, AnimationOutput, ImageInput, ImageOutput, InputFile, InputVideoFormat, InternalFormat, }, - process::ProcessRead, + process::{Process, ProcessRead}, state::State, }; - #[derive(Debug, thiserror::Error)] pub(crate) enum ValidationError { #[error("Too wide")] @@ -74,15 +73,23 @@ pub(crate) async fn validate_bytes_stream( match &input { InputFile::Image(input) => { - let (format, process_read) = process_image(state, bytes, *input, width, height).await?; + let (format, process) = + process_image_command(state, *input, bytes.len(), width, height).await?; - Ok((format, process_read)) + Ok((format, process.drive_with_async_read(bytes.into_reader()))) } InputFile::Animation(input) => { - let (format, process_read) = - process_animation(state, bytes, *input, width, height, frames.unwrap_or(1)).await?; + let (format, process) = process_animation_command( + state, + *input, + bytes.len(), + width, + height, + frames.unwrap_or(1), + ) + .await?; - Ok((format, process_read)) + Ok((format, process.drive_with_async_read(bytes.into_reader()))) } InputFile::Video(input) => { let (format, process_read) = @@ -93,14 +100,14 @@ pub(crate) async fn validate_bytes_stream( } } -#[tracing::instrument(skip(state, bytes), fields(len = bytes.len()))] -async fn process_image( +#[tracing::instrument(skip(state))] +async fn process_image_command( state: &State, - bytes: BytesStream, input: ImageInput, + length: usize, width: u16, height: u16, -) -> Result<(InternalFormat, ProcessRead), Error> { +) -> Result<(InternalFormat, Process), Error> { let validations = &state.config.media.image; if width > validations.max_width { @@ -112,7 +119,7 @@ async fn process_image( if u32::from(width) * u32::from(height) > validations.max_area { return Err(ValidationError::Area.into()); } - if bytes.len() > validations.max_file_size * MEGABYTES { + if length > validations.max_file_size * MEGABYTES { return Err(ValidationError::Filesize.into()); } @@ -121,15 +128,15 @@ async fn process_image( needs_transcode, } = input.build_output(validations.format); - let process_read = if needs_transcode { + let process = if needs_transcode { let quality = validations.quality_for(format); - magick::convert_image(state, input.format, format, quality, bytes).await? + magick::convert_image_command(state, input.format, format, quality).await? } else { - exiftool::clear_metadata_bytes_read(bytes, state.config.media.process_timeout)? + exiftool::clear_metadata_command(state.config.media.process_timeout)? }; - Ok((InternalFormat::Image(format), process_read)) + Ok((InternalFormat::Image(format), process)) } fn validate_animation( @@ -158,33 +165,33 @@ fn validate_animation( Ok(()) } -#[tracing::instrument(skip(state, bytes), fields(len = bytes.len()))] -async fn process_animation( +#[tracing::instrument(skip(state))] +async fn process_animation_command( state: &State, - bytes: BytesStream, input: AnimationFormat, + length: usize, width: u16, height: u16, frames: u32, -) -> Result<(InternalFormat, ProcessRead), Error> { +) -> Result<(InternalFormat, Process), Error> { let validations = &state.config.media.animation; - validate_animation(bytes.len(), width, height, frames, validations)?; + validate_animation(length, width, height, frames, validations)?; let AnimationOutput { format, needs_transcode, } = input.build_output(validations.format); - let process_read = if needs_transcode { + let process = if needs_transcode { let quality = validations.quality_for(format); - magick::convert_animation(state, input, format, quality, bytes).await? + magick::convert_animation_command(state, input, format, quality).await? } else { - exiftool::clear_metadata_bytes_read(bytes, state.config.media.process_timeout)? + exiftool::clear_metadata_command(state.config.media.process_timeout)? }; - Ok((InternalFormat::Animation(format), process_read)) + Ok((InternalFormat::Animation(format), process)) } fn validate_video( diff --git a/src/validate/exiftool.rs b/src/validate/exiftool.rs index 2c6ba4a..b9e7199 100644 --- a/src/validate/exiftool.rs +++ b/src/validate/exiftool.rs @@ -1,16 +1,11 @@ -use crate::{ - bytes_stream::BytesStream, - exiftool::ExifError, - process::{Process, ProcessRead}, -}; +use crate::{exiftool::ExifError, process::Process}; #[tracing::instrument(level = "trace", skip_all)] -pub(super) fn clear_metadata_bytes_read( - input: BytesStream, - timeout: u64, -) -> Result { - Ok( - Process::run("exiftool", &["-all=", "-", "-out", "-"], &[], timeout)? - .bytes_stream_read(input), - ) +pub(super) fn clear_metadata_command(timeout: u64) -> Result { + Ok(Process::run( + "exiftool", + &["-all=", "-", "-out", "-"], + &[], + timeout, + )?) } diff --git a/src/validate/magick.rs b/src/validate/magick.rs index d6c1ab1..3e3b8e6 100644 --- a/src/validate/magick.rs +++ b/src/validate/magick.rs @@ -1,82 +1,60 @@ use std::ffi::OsStr; - - use crate::{ - bytes_stream::BytesStream, formats::{AnimationFormat, ImageFormat}, magick::{MagickError, MAGICK_CONFIGURE_PATH, MAGICK_TEMPORARY_PATH}, - process::{Process, ProcessRead}, + process::Process, state::State, }; -pub(super) async fn convert_image( +pub(super) async fn convert_image_command( state: &State, input: ImageFormat, output: ImageFormat, quality: Option, - bytes: BytesStream, -) -> Result { +) -> Result { convert( state, input.magick_format(), output.magick_format(), false, quality, - bytes, ) .await } -pub(super) async fn convert_animation( +pub(super) async fn convert_animation_command( state: &State, input: AnimationFormat, output: AnimationFormat, quality: Option, - bytes: BytesStream, -) -> Result { +) -> Result { convert( state, input.magick_format(), output.magick_format(), true, quality, - bytes, ) .await } async fn convert( state: &State, - input: &'static str, - output: &'static str, + input_format: &'static str, + output_format: &'static str, coalesce: bool, quality: Option, - bytes: BytesStream, -) -> Result { +) -> Result { let temporary_path = state .tmp_dir .tmp_folder() .await .map_err(MagickError::CreateTemporaryDirectory)?; - let input_file = state.tmp_dir.tmp_file(None); + let input_arg = format!("{input_format}:-"); - crate::store::file_store::safe_create_parent(&input_file) - .await - .map_err(MagickError::CreateDir)?; - - let mut tmp_one = crate::file::File::create(&input_file) - .await - .map_err(MagickError::CreateFile)?; - tmp_one - .write_from_stream(bytes.into_io_stream()) - .await - .map_err(MagickError::Write)?; - tmp_one.close().await.map_err(MagickError::CloseFile)?; - - let input_arg = [input.as_ref(), input_file.as_os_str()].join(":".as_ref()); - let output_arg = format!("{output}:-"); + let output_arg = format!("{output_format}:-"); let quality = quality.map(|q| q.to_string()); let mut args: Vec<&OsStr> = vec!["convert".as_ref()]; @@ -85,7 +63,11 @@ async fn convert( args.push("-coalesce".as_ref()); } - args.extend(["-strip".as_ref(), "-auto-orient".as_ref(), &input_arg] as [&OsStr; 3]); + args.extend([ + "-strip".as_ref(), + "-auto-orient".as_ref(), + input_arg.as_ref(), + ] as [&OsStr; 3]); if let Some(quality) = &quality { args.extend(["-quality".as_ref(), quality.as_ref()] as [&OsStr; 2]); @@ -98,9 +80,8 @@ async fn convert( (MAGICK_CONFIGURE_PATH, state.policy_dir.as_os_str()), ]; - let reader = Process::run("magick", &args, &envs, state.config.media.process_timeout)?.read(); + let process = Process::run("magick", &args, &envs, state.config.media.process_timeout)? + .add_extras(temporary_path); - let clean_reader = reader.add_extras(input_file).add_extras(temporary_path); - - Ok(clean_reader) + Ok(process) }