mirror of
https://git.asonix.dog/asonix/pict-rs
synced 2024-12-22 19:31:35 +00:00
Even more piping
This commit is contained in:
parent
13fc0df31a
commit
03bd3cbe2f
5 changed files with 43 additions and 129 deletions
|
@ -1,8 +1,6 @@
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests;
|
mod tests;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
bytes_stream::BytesStream,
|
bytes_stream::BytesStream,
|
||||||
discover::DiscoverError,
|
discover::DiscoverError,
|
||||||
|
@ -50,39 +48,17 @@ pub(super) async fn confirm_bytes_stream<S>(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
discover_file(state, move |mut file| async move {
|
discover(state, bytes).await
|
||||||
file.write_from_stream(bytes.into_io_stream())
|
|
||||||
.await
|
|
||||||
.map_err(MagickError::Write)?;
|
|
||||||
|
|
||||||
Ok(file)
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(level = "debug", skip_all)]
|
#[tracing::instrument(level = "debug", skip_all)]
|
||||||
async fn discover_file<S, F, Fut>(state: &State<S>, f: F) -> Result<Discovery, MagickError>
|
async fn discover<S>(state: &State<S>, stream: BytesStream) -> Result<Discovery, MagickError> {
|
||||||
where
|
|
||||||
F: FnOnce(crate::file::File) -> Fut,
|
|
||||||
Fut: std::future::Future<Output = Result<crate::file::File, MagickError>>,
|
|
||||||
{
|
|
||||||
let temporary_path = state
|
let temporary_path = state
|
||||||
.tmp_dir
|
.tmp_dir
|
||||||
.tmp_folder()
|
.tmp_folder()
|
||||||
.await
|
.await
|
||||||
.map_err(MagickError::CreateTemporaryDirectory)?;
|
.map_err(MagickError::CreateTemporaryDirectory)?;
|
||||||
|
|
||||||
let input_file = state.tmp_dir.tmp_file(None);
|
|
||||||
crate::store::file_store::safe_create_parent(&input_file)
|
|
||||||
.await
|
|
||||||
.map_err(MagickError::CreateDir)?;
|
|
||||||
|
|
||||||
let tmp_one = crate::file::File::create(&input_file)
|
|
||||||
.await
|
|
||||||
.map_err(MagickError::CreateFile)?;
|
|
||||||
let tmp_one = (f)(tmp_one).await?;
|
|
||||||
tmp_one.close().await.map_err(MagickError::CloseFile)?;
|
|
||||||
|
|
||||||
let envs = [
|
let envs = [
|
||||||
(MAGICK_TEMPORARY_PATH, temporary_path.as_os_str()),
|
(MAGICK_TEMPORARY_PATH, temporary_path.as_os_str()),
|
||||||
(MAGICK_CONFIGURE_PATH, state.policy_dir.as_os_str()),
|
(MAGICK_CONFIGURE_PATH, state.policy_dir.as_os_str()),
|
||||||
|
@ -91,19 +67,16 @@ where
|
||||||
let res = Process::run(
|
let res = Process::run(
|
||||||
"magick",
|
"magick",
|
||||||
&[
|
&[
|
||||||
"convert".as_ref(),
|
"convert", // "-ping".as_ref(), // re-enable -ping after imagemagick fix
|
||||||
// "-ping".as_ref(), // re-enable -ping after imagemagick fix
|
"-", "JSON:",
|
||||||
input_file.as_os_str(),
|
|
||||||
"JSON:".as_ref(),
|
|
||||||
],
|
],
|
||||||
&envs,
|
&envs,
|
||||||
state.config.media.process_timeout,
|
state.config.media.process_timeout,
|
||||||
)?
|
)?
|
||||||
.read()
|
.drive_with_async_read(stream.into_reader())
|
||||||
.into_string()
|
.into_string()
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
input_file.cleanup().await.map_err(MagickError::Cleanup)?;
|
|
||||||
temporary_path
|
temporary_path
|
||||||
.cleanup()
|
.cleanup()
|
||||||
.await
|
.await
|
||||||
|
|
|
@ -173,38 +173,41 @@ where
|
||||||
.await?
|
.await?
|
||||||
.ok_or(UploadError::MissingIdentifier)?;
|
.ok_or(UploadError::MissingIdentifier)?;
|
||||||
|
|
||||||
let (reader, media_type) = if let Some(processable_format) =
|
let (reader, media_type) =
|
||||||
original_details.internal_format().processable_format()
|
if let Some(processable_format) = original_details.internal_format().processable_format() {
|
||||||
{
|
let thumbnail_format = state.config.media.image.format.unwrap_or(ImageFormat::Webp);
|
||||||
let thumbnail_format = state.config.media.image.format.unwrap_or(ImageFormat::Webp);
|
|
||||||
|
|
||||||
let stream = state.store.to_stream(&identifier, None, None).await?;
|
let stream = state.store.to_stream(&identifier, None, None).await?;
|
||||||
|
|
||||||
let reader = magick::thumbnail(state, stream, processable_format, thumbnail_format).await?;
|
let process =
|
||||||
|
magick::thumbnail_command(state, processable_format, thumbnail_format).await?;
|
||||||
|
|
||||||
(reader, thumbnail_format.media_type())
|
(
|
||||||
} else {
|
process.drive_with_stream(stream),
|
||||||
let thumbnail_format = match state.config.media.image.format {
|
thumbnail_format.media_type(),
|
||||||
Some(ImageFormat::Webp | ImageFormat::Avif | ImageFormat::Jxl) => {
|
)
|
||||||
ffmpeg::ThumbnailFormat::Webp
|
} else {
|
||||||
}
|
let thumbnail_format = match state.config.media.image.format {
|
||||||
Some(ImageFormat::Png) => ffmpeg::ThumbnailFormat::Png,
|
Some(ImageFormat::Webp | ImageFormat::Avif | ImageFormat::Jxl) => {
|
||||||
Some(ImageFormat::Jpeg) | None => ffmpeg::ThumbnailFormat::Jpeg,
|
ffmpeg::ThumbnailFormat::Webp
|
||||||
|
}
|
||||||
|
Some(ImageFormat::Png) => ffmpeg::ThumbnailFormat::Png,
|
||||||
|
Some(ImageFormat::Jpeg) | None => ffmpeg::ThumbnailFormat::Jpeg,
|
||||||
|
};
|
||||||
|
|
||||||
|
let reader = ffmpeg::thumbnail(
|
||||||
|
state,
|
||||||
|
identifier,
|
||||||
|
original_details
|
||||||
|
.video_format()
|
||||||
|
.unwrap_or(InternalVideoFormat::Mp4),
|
||||||
|
thumbnail_format,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
(reader, thumbnail_format.media_type())
|
||||||
};
|
};
|
||||||
|
|
||||||
let reader = ffmpeg::thumbnail(
|
|
||||||
state,
|
|
||||||
identifier,
|
|
||||||
original_details
|
|
||||||
.video_format()
|
|
||||||
.unwrap_or(InternalVideoFormat::Mp4),
|
|
||||||
thumbnail_format,
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
(reader, thumbnail_format.media_type())
|
|
||||||
};
|
|
||||||
|
|
||||||
let motion_identifier = reader
|
let motion_identifier = reader
|
||||||
.with_stdout(|stdout| async { state.store.save_async_read(stdout, media_type).await })
|
.with_stdout(|stdout| async { state.store.save_async_read(stdout, media_type).await })
|
||||||
.await??;
|
.await??;
|
||||||
|
|
|
@ -1,25 +1,17 @@
|
||||||
use std::ffi::OsStr;
|
use std::ffi::OsStr;
|
||||||
|
|
||||||
use actix_web::web::Bytes;
|
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
formats::{ImageFormat, ProcessableFormat},
|
formats::{ImageFormat, ProcessableFormat},
|
||||||
magick::{MagickError, MAGICK_CONFIGURE_PATH, MAGICK_TEMPORARY_PATH},
|
magick::{MagickError, MAGICK_CONFIGURE_PATH, MAGICK_TEMPORARY_PATH},
|
||||||
process::{Process, ProcessRead},
|
process::Process,
|
||||||
state::State,
|
state::State,
|
||||||
stream::LocalBoxStream,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
async fn thumbnail_animation<S, F, Fut>(
|
pub(super) async fn thumbnail_command<S>(
|
||||||
state: &State<S>,
|
state: &State<S>,
|
||||||
input_format: ProcessableFormat,
|
input_format: ProcessableFormat,
|
||||||
thumbnail_format: ImageFormat,
|
thumbnail_format: ImageFormat,
|
||||||
write_file: F,
|
) -> Result<Process, MagickError> {
|
||||||
) -> Result<ProcessRead, MagickError>
|
|
||||||
where
|
|
||||||
F: FnOnce(crate::file::File) -> Fut,
|
|
||||||
Fut: std::future::Future<Output = Result<crate::file::File, MagickError>>,
|
|
||||||
{
|
|
||||||
let format = ProcessableFormat::Image(thumbnail_format);
|
let format = ProcessableFormat::Image(thumbnail_format);
|
||||||
let quality = state.config.media.image.quality_for(thumbnail_format);
|
let quality = state.config.media.image.quality_for(thumbnail_format);
|
||||||
|
|
||||||
|
@ -29,22 +21,7 @@ where
|
||||||
.await
|
.await
|
||||||
.map_err(MagickError::CreateTemporaryDirectory)?;
|
.map_err(MagickError::CreateTemporaryDirectory)?;
|
||||||
|
|
||||||
let input_file = state.tmp_dir.tmp_file(None);
|
let input_arg = format!("{}:-", input_format.magick_format());
|
||||||
crate::store::file_store::safe_create_parent(&input_file)
|
|
||||||
.await
|
|
||||||
.map_err(MagickError::CreateDir)?;
|
|
||||||
|
|
||||||
let tmp_one = crate::file::File::create(&input_file)
|
|
||||||
.await
|
|
||||||
.map_err(MagickError::CreateFile)?;
|
|
||||||
let tmp_one = (write_file)(tmp_one).await?;
|
|
||||||
tmp_one.close().await.map_err(MagickError::CloseFile)?;
|
|
||||||
|
|
||||||
let input_arg = [
|
|
||||||
input_format.magick_format().as_ref(),
|
|
||||||
input_file.as_os_str(),
|
|
||||||
]
|
|
||||||
.join(":".as_ref());
|
|
||||||
let output_arg = format!("{}:-", format.magick_format());
|
let output_arg = format!("{}:-", format.magick_format());
|
||||||
let quality = quality.map(|q| q.to_string());
|
let quality = quality.map(|q| q.to_string());
|
||||||
|
|
||||||
|
@ -52,7 +29,7 @@ where
|
||||||
|
|
||||||
let mut args: Vec<&OsStr> = Vec::with_capacity(len);
|
let mut args: Vec<&OsStr> = Vec::with_capacity(len);
|
||||||
args.push("convert".as_ref());
|
args.push("convert".as_ref());
|
||||||
args.push(&input_arg);
|
args.push(input_arg.as_ref());
|
||||||
if format.coalesce() {
|
if format.coalesce() {
|
||||||
args.push("-coalesce".as_ref());
|
args.push("-coalesce".as_ref());
|
||||||
}
|
}
|
||||||
|
@ -66,31 +43,8 @@ where
|
||||||
(MAGICK_CONFIGURE_PATH, state.policy_dir.as_os_str()),
|
(MAGICK_CONFIGURE_PATH, state.policy_dir.as_os_str()),
|
||||||
];
|
];
|
||||||
|
|
||||||
let reader = Process::run("magick", &args, &envs, state.config.media.process_timeout)?
|
let process = Process::run("magick", &args, &envs, state.config.media.process_timeout)?
|
||||||
.read()
|
|
||||||
.add_extras(input_file)
|
|
||||||
.add_extras(temporary_path);
|
.add_extras(temporary_path);
|
||||||
|
|
||||||
Ok(reader)
|
Ok(process)
|
||||||
}
|
|
||||||
|
|
||||||
pub(super) async fn thumbnail<S>(
|
|
||||||
state: &State<S>,
|
|
||||||
stream: LocalBoxStream<'static, std::io::Result<Bytes>>,
|
|
||||||
input_format: ProcessableFormat,
|
|
||||||
thumbnail_format: ImageFormat,
|
|
||||||
) -> Result<ProcessRead, MagickError> {
|
|
||||||
thumbnail_animation(
|
|
||||||
state,
|
|
||||||
input_format,
|
|
||||||
thumbnail_format,
|
|
||||||
|mut tmp_file| async move {
|
|
||||||
tmp_file
|
|
||||||
.write_from_stream(stream)
|
|
||||||
.await
|
|
||||||
.map_err(MagickError::Write)?;
|
|
||||||
Ok(tmp_file)
|
|
||||||
},
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,21 +20,9 @@ pub(crate) enum MagickError {
|
||||||
#[error("Invalid output format: {0}")]
|
#[error("Invalid output format: {0}")]
|
||||||
Json(String, #[source] serde_json::Error),
|
Json(String, #[source] serde_json::Error),
|
||||||
|
|
||||||
#[error("Error writing bytes")]
|
|
||||||
Write(#[source] std::io::Error),
|
|
||||||
|
|
||||||
#[error("Error creating file")]
|
|
||||||
CreateFile(#[source] std::io::Error),
|
|
||||||
|
|
||||||
#[error("Error creating directory")]
|
|
||||||
CreateDir(#[source] crate::store::file_store::FileError),
|
|
||||||
|
|
||||||
#[error("Error creating temporary directory")]
|
#[error("Error creating temporary directory")]
|
||||||
CreateTemporaryDirectory(#[source] std::io::Error),
|
CreateTemporaryDirectory(#[source] std::io::Error),
|
||||||
|
|
||||||
#[error("Error closing file")]
|
|
||||||
CloseFile(#[source] std::io::Error),
|
|
||||||
|
|
||||||
#[error("Error in metadata discovery")]
|
#[error("Error in metadata discovery")]
|
||||||
Discover(#[source] crate::discover::DiscoverError),
|
Discover(#[source] crate::discover::DiscoverError),
|
||||||
|
|
||||||
|
@ -63,11 +51,7 @@ impl MagickError {
|
||||||
Self::CommandFailed(_) => ErrorCode::COMMAND_FAILURE,
|
Self::CommandFailed(_) => ErrorCode::COMMAND_FAILURE,
|
||||||
Self::Process(e) => e.error_code(),
|
Self::Process(e) => e.error_code(),
|
||||||
Self::Json(_, _)
|
Self::Json(_, _)
|
||||||
| Self::Write(_)
|
|
||||||
| Self::CreateFile(_)
|
|
||||||
| Self::CreateDir(_)
|
|
||||||
| Self::CreateTemporaryDirectory(_)
|
| Self::CreateTemporaryDirectory(_)
|
||||||
| Self::CloseFile(_)
|
|
||||||
| Self::Discover(_)
|
| Self::Discover(_)
|
||||||
| Self::Cleanup(_)
|
| Self::Cleanup(_)
|
||||||
| Self::Empty => ErrorCode::COMMAND_ERROR,
|
| Self::Empty => ErrorCode::COMMAND_ERROR,
|
||||||
|
|
|
@ -415,7 +415,7 @@ impl ProcessRead {
|
||||||
let handle = Box::pin(async move {
|
let handle = Box::pin(async move {
|
||||||
self.with_stdout(move |mut stdout| async move {
|
self.with_stdout(move |mut stdout| async move {
|
||||||
let child_fut = async {
|
let child_fut = async {
|
||||||
let n = tokio::io::copy(&mut stdout, &mut stdin).await?;
|
tokio::io::copy(&mut stdout, &mut stdin).await?;
|
||||||
drop(stdout);
|
drop(stdout);
|
||||||
drop(stdin);
|
drop(stdin);
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue