Rework ProcessRead to require a closure for using the internal AsyncRead

This commit is contained in:
asonix 2023-12-22 20:52:58 -06:00
parent fb909210ed
commit 5624671cbf
15 changed files with 265 additions and 299 deletions

View File

@ -42,14 +42,10 @@ pub(super) async fn check_reorient(
#[tracing::instrument(level = "trace", skip_all)] #[tracing::instrument(level = "trace", skip_all)]
async fn needs_reorienting(input: Bytes, timeout: u64) -> Result<bool, ExifError> { async fn needs_reorienting(input: Bytes, timeout: u64) -> Result<bool, ExifError> {
let process = Process::run("exiftool", &["-n", "-Orientation", "-"], &[], timeout)?; let buf = Process::run("exiftool", &["-n", "-Orientation", "-"], &[], timeout)?
let mut reader = process.bytes_read(input); .bytes_read(input)
.to_string()
let mut buf = String::new(); .await?;
reader
.read_to_string(&mut buf)
.await
.map_err(ExifError::Read)?;
Ok(!buf.is_empty()) Ok(!buf.is_empty())
} }

View File

@ -212,7 +212,7 @@ where
let tmp_one = (f)(tmp_one).await?; let tmp_one = (f)(tmp_one).await?;
tmp_one.close().await.map_err(FfMpegError::CloseFile)?; tmp_one.close().await.map_err(FfMpegError::CloseFile)?;
let process = Process::run( let output = Process::run(
"ffprobe", "ffprobe",
&[ &[
"-v".as_ref(), "-v".as_ref(),
@ -228,14 +228,10 @@ where
], ],
&[], &[],
timeout, timeout,
)?; )?
let mut output = Vec::new();
process
.read() .read()
.read_to_end(&mut output) .to_vec()
.await .await?;
.map_err(FfMpegError::Read)?;
drop(input_file); drop(input_file);
@ -262,7 +258,7 @@ where
#[tracing::instrument(level = "debug", skip_all)] #[tracing::instrument(level = "debug", skip_all)]
async fn alpha_pixel_formats(timeout: u64) -> Result<HashSet<String>, FfMpegError> { async fn alpha_pixel_formats(timeout: u64) -> Result<HashSet<String>, FfMpegError> {
let process = Process::run( let output = Process::run(
"ffprobe", "ffprobe",
&[ &[
"-v", "-v",
@ -276,14 +272,10 @@ async fn alpha_pixel_formats(timeout: u64) -> Result<HashSet<String>, FfMpegErro
], ],
&[], &[],
timeout, timeout,
)?; )?
let mut output = Vec::new();
process
.read() .read()
.read_to_end(&mut output) .to_vec()
.await .await?;
.map_err(FfMpegError::Read)?;
let formats: PixelFormatOutput = serde_json::from_slice(&output).map_err(FfMpegError::Json)?; let formats: PixelFormatOutput = serde_json::from_slice(&output).map_err(FfMpegError::Json)?;

View File

@ -118,7 +118,7 @@ where
let envs = [(MAGICK_TEMPORARY_PATH, temporary_path.as_os_str())]; let envs = [(MAGICK_TEMPORARY_PATH, temporary_path.as_os_str())];
let process = Process::run( let output = Process::run(
"magick", "magick",
&[ &[
"convert".as_ref(), "convert".as_ref(),
@ -128,14 +128,10 @@ where
], ],
&envs, &envs,
timeout, timeout,
)?; )?
let mut output = String::new();
process
.read() .read()
.read_to_string(&mut output) .to_string()
.await .await?;
.map_err(MagickError::Read)?;
drop(input_file); drop(input_file);
drop(temporary_path); drop(temporary_path);
@ -185,7 +181,7 @@ where
let envs = [(MAGICK_TEMPORARY_PATH, temporary_path.as_os_str())]; let envs = [(MAGICK_TEMPORARY_PATH, temporary_path.as_os_str())];
let process = Process::run( let output = Process::run(
"magick", "magick",
&[ &[
"convert".as_ref(), "convert".as_ref(),
@ -195,16 +191,13 @@ where
], ],
&envs, &envs,
timeout, timeout,
)?; )?
let mut output = Vec::new();
process
.read() .read()
.read_to_end(&mut output) .to_vec()
.await .await?;
.map_err(MagickError::Read)?;
drop(input_file); drop(input_file);
drop(temporary_path);
if output.is_empty() { if output.is_empty() {
return Err(MagickError::Empty); return Err(MagickError::Empty);

View File

@ -96,6 +96,9 @@ pub(crate) enum UploadError {
#[error("Error in exiftool")] #[error("Error in exiftool")]
Exiftool(#[from] crate::exiftool::ExifError), Exiftool(#[from] crate::exiftool::ExifError),
#[error("Error in process")]
Process(#[from] crate::process::ProcessError),
#[error("Error building reqwest client")] #[error("Error building reqwest client")]
BuildClient(#[source] reqwest::Error), BuildClient(#[source] reqwest::Error),
@ -172,6 +175,7 @@ impl UploadError {
Self::Ffmpeg(e) => e.error_code(), Self::Ffmpeg(e) => e.error_code(),
Self::Magick(e) => e.error_code(), Self::Magick(e) => e.error_code(),
Self::Exiftool(e) => e.error_code(), Self::Exiftool(e) => e.error_code(),
Self::Process(e) => e.error_code(),
Self::BuildClient(_) | Self::RequestMiddleware(_) | Self::Request(_) => { Self::BuildClient(_) | Self::RequestMiddleware(_) | Self::Request(_) => {
ErrorCode::HTTP_CLIENT_ERROR ErrorCode::HTTP_CLIENT_ERROR
} }
@ -246,6 +250,7 @@ impl ResponseError for Error {
Some(UploadError::Magick(e)) if e.is_client_error() => StatusCode::BAD_REQUEST, Some(UploadError::Magick(e)) if e.is_client_error() => StatusCode::BAD_REQUEST,
Some(UploadError::Ffmpeg(e)) if e.is_client_error() => StatusCode::BAD_REQUEST, Some(UploadError::Ffmpeg(e)) if e.is_client_error() => StatusCode::BAD_REQUEST,
Some(UploadError::Exiftool(e)) if e.is_client_error() => StatusCode::BAD_REQUEST, Some(UploadError::Exiftool(e)) if e.is_client_error() => StatusCode::BAD_REQUEST,
Some(UploadError::Process(e)) if e.is_client_error() => StatusCode::BAD_REQUEST,
Some(UploadError::MissingAlias) => StatusCode::NOT_FOUND, Some(UploadError::MissingAlias) => StatusCode::NOT_FOUND,
Some(UploadError::Ffmpeg(e)) if e.is_not_found() => StatusCode::NOT_FOUND, Some(UploadError::Ffmpeg(e)) if e.is_not_found() => StatusCode::NOT_FOUND,
Some(UploadError::InvalidToken) => StatusCode::FORBIDDEN, Some(UploadError::InvalidToken) => StatusCode::FORBIDDEN,

View File

@ -1,6 +1,6 @@
use crate::{ use crate::{
error_code::ErrorCode, error_code::ErrorCode,
process::{Process, ProcessError}, process::{Process, ProcessError, ProcessRead},
}; };
use actix_web::web::Bytes; use actix_web::web::Bytes;
use tokio::io::{AsyncRead, AsyncReadExt}; use tokio::io::{AsyncRead, AsyncReadExt};
@ -45,14 +45,10 @@ impl ExifError {
#[tracing::instrument(level = "trace", skip(input))] #[tracing::instrument(level = "trace", skip(input))]
pub(crate) async fn needs_reorienting(timeout: u64, input: Bytes) -> Result<bool, ExifError> { pub(crate) async fn needs_reorienting(timeout: u64, input: Bytes) -> Result<bool, ExifError> {
let process = Process::run("exiftool", &["-n", "-Orientation", "-"], &[], timeout)?; let buf = Process::run("exiftool", &["-n", "-Orientation", "-"], &[], timeout)?
let mut reader = process.bytes_read(input); .bytes_read(input)
.to_string()
let mut buf = String::new(); .await?;
reader
.read_to_string(&mut buf)
.await
.map_err(ExifError::Read)?;
Ok(!buf.is_empty()) Ok(!buf.is_empty())
} }
@ -61,7 +57,7 @@ pub(crate) async fn needs_reorienting(timeout: u64, input: Bytes) -> Result<bool
pub(crate) fn clear_metadata_bytes_read( pub(crate) fn clear_metadata_bytes_read(
timeout: u64, timeout: u64,
input: Bytes, input: Bytes,
) -> Result<impl AsyncRead + Unpin, ExifError> { ) -> Result<ProcessRead, ExifError> {
let process = Process::run("exiftool", &["-all=", "-", "-out", "-"], &[], timeout)?; let process = Process::run("exiftool", &["-all=", "-", "-out", "-"], &[], timeout)?;
Ok(process.bytes_read(input)) Ok(process.bytes_read(input))

View File

@ -135,7 +135,7 @@ async fn process<S: Store + 'static>(
ProcessableFormat::Animation(format) => config.media.animation.quality_for(format), ProcessableFormat::Animation(format) => config.media.animation.quality_for(format),
}; };
let mut processed_reader = crate::magick::process_image_store_read( let vec = crate::magick::process_image_store_read(
tmp_dir, tmp_dir,
store, store,
&identifier, &identifier,
@ -145,13 +145,11 @@ async fn process<S: Store + 'static>(
quality, quality,
config.media.process_timeout, config.media.process_timeout,
) )
.await?; .await?
.to_vec()
let mut vec = Vec::new();
processed_reader
.read_to_end(&mut vec)
.instrument(tracing::info_span!("Reading processed image to vec")) .instrument(tracing::info_span!("Reading processed image to vec"))
.await?; .await?;
let bytes = Bytes::from(vec); let bytes = Bytes::from(vec);
drop(permit); drop(permit);
@ -254,7 +252,9 @@ where
(reader, thumbnail_format.media_type()) (reader, thumbnail_format.media_type())
}; };
let motion_identifier = store.save_async_read(reader, media_type).await?; let motion_identifier = reader
.with_stdout(|mut stdout| async { store.save_async_read(stdout, media_type).await })
.await??;
repo.relate_motion_identifier(hash, &motion_identifier) repo.relate_motion_identifier(hash, &motion_identifier)
.await?; .await?;

View File

@ -1,8 +1,14 @@
use std::sync::Arc; use std::sync::Arc;
use uuid::Uuid;
use crate::{ use crate::{
ffmpeg::FfMpegError, formats::InternalVideoFormat, process::Process, read::BoxRead, ffmpeg::FfMpegError,
store::Store, tmp_file::TmpDir, formats::InternalVideoFormat,
process::{Process, ProcessRead},
read::BoxRead,
store::Store,
tmp_file::TmpDir,
}; };
#[derive(Clone, Copy, Debug)] #[derive(Clone, Copy, Debug)]
@ -53,7 +59,7 @@ pub(super) async fn thumbnail<S: Store>(
input_format: InternalVideoFormat, input_format: InternalVideoFormat,
format: ThumbnailFormat, format: ThumbnailFormat,
timeout: u64, timeout: u64,
) -> Result<BoxRead<'static>, FfMpegError> { ) -> Result<ProcessRead, FfMpegError> {
let input_file = tmp_dir.tmp_file(Some(input_format.file_extension())); let input_file = tmp_dir.tmp_file(Some(input_format.file_extension()));
crate::store::file_store::safe_create_parent(&input_file) crate::store::file_store::safe_create_parent(&input_file)
.await .await
@ -108,7 +114,13 @@ pub(super) async fn thumbnail<S: Store>(
.await .await
.map_err(FfMpegError::ReadFile)?; .map_err(FfMpegError::ReadFile)?;
let reader = tokio_util::io::StreamReader::new(stream); let reader = tokio_util::io::StreamReader::new(stream);
let clean_reader = output_file.reader(reader);
Ok(Box::pin(clean_reader)) let reader = ProcessRead::new(
Box::pin(reader),
Arc::from(String::from("ffmpeg")),
Uuid::now_v7(),
)
.add_extras(output_file);
Ok(reader)
} }

View File

@ -3,7 +3,7 @@ use std::{ffi::OsStr, sync::Arc};
use crate::{ use crate::{
formats::ProcessableFormat, formats::ProcessableFormat,
magick::{MagickError, MAGICK_TEMPORARY_PATH}, magick::{MagickError, MAGICK_TEMPORARY_PATH},
process::Process, process::{Process, ProcessRead},
read::BoxRead, read::BoxRead,
store::Store, store::Store,
tmp_file::TmpDir, tmp_file::TmpDir,
@ -16,7 +16,7 @@ async fn thumbnail_animation<F, Fut>(
quality: Option<u8>, quality: Option<u8>,
timeout: u64, timeout: u64,
write_file: F, write_file: F,
) -> Result<BoxRead<'static>, MagickError> ) -> Result<ProcessRead, MagickError>
where where
F: FnOnce(crate::file::File) -> Fut, F: FnOnce(crate::file::File) -> Fut,
Fut: std::future::Future<Output = Result<crate::file::File, MagickError>>, Fut: std::future::Future<Output = Result<crate::file::File, MagickError>>,
@ -60,12 +60,12 @@ where
let envs = [(MAGICK_TEMPORARY_PATH, temporary_path.as_os_str())]; let envs = [(MAGICK_TEMPORARY_PATH, temporary_path.as_os_str())];
let reader = Process::run("magick", &args, &envs, timeout)?.read(); let reader = Process::run("magick", &args, &envs, timeout)?
.read()
.add_extras(input_file)
.add_extras(temporary_path);
let clean_reader = input_file.reader(reader); Ok(reader)
let clean_reader = temporary_path.reader(clean_reader);
Ok(Box::pin(clean_reader))
} }
pub(super) async fn thumbnail<S: Store + 'static>( pub(super) async fn thumbnail<S: Store + 'static>(
@ -76,7 +76,7 @@ pub(super) async fn thumbnail<S: Store + 'static>(
format: ProcessableFormat, format: ProcessableFormat,
quality: Option<u8>, quality: Option<u8>,
timeout: u64, timeout: u64,
) -> Result<BoxRead<'static>, MagickError> { ) -> Result<ProcessRead, MagickError> {
let stream = store let stream = store
.to_stream(identifier, None, None) .to_stream(identifier, None, None)
.await .await

View File

@ -68,10 +68,10 @@ where
}; };
tracing::trace!("Validating bytes"); tracing::trace!("Validating bytes");
let (input_type, validated_reader) = let (input_type, process_read) =
crate::validate::validate_bytes(tmp_dir, bytes, prescribed, media.process_timeout).await?; crate::validate::validate_bytes(tmp_dir, bytes, prescribed, media.process_timeout).await?;
let processed_reader = if let Some(operations) = media.preprocess_steps() { let process_read = if let Some(operations) = media.preprocess_steps() {
if let Some(format) = input_type.processable_format() { if let Some(format) = input_type.processable_format() {
let (_, magick_args) = let (_, magick_args) =
crate::processor::build_chain(operations, format.file_extension())?; crate::processor::build_chain(operations, format.file_extension())?;
@ -83,9 +83,9 @@ where
} }
}; };
crate::magick::process_image_async_read( crate::magick::process_image_process_read(
tmp_dir, tmp_dir,
validated_reader, process_read,
magick_args, magick_args,
format, format,
format, format,
@ -94,18 +94,23 @@ where
) )
.await? .await?
} else { } else {
validated_reader process_read
} }
} else { } else {
validated_reader process_read
}; };
let hasher_reader = Hasher::new(processed_reader); let (state, identifier) = process_read
.with_stdout(|stdout| async move {
let hasher_reader = Hasher::new(stdout);
let state = hasher_reader.state(); let state = hasher_reader.state();
let identifier = store store
.save_async_read(hasher_reader, input_type.media_type()) .save_async_read(hasher_reader, input_type.media_type())
.await?; .await
.map(move |identifier| (state, identifier))
})
.await??;
let bytes_stream = store.to_bytes(&identifier, None, None).await?; let bytes_stream = store.to_bytes(&identifier, None, None).await?;
let details = let details =

View File

@ -3,7 +3,7 @@ use std::{ffi::OsStr, sync::Arc};
use crate::{ use crate::{
error_code::ErrorCode, error_code::ErrorCode,
formats::ProcessableFormat, formats::ProcessableFormat,
process::{Process, ProcessError}, process::{Process, ProcessError, ProcessRead},
read::BoxRead, read::BoxRead,
store::Store, store::Store,
tmp_file::TmpDir, tmp_file::TmpDir,
@ -96,7 +96,7 @@ async fn process_image<F, Fut>(
quality: Option<u8>, quality: Option<u8>,
timeout: u64, timeout: u64,
write_file: F, write_file: F,
) -> Result<BoxRead<'static>, MagickError> ) -> Result<ProcessRead, MagickError>
where where
F: FnOnce(crate::file::File) -> Fut, F: FnOnce(crate::file::File) -> Fut,
Fut: std::future::Future<Output = Result<crate::file::File, MagickError>>, Fut: std::future::Future<Output = Result<crate::file::File, MagickError>>,
@ -144,12 +144,12 @@ where
let envs = [(MAGICK_TEMPORARY_PATH, temporary_path.as_os_str())]; let envs = [(MAGICK_TEMPORARY_PATH, temporary_path.as_os_str())];
let reader = Process::run("magick", &args, &envs, timeout)?.read(); let reader = Process::run("magick", &args, &envs, timeout)?
.read()
.add_extras(input_file)
.add_extras(temporary_path);
let clean_reader = input_file.reader(reader); Ok(reader)
let clean_reader = temporary_path.reader(clean_reader);
Ok(Box::pin(clean_reader))
} }
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
@ -162,7 +162,7 @@ pub(crate) async fn process_image_store_read<S: Store + 'static>(
format: ProcessableFormat, format: ProcessableFormat,
quality: Option<u8>, quality: Option<u8>,
timeout: u64, timeout: u64,
) -> Result<BoxRead<'static>, MagickError> { ) -> Result<ProcessRead, MagickError> {
let stream = store let stream = store
.to_stream(identifier, None, None) .to_stream(identifier, None, None)
.await .await
@ -194,7 +194,7 @@ pub(crate) async fn process_image_async_read<A: AsyncRead + Unpin + 'static>(
format: ProcessableFormat, format: ProcessableFormat,
quality: Option<u8>, quality: Option<u8>,
timeout: u64, timeout: u64,
) -> Result<BoxRead<'static>, MagickError> { ) -> Result<ProcessRead, MagickError> {
process_image( process_image(
tmp_dir, tmp_dir,
args, args,
@ -212,3 +212,35 @@ pub(crate) async fn process_image_async_read<A: AsyncRead + Unpin + 'static>(
) )
.await .await
} }
pub(crate) async fn process_image_process_read(
tmp_dir: &TmpDir,
process_read: ProcessRead,
args: Vec<String>,
input_format: ProcessableFormat,
format: ProcessableFormat,
quality: Option<u8>,
timeout: u64,
) -> Result<ProcessRead, MagickError> {
process_image(
tmp_dir,
args,
input_format,
format,
quality,
timeout,
|mut tmp_file| async move {
process_read
.with_stdout(|stdout| async {
tmp_file
.write_from_async_read(stdout)
.await
.map_err(MagickError::Write)
})
.await??;
Ok(tmp_file)
},
)
.await
}

View File

@ -1,5 +1,6 @@
use actix_web::web::Bytes; use actix_web::web::Bytes;
use std::{ use std::{
any::Any,
ffi::OsStr, ffi::OsStr,
future::Future, future::Future,
pin::Pin, pin::Pin,
@ -12,15 +13,17 @@ use std::{
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use tokio::{ use tokio::{
io::{AsyncRead, AsyncWriteExt, ReadBuf}, io::{AsyncRead, AsyncReadExt, AsyncWriteExt, ReadBuf},
process::{Child, ChildStdin, ChildStdout, Command}, process::{Child, ChildStdin, ChildStdout, Command},
}; };
use tracing::Span; use tracing::{Instrument, Span};
use uuid::Uuid; use uuid::Uuid;
use crate::{ use crate::{
error::Error,
error_code::ErrorCode, error_code::ErrorCode,
future::{LocalBoxFuture, WithTimeout}, future::{LocalBoxFuture, WithTimeout},
read::BoxRead,
}; };
struct MetricsGuard { struct MetricsGuard {
@ -75,24 +78,12 @@ impl std::fmt::Debug for Process {
} }
} }
struct ProcessReadState {
flags: AtomicU8,
parent: Mutex<Option<Waker>>,
}
struct ProcessReadWaker {
state: Arc<ProcessReadState>,
flag: u8,
}
pub(crate) struct ProcessRead { pub(crate) struct ProcessRead {
stdout: ChildStdout, reader: BoxRead<'static>,
handle: LocalBoxFuture<'static, std::io::Result<()>>, handle: LocalBoxFuture<'static, Result<(), ProcessError>>,
closed: bool,
state: Arc<ProcessReadState>,
span: Option<Span>,
command: Arc<str>, command: Arc<str>,
id: Uuid, id: Uuid,
extras: Box<dyn Any>,
} }
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
@ -112,6 +103,9 @@ pub(crate) enum ProcessError {
#[error("{0} Failed with {1}")] #[error("{0} Failed with {1}")]
Status(Arc<str>, ExitStatus), Status(Arc<str>, ExitStatus),
#[error("Failed to read stdout for {0}")]
Read(Arc<str>, #[source] std::io::Error),
#[error("Unknown process error")] #[error("Unknown process error")]
Other(#[source] std::io::Error), Other(#[source] std::io::Error),
} }
@ -121,11 +115,15 @@ impl ProcessError {
match self { match self {
Self::NotFound(_) => ErrorCode::COMMAND_NOT_FOUND, Self::NotFound(_) => ErrorCode::COMMAND_NOT_FOUND,
Self::PermissionDenied(_) => ErrorCode::COMMAND_PERMISSION_DENIED, Self::PermissionDenied(_) => ErrorCode::COMMAND_PERMISSION_DENIED,
Self::LimitReached | Self::Other(_) => ErrorCode::COMMAND_ERROR, Self::LimitReached | Self::Read(_, _) | Self::Other(_) => ErrorCode::COMMAND_ERROR,
Self::Timeout(_) => ErrorCode::COMMAND_TIMEOUT, Self::Timeout(_) => ErrorCode::COMMAND_TIMEOUT,
Self::Status(_, _) => ErrorCode::COMMAND_FAILURE, Self::Status(_, _) => ErrorCode::COMMAND_FAILURE,
} }
} }
pub(crate) fn is_client_error(&self) -> bool {
matches!(self, Self::Timeout(_))
}
} }
impl Process { impl Process {
@ -240,6 +238,7 @@ impl Process {
let stdin = child.stdin.take().expect("stdin exists"); let stdin = child.stdin.take().expect("stdin exists");
let stdout = child.stdout.take().expect("stdout exists"); let stdout = child.stdout.take().expect("stdout exists");
let command2 = command.clone();
let handle = Box::pin(async move { let handle = Box::pin(async move {
let child_fut = async { let child_fut = async {
(f)(stdin).await?; (f)(stdin).await?;
@ -252,184 +251,108 @@ impl Process {
guard.disarm(); guard.disarm();
return Ok(()); return Ok(());
} }
Ok(Ok(status)) => { Ok(Ok(status)) => ProcessError::Status(command2, status),
std::io::Error::new(std::io::ErrorKind::Other, StatusError(status)) Ok(Err(e)) => ProcessError::Other(e),
} Err(_) => ProcessError::Timeout(command2),
Ok(Err(e)) => e,
Err(_) => std::io::ErrorKind::TimedOut.into(),
}; };
child.kill().await?; child.kill().await.map_err(ProcessError::Other)?;
Err(error) Err(error)
}); });
ProcessRead { ProcessRead {
stdout, reader: Box::pin(stdout),
handle, handle,
closed: false,
state: ProcessReadState::new_woken(),
span: None,
command, command,
id, id,
extras: Box::new(()),
} }
} }
} }
impl ProcessReadState {
fn new_woken() -> Arc<Self> {
Arc::new(Self {
flags: AtomicU8::new(0xff),
parent: Mutex::new(None),
})
}
fn clone_parent(&self) -> Option<Waker> {
let guard = self.parent.lock().unwrap();
guard.as_ref().cloned()
}
fn into_parts(self) -> (AtomicU8, Option<Waker>) {
let ProcessReadState { flags, parent } = self;
let parent = parent.lock().unwrap().take();
(flags, parent)
}
}
impl ProcessRead { impl ProcessRead {
fn get_waker(&self, flag: u8) -> Option<Waker> { pub(crate) fn new(reader: BoxRead<'static>, command: Arc<str>, id: Uuid) -> Self {
let mask = 0xff ^ flag; Self {
let previous = self.state.flags.fetch_and(mask, Ordering::AcqRel); reader,
let active = previous & flag; handle: Box::pin(async { Ok(()) }),
command,
id,
extras: Box::new(()),
}
}
if active == flag { pub(crate) async fn to_vec(self) -> Result<Vec<u8>, ProcessError> {
Some( let cmd = self.command.clone();
Arc::new(ProcessReadWaker {
state: self.state.clone(), self.with_stdout(move |mut stdout| async move {
flag, let mut vec = Vec::new();
stdout
.read_to_end(&mut vec)
.await
.map_err(|e| ProcessError::Read(cmd, e))
.map(move |_| vec)
}) })
.into(), .await?
)
} else {
None
}
} }
fn set_parent_waker(&self, parent: &Waker) -> bool { pub(crate) async fn to_string(self) -> Result<String, ProcessError> {
let mut guard = self.state.parent.lock().unwrap(); let cmd = self.command.clone();
if let Some(waker) = guard.as_mut() {
if !waker.will_wake(parent) { self.with_stdout(move |mut stdout| async move {
*waker = parent.clone(); let mut s = String::new();
true
} else { stdout
false .read_to_string(&mut s)
} .await
} else { .map_err(|e| ProcessError::Read(cmd, e))
*guard = Some(parent.clone()); .map(move |_| s)
true })
} .await?
} }
fn mark_all_woken(&self) { pub(crate) async fn with_stdout<Fut>(
self.state.flags.store(0xff, Ordering::Release); self,
} f: impl FnOnce(BoxRead<'static>) -> Fut,
) -> Result<Fut::Output, ProcessError>
where
Fut: Future,
{
let Self {
reader,
handle,
command,
id,
extras,
} = self;
let (out, res) = tokio::join!(
(f)(reader).instrument(tracing::info_span!("cmd-reader", %command, %id)),
handle.instrument(tracing::info_span!("cmd-handle", %command, %id))
);
res?;
drop(extras);
Ok(out)
} }
const HANDLE_WAKER: u8 = 0b_0100; pub(crate) fn add_extras<Extras: 'static>(self, more_extras: Extras) -> ProcessRead {
let Self {
reader,
handle,
command,
id,
extras,
} = self;
impl AsyncRead for ProcessRead { Self {
fn poll_read( reader,
mut self: Pin<&mut Self>, handle,
cx: &mut Context<'_>, command,
buf: &mut ReadBuf<'_>, id,
) -> Poll<std::io::Result<()>> { extras: Box::new((extras, more_extras)),
let command = self.command.clone();
let id = self.id;
let span = self
.span
.get_or_insert_with(|| tracing::info_span!("process task", %command, %id))
.clone();
let guard = span.enter();
let value = loop {
// always poll for bytes when poll_read is called
let before_size = buf.filled().len();
if let Poll::Ready(res) = Pin::new(&mut self.stdout).poll_read(cx, buf) {
if let Err(e) = res {
self.closed = true;
break Poll::Ready(Err(e));
} else if buf.filled().len() == before_size {
self.closed = true;
break Poll::Ready(Ok(()));
} else {
break Poll::Ready(Ok(()));
}
} else if self.closed {
// Stop if we're closed
break Poll::Ready(Ok(()));
} else if let Some(waker) = self.get_waker(HANDLE_WAKER) {
// only poll handle if we've been explicitly woken
let mut handle_cx = Context::from_waker(&waker);
if let Poll::Ready(res) = Pin::new(&mut self.handle).poll(&mut handle_cx) {
self.closed = true;
if let Err(e) = res {
break Poll::Ready(Err(e));
}
}
} else if self.set_parent_waker(cx.waker()) {
// if we updated the stored waker, mark all as woken an try polling again
// This doesn't actually "wake" the waker, it just allows the handle to be polled
// again next iteration
self.mark_all_woken();
} else {
// if the waker hasn't changed and nothing polled ready, return pending
break Poll::Pending;
}
};
drop(guard);
value
}
}
impl Wake for ProcessReadWaker {
fn wake(self: Arc<Self>) {
match Arc::try_unwrap(self) {
Ok(ProcessReadWaker { state, flag }) => match Arc::try_unwrap(state) {
Ok(state) => {
let (flags, parent) = state.into_parts();
flags.fetch_and(flag, Ordering::AcqRel);
if let Some(parent) = parent {
parent.wake();
}
}
Err(state) => {
state.flags.fetch_or(flag, Ordering::AcqRel);
if let Some(waker) = state.clone_parent() {
waker.wake();
}
}
},
Err(this) => this.wake_by_ref(),
}
}
fn wake_by_ref(self: &Arc<Self>) {
self.state.flags.fetch_or(self.flag, Ordering::AcqRel);
if let Some(parent) = self.state.clone_parent() {
parent.wake();
} }
} }
} }

View File

@ -10,6 +10,7 @@ use crate::{
AnimationFormat, AnimationOutput, ImageInput, ImageOutput, InputFile, InputVideoFormat, AnimationFormat, AnimationOutput, ImageInput, ImageOutput, InputFile, InputVideoFormat,
InternalFormat, Validations, InternalFormat, Validations,
}, },
process::ProcessRead,
read::BoxRead, read::BoxRead,
tmp_file::TmpDir, tmp_file::TmpDir,
}; };
@ -61,7 +62,7 @@ pub(crate) async fn validate_bytes(
bytes: Bytes, bytes: Bytes,
validations: Validations<'_>, validations: Validations<'_>,
timeout: u64, timeout: u64,
) -> Result<(InternalFormat, BoxRead<'static>), Error> { ) -> Result<(InternalFormat, ProcessRead), Error> {
if bytes.is_empty() { if bytes.is_empty() {
return Err(ValidationError::Empty.into()); return Err(ValidationError::Empty.into());
} }
@ -75,7 +76,7 @@ pub(crate) async fn validate_bytes(
match &input { match &input {
InputFile::Image(input) => { InputFile::Image(input) => {
let (format, read) = process_image( let (format, process_read) = process_image(
tmp_dir, tmp_dir,
bytes, bytes,
*input, *input,
@ -86,10 +87,10 @@ pub(crate) async fn validate_bytes(
) )
.await?; .await?;
Ok((format, read)) Ok((format, process_read))
} }
InputFile::Animation(input) => { InputFile::Animation(input) => {
let (format, read) = process_animation( let (format, process_read) = process_animation(
tmp_dir, tmp_dir,
bytes, bytes,
*input, *input,
@ -101,10 +102,10 @@ pub(crate) async fn validate_bytes(
) )
.await?; .await?;
Ok((format, read)) Ok((format, process_read))
} }
InputFile::Video(input) => { InputFile::Video(input) => {
let (format, read) = process_video( let (format, process_read) = process_video(
tmp_dir, tmp_dir,
bytes, bytes,
*input, *input,
@ -116,7 +117,7 @@ pub(crate) async fn validate_bytes(
) )
.await?; .await?;
Ok((format, read)) Ok((format, process_read))
} }
} }
} }
@ -130,7 +131,7 @@ async fn process_image(
height: u16, height: u16,
validations: &crate::config::Image, validations: &crate::config::Image,
timeout: u64, timeout: u64,
) -> Result<(InternalFormat, BoxRead<'static>), Error> { ) -> Result<(InternalFormat, ProcessRead), Error> {
if width > validations.max_width { if width > validations.max_width {
return Err(ValidationError::Width.into()); return Err(ValidationError::Width.into());
} }
@ -149,7 +150,7 @@ async fn process_image(
needs_transcode, needs_transcode,
} = input.build_output(validations.format); } = input.build_output(validations.format);
let read = if needs_transcode { let process_read = if needs_transcode {
let quality = validations.quality_for(format); let quality = validations.quality_for(format);
magick::convert_image(tmp_dir, input.format, format, quality, timeout, bytes).await? magick::convert_image(tmp_dir, input.format, format, quality, timeout, bytes).await?
@ -157,7 +158,7 @@ async fn process_image(
exiftool::clear_metadata_bytes_read(bytes, timeout)? exiftool::clear_metadata_bytes_read(bytes, timeout)?
}; };
Ok((InternalFormat::Image(format), read)) Ok((InternalFormat::Image(format), process_read))
} }
fn validate_animation( fn validate_animation(
@ -197,7 +198,7 @@ async fn process_animation(
frames: u32, frames: u32,
validations: &crate::config::Animation, validations: &crate::config::Animation,
timeout: u64, timeout: u64,
) -> Result<(InternalFormat, BoxRead<'static>), Error> { ) -> Result<(InternalFormat, ProcessRead), Error> {
validate_animation(bytes.len(), width, height, frames, validations)?; validate_animation(bytes.len(), width, height, frames, validations)?;
let AnimationOutput { let AnimationOutput {
@ -205,7 +206,7 @@ async fn process_animation(
needs_transcode, needs_transcode,
} = input.build_output(validations.format); } = input.build_output(validations.format);
let read = if needs_transcode { let process_read = if needs_transcode {
let quality = validations.quality_for(format); let quality = validations.quality_for(format);
magick::convert_animation(tmp_dir, input, format, quality, timeout, bytes).await? magick::convert_animation(tmp_dir, input, format, quality, timeout, bytes).await?
@ -213,7 +214,7 @@ async fn process_animation(
exiftool::clear_metadata_bytes_read(bytes, timeout)? exiftool::clear_metadata_bytes_read(bytes, timeout)?
}; };
Ok((InternalFormat::Animation(format), read)) Ok((InternalFormat::Animation(format), process_read))
} }
fn validate_video( fn validate_video(
@ -256,7 +257,7 @@ async fn process_video(
frames: u32, frames: u32,
validations: &crate::config::Video, validations: &crate::config::Video,
timeout: u64, timeout: u64,
) -> Result<(InternalFormat, BoxRead<'static>), Error> { ) -> Result<(InternalFormat, ProcessRead), Error> {
validate_video(bytes.len(), width, height, frames, validations)?; validate_video(bytes.len(), width, height, frames, validations)?;
let output = input.build_output( let output = input.build_output(
@ -267,7 +268,10 @@ async fn process_video(
let crf = validations.crf_for(width, height); let crf = validations.crf_for(width, height);
let read = ffmpeg::transcode_bytes(tmp_dir, input, output, crf, timeout, bytes).await?; let process_read = ffmpeg::transcode_bytes(tmp_dir, input, output, crf, timeout, bytes).await?;
Ok((InternalFormat::Video(output.format.internal_format()), read)) Ok((
InternalFormat::Video(output.format.internal_format()),
process_read,
))
} }

View File

@ -1,13 +1,15 @@
use actix_web::web::Bytes; use actix_web::web::Bytes;
use crate::{exiftool::ExifError, process::Process, read::BoxRead}; use crate::{
exiftool::ExifError,
process::{Process, ProcessRead},
read::BoxRead,
};
#[tracing::instrument(level = "trace", skip_all)] #[tracing::instrument(level = "trace", skip_all)]
pub(crate) fn clear_metadata_bytes_read( pub(crate) fn clear_metadata_bytes_read(
input: Bytes, input: Bytes,
timeout: u64, timeout: u64,
) -> Result<BoxRead<'static>, ExifError> { ) -> Result<ProcessRead, ExifError> {
let process = Process::run("exiftool", &["-all=", "-", "-out", "-"], &[], timeout)?; Ok(Process::run("exiftool", &["-all=", "-", "-out", "-"], &[], timeout)?.bytes_read(input))
Ok(Box::pin(process.bytes_read(input)))
} }

View File

@ -1,11 +1,12 @@
use std::ffi::OsStr; use std::{ffi::OsStr, sync::Arc};
use actix_web::web::Bytes; use actix_web::web::Bytes;
use uuid::Uuid;
use crate::{ use crate::{
ffmpeg::FfMpegError, ffmpeg::FfMpegError,
formats::{InputVideoFormat, OutputVideo}, formats::{InputVideoFormat, OutputVideo},
process::Process, process::{Process, ProcessRead},
read::BoxRead, read::BoxRead,
tmp_file::TmpDir, tmp_file::TmpDir,
}; };
@ -17,7 +18,7 @@ pub(super) async fn transcode_bytes(
crf: u8, crf: u8,
timeout: u64, timeout: u64,
bytes: Bytes, bytes: Bytes,
) -> Result<BoxRead<'static>, FfMpegError> { ) -> Result<ProcessRead, FfMpegError> {
let input_file = tmp_dir.tmp_file(None); let input_file = tmp_dir.tmp_file(None);
crate::store::file_store::safe_create_parent(&input_file) crate::store::file_store::safe_create_parent(&input_file)
.await .await
@ -52,9 +53,15 @@ pub(super) async fn transcode_bytes(
.await .await
.map_err(FfMpegError::ReadFile)?; .map_err(FfMpegError::ReadFile)?;
let reader = tokio_util::io::StreamReader::new(stream); let reader = tokio_util::io::StreamReader::new(stream);
let clean_reader = output_file.reader(reader);
Ok(Box::pin(clean_reader)) let process_read = ProcessRead::new(
Box::pin(reader),
Arc::from(String::from("ffmpeg")),
Uuid::now_v7(),
)
.add_extras(output_file);
Ok(process_read)
} }
async fn transcode_files( async fn transcode_files(

View File

@ -5,9 +5,9 @@ use actix_web::web::Bytes;
use crate::{ use crate::{
formats::{AnimationFormat, ImageFormat}, formats::{AnimationFormat, ImageFormat},
magick::{MagickError, MAGICK_TEMPORARY_PATH}, magick::{MagickError, MAGICK_TEMPORARY_PATH},
process::Process, process::{Process, ProcessRead},
read::BoxRead, read::BoxRead,
tmp_file::TmpDir, tmp_file::{TmpDir, TmpFile, TmpFolder},
}; };
pub(super) async fn convert_image( pub(super) async fn convert_image(
@ -17,7 +17,7 @@ pub(super) async fn convert_image(
quality: Option<u8>, quality: Option<u8>,
timeout: u64, timeout: u64,
bytes: Bytes, bytes: Bytes,
) -> Result<BoxRead<'static>, MagickError> { ) -> Result<ProcessRead, MagickError> {
convert( convert(
tmp_dir, tmp_dir,
input.magick_format(), input.magick_format(),
@ -37,7 +37,7 @@ pub(super) async fn convert_animation(
quality: Option<u8>, quality: Option<u8>,
timeout: u64, timeout: u64,
bytes: Bytes, bytes: Bytes,
) -> Result<BoxRead<'static>, MagickError> { ) -> Result<ProcessRead, MagickError> {
convert( convert(
tmp_dir, tmp_dir,
input.magick_format(), input.magick_format(),
@ -58,7 +58,7 @@ async fn convert(
quality: Option<u8>, quality: Option<u8>,
timeout: u64, timeout: u64,
bytes: Bytes, bytes: Bytes,
) -> Result<BoxRead<'static>, MagickError> { ) -> Result<ProcessRead, MagickError> {
let temporary_path = tmp_dir let temporary_path = tmp_dir
.tmp_folder() .tmp_folder()
.await .await
@ -101,8 +101,7 @@ async fn convert(
let reader = Process::run("magick", &args, &envs, timeout)?.read(); let reader = Process::run("magick", &args, &envs, timeout)?.read();
let clean_reader = input_file.reader(reader); let clean_reader = reader.add_extras(input_file).add_extras(temporary_path);
let clean_reader = temporary_path.reader(clean_reader);
Ok(Box::pin(clean_reader)) Ok(clean_reader)
} }