diff --git a/src/details.rs b/src/details.rs
index c22c2dd..cf8d4a2 100644
--- a/src/details.rs
+++ b/src/details.rs
@@ -3,6 +3,7 @@ use crate::{
     error::Error,
     formats::{InternalFormat, InternalVideoFormat},
     serde_str::Serde,
+    tmp_file::TmpDir,
 };
 use actix_web::web;
 use time::{format_description::well_known::Rfc3339, OffsetDateTime};
@@ -79,13 +80,17 @@ impl Details {
     }
 
     #[tracing::instrument(level = "DEBUG")]
-    pub(crate) async fn from_bytes(timeout: u64, input: web::Bytes) -> Result<Self, Error> {
+    pub(crate) async fn from_bytes(
+        tmp_dir: &TmpDir,
+        timeout: u64,
+        input: web::Bytes,
+    ) -> Result<Self, Error> {
         let Discovery {
             input,
             width,
             height,
             frames,
-        } = crate::discover::discover_bytes(timeout, input).await?;
+        } = crate::discover::discover_bytes(tmp_dir, timeout, input).await?;
 
         Ok(Details::from_parts(
             input.internal_format(),
diff --git a/src/discover.rs b/src/discover.rs
index acbe53f..481112d 100644
--- a/src/discover.rs
+++ b/src/discover.rs
@@ -4,7 +4,7 @@ mod magick;
 
 use actix_web::web::Bytes;
 
-use crate::formats::InputFile;
+use crate::{formats::InputFile, tmp_file::TmpDir};
 
 #[derive(Debug, PartialEq, Eq)]
 pub(crate) struct Discovery {
@@ -27,12 +27,13 @@ pub(crate) enum DiscoverError {
 }
 
 pub(crate) async fn discover_bytes(
+    tmp_dir: &TmpDir,
     timeout: u64,
     bytes: Bytes,
 ) -> Result<Discovery, crate::error::Error> {
-    let discovery = ffmpeg::discover_bytes(timeout, bytes.clone()).await?;
+    let discovery = ffmpeg::discover_bytes(tmp_dir, timeout, bytes.clone()).await?;
 
-    let discovery = magick::confirm_bytes(discovery, timeout, bytes.clone()).await?;
+    let discovery = magick::confirm_bytes(tmp_dir, discovery, timeout, bytes.clone()).await?;
 
     let discovery = exiftool::check_reorient(discovery, timeout, bytes).await?;
diff --git a/src/discover/ffmpeg.rs b/src/discover/ffmpeg.rs
index 6700742..a1cfbda 100644
--- a/src/discover/ffmpeg.rs
+++ b/src/discover/ffmpeg.rs
@@ -10,6 +10,7 @@ use crate::{
         Mp4AudioCodec, Mp4Codec, WebmAlphaCodec, WebmAudioCodec, WebmCodec,
     },
     process::Process,
+    tmp_file::TmpDir,
 };
 use actix_web::web::Bytes;
 use tokio::io::AsyncReadExt;
@@ -158,22 +159,20 @@ struct Flags {
 }
 
 pub(super) async fn discover_bytes(
+    tmp_dir: &TmpDir,
     timeout: u64,
     bytes: Bytes,
 ) -> Result<Option<Discovery>, FfMpegError> {
-    discover_file(
-        move |mut file| {
-            let bytes = bytes.clone();
+    discover_file(tmp_dir, timeout, move |mut file| {
+        let bytes = bytes.clone();
 
-            async move {
-                file.write_from_bytes(bytes)
-                    .await
-                    .map_err(FfMpegError::Write)?;
-                Ok(file)
-            }
-        },
-        timeout,
-    )
+        async move {
+            file.write_from_bytes(bytes)
+                .await
+                .map_err(FfMpegError::Write)?;
+            Ok(file)
+        }
+    })
     .await
 }
 
@@ -192,12 +191,16 @@ async fn allows_alpha(pixel_format: &str, timeout: u64) -> Result<bool, FfMpegError> {
 }
 
-async fn discover_file<F, Fut>(f: F, timeout: u64) -> Result<Option<Discovery>, FfMpegError>
+async fn discover_file<F, Fut>(
+    tmp_dir: &TmpDir,
+    timeout: u64,
+    f: F,
+) -> Result<Option<Discovery>, FfMpegError>
 where
     F: FnOnce(crate::file::File) -> Fut,
     Fut: std::future::Future<Output = Result<crate::file::File, FfMpegError>>,
 {
-    let input_file = crate::tmp_file::tmp_file(None);
+    let input_file = tmp_dir.tmp_file(None);
     let input_file_str = input_file.to_str().ok_or(FfMpegError::Path)?;
     crate::store::file_store::safe_create_parent(&input_file)
         .await
         .map_err(FfMpegError::CreateDir)?;
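
Every hunk in this diff makes the same move: the process-global temp directory (a `OnceLock` behind the free function `crate::tmp_file::tmp_file`) becomes an explicit `TmpDir` value threaded through as a parameter, so call sites go from `crate::tmp_file::tmp_file(None)` to `tmp_dir.tmp_file(None)`. The real type lives in src/tmp_file.rs (near the end of this diff); a minimal stand-in whose `tmp_file` mirrors the deleted free function — the struct shape is assumed, the path logic is copied from the removed code:

    use std::path::PathBuf;
    use uuid::Uuid;

    pub struct TmpDir {
        path: PathBuf,
    }

    impl TmpDir {
        // Uuid-named path inside this instance's directory, with an
        // optional extension; mirrors the removed `tmp_file` free function.
        pub fn tmp_file(&self, ext: Option<&str>) -> PathBuf {
            if let Some(ext) = ext {
                self.path.join(format!("{}{}", Uuid::new_v4(), ext))
            } else {
                self.path.join(Uuid::new_v4().to_string())
            }
        }
    }
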
diff --git a/src/discover/magick.rs b/src/discover/magick.rs
index 48c33fd..7adbe06 100644
--- a/src/discover/magick.rs
+++ b/src/discover/magick.rs
@@ -9,6 +9,7 @@ use crate::{
     formats::{AnimationFormat, ImageFormat, ImageInput, InputFile},
     magick::MagickError,
     process::Process,
+    tmp_file::TmpDir,
 };
 
 use super::Discovery;
@@ -31,6 +32,7 @@ struct Geometry {
 }
 
 pub(super) async fn confirm_bytes(
+    tmp_dir: &TmpDir,
     discovery: Option<Discovery>,
     timeout: u64,
     bytes: Bytes,
@@ -42,15 +44,12 @@ pub(super) async fn confirm_bytes(
             height,
             ..
         }) => {
-            let frames = count_avif_frames(
-                move |mut file| async move {
-                    file.write_from_bytes(bytes)
-                        .await
-                        .map_err(MagickError::Write)?;
-                    Ok(file)
-                },
-                timeout,
-            )
+            let frames = count_avif_frames(tmp_dir, timeout, move |mut file| async move {
+                file.write_from_bytes(bytes)
+                    .await
+                    .map_err(MagickError::Write)?;
+                Ok(file)
+            })
             .await?;
 
             if frames == 1 {
@@ -84,26 +83,23 @@ pub(super) async fn confirm_bytes(
         }
     }
 
-    discover_file(
-        move |mut file| async move {
-            file.write_from_bytes(bytes)
-                .await
-                .map_err(MagickError::Write)?;
+    discover_file(tmp_dir, timeout, move |mut file| async move {
+        file.write_from_bytes(bytes)
+            .await
+            .map_err(MagickError::Write)?;
 
-            Ok(file)
-        },
-        timeout,
-    )
+        Ok(file)
+    })
     .await
 }
 
 #[tracing::instrument(level = "DEBUG", skip(f))]
-async fn count_avif_frames<F, Fut>(f: F, timeout: u64) -> Result<u32, MagickError>
+async fn count_avif_frames<F, Fut>(tmp_dir: &TmpDir, timeout: u64, f: F) -> Result<u32, MagickError>
 where
     F: FnOnce(crate::file::File) -> Fut,
     Fut: std::future::Future<Output = Result<crate::file::File, MagickError>>,
 {
-    let input_file = crate::tmp_file::tmp_file(None);
+    let input_file = tmp_dir.tmp_file(None);
     let input_file_str = input_file.to_str().ok_or(MagickError::Path)?;
     crate::store::file_store::safe_create_parent(&input_file)
         .await
@@ -149,12 +145,16 @@ where
 }
 
 #[tracing::instrument(level = "DEBUG", skip(f))]
-async fn discover_file<F, Fut>(f: F, timeout: u64) -> Result<Discovery, MagickError>
+async fn discover_file<F, Fut>(
+    tmp_dir: &TmpDir,
+    timeout: u64,
+    f: F,
+) -> Result<Discovery, MagickError>
 where
     F: FnOnce(crate::file::File) -> Fut,
     Fut: std::future::Future<Output = Result<crate::file::File, MagickError>>,
 {
-    let input_file = crate::tmp_file::tmp_file(None);
+    let input_file = tmp_dir.tmp_file(None);
     let input_file_str = input_file.to_str().ok_or(MagickError::Path)?;
     crate::store::file_store::safe_create_parent(&input_file)
         .await
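
Both `discover_file` helpers (ffmpeg and magick, above) keep the same callback shape: the helper creates the temp path, the caller receives the opened file and writes the input bytes into it, and then the external tool reads it by path. A self-contained sketch of that contract — the names and the `tokio::fs::File` stand-in are illustrative, not pict-rs's `crate::file::File` API:

    use std::{
        future::Future,
        path::{Path, PathBuf},
    };

    async fn with_tmp_input<T, F, Fut>(
        tmp_path: PathBuf,
        write: F,
        run_tool: impl FnOnce(&Path) -> std::io::Result<T>,
    ) -> std::io::Result<T>
    where
        F: FnOnce(tokio::fs::File) -> Fut,
        Fut: Future<Output = std::io::Result<tokio::fs::File>>,
    {
        // The caller fills the temp file (mirrors `file.write_from_bytes(bytes)`).
        let file = tokio::fs::File::create(&tmp_path).await?;
        let file = write(file).await?;
        file.sync_all().await?;

        // The external tool (ffprobe, magick identify, ...) reads it by path.
        let result = run_tool(&tmp_path);

        // Best-effort cleanup of the input file.
        tokio::fs::remove_file(&tmp_path).await.ok();
        result
    }
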
diff --git a/src/generate.rs b/src/generate.rs
index 851124e..f5ce429 100644
--- a/src/generate.rs
+++ b/src/generate.rs
@@ -8,6 +8,7 @@ use crate::{
     formats::{ImageFormat, InputProcessableFormat, InternalVideoFormat, ProcessableFormat},
     repo::{ArcRepo, Hash, VariantAlreadyExists},
     store::Store,
+    tmp_file::TmpDir,
 };
 use actix_web::web::Bytes;
 use std::{path::PathBuf, sync::Arc, time::Instant};
@@ -43,6 +44,7 @@ impl Drop for MetricsGuard {
 #[allow(clippy::too_many_arguments)]
 #[tracing::instrument(skip(repo, store, hash, process_map, media))]
 pub(crate) async fn generate<S: Store + 'static>(
+    tmp_dir: &TmpDir,
     repo: &ArcRepo,
     store: &S,
     process_map: &ProcessMap,
@@ -54,6 +56,7 @@ pub(crate) async fn generate<S: Store + 'static>(
     hash: Hash,
 ) -> Result<(Details, Bytes), Error> {
     let process_fut = process(
+        tmp_dir,
         repo,
         store,
         format,
@@ -74,6 +77,7 @@ pub(crate) async fn generate<S: Store + 'static>(
 #[allow(clippy::too_many_arguments)]
 #[tracing::instrument(skip(repo, store, hash, media))]
 async fn process<S: Store + 'static>(
+    tmp_dir: &TmpDir,
     repo: &ArcRepo,
     store: &S,
     output_format: InputProcessableFormat,
@@ -87,6 +91,7 @@ async fn process<S: Store + 'static>(
     let permit = crate::process_semaphore().acquire().await?;
 
     let identifier = input_identifier(
+        tmp_dir,
         repo,
         store,
         output_format,
@@ -101,7 +106,8 @@ async fn process<S: Store + 'static>(
     } else {
         let bytes_stream = store.to_bytes(&identifier, None, None).await?;
 
-        let details = Details::from_bytes(media.process_timeout, bytes_stream.into_bytes()).await?;
+        let details =
+            Details::from_bytes(tmp_dir, media.process_timeout, bytes_stream.into_bytes()).await?;
 
         repo.relate_details(&identifier, &details).await?;
@@ -121,6 +127,7 @@ async fn process<S: Store + 'static>(
     };
 
     let mut processed_reader = crate::magick::process_image_store_read(
+        tmp_dir,
         store,
         &identifier,
         thumbnail_args,
@@ -140,7 +147,7 @@ async fn process<S: Store + 'static>(
 
     drop(permit);
 
-    let details = Details::from_bytes(media.process_timeout, bytes.clone()).await?;
+    let details = Details::from_bytes(tmp_dir, media.process_timeout, bytes.clone()).await?;
 
     let identifier = store
         .save_bytes(bytes.clone(), details.media_type())
@@ -166,6 +173,7 @@ async fn process<S: Store + 'static>(
 
 #[tracing::instrument(skip_all)]
 async fn input_identifier<S>(
+    tmp_dir: &TmpDir,
     repo: &ArcRepo,
     store: &S,
     output_format: InputProcessableFormat,
@@ -202,6 +210,7 @@ where
             let thumbnail_format = media.image.format.unwrap_or(ImageFormat::Webp);
 
             let reader = magick::thumbnail(
+                tmp_dir,
                 store,
                 &identifier,
                 processable_format,
@@ -222,6 +231,7 @@ where
             };
 
             let reader = ffmpeg::thumbnail(
+                tmp_dir,
                 store.clone(),
                 identifier,
                 original_details
diff --git a/src/generate/ffmpeg.rs b/src/generate/ffmpeg.rs
index 3e2464d..4115f9f 100644
--- a/src/generate/ffmpeg.rs
+++ b/src/generate/ffmpeg.rs
@@ -2,7 +2,7 @@ use std::sync::Arc;
 
 use crate::{
     ffmpeg::FfMpegError, formats::InternalVideoFormat, process::Process, read::BoxRead,
-    store::Store,
+    store::Store, tmp_file::TmpDir,
 };
 
 #[derive(Clone, Copy, Debug)]
@@ -47,19 +47,20 @@ impl ThumbnailFormat {
 
 #[tracing::instrument(skip(store))]
 pub(super) async fn thumbnail<S: Store>(
+    tmp_dir: &TmpDir,
     store: S,
     from: Arc<str>,
     input_format: InternalVideoFormat,
     format: ThumbnailFormat,
     timeout: u64,
 ) -> Result<BoxRead<'static>, FfMpegError> {
-    let input_file = crate::tmp_file::tmp_file(Some(input_format.file_extension()));
+    let input_file = tmp_dir.tmp_file(Some(input_format.file_extension()));
     let input_file_str = input_file.to_str().ok_or(FfMpegError::Path)?;
     crate::store::file_store::safe_create_parent(&input_file)
         .await
         .map_err(FfMpegError::CreateDir)?;
 
-    let output_file = crate::tmp_file::tmp_file(Some(format.to_file_extension()));
+    let output_file = tmp_dir.tmp_file(Some(format.to_file_extension()));
     let output_file_str = output_file.to_str().ok_or(FfMpegError::Path)?;
     crate::store::file_store::safe_create_parent(&output_file)
         .await
diff --git a/src/generate/magick.rs b/src/generate/magick.rs
index 5776a87..bb89183 100644
--- a/src/generate/magick.rs
+++ b/src/generate/magick.rs
@@ -2,9 +2,11 @@ use std::sync::Arc;
 
 use crate::{
     formats::ProcessableFormat, magick::MagickError, process::Process, read::BoxRead, store::Store,
+    tmp_file::TmpDir,
 };
 
 async fn thumbnail_animation<F, Fut>(
+    tmp_dir: &TmpDir,
     input_format: ProcessableFormat,
     format: ProcessableFormat,
     quality: Option<u8>,
@@ -15,7 +17,7 @@ where
     F: FnOnce(crate::file::File) -> Fut,
     Fut: std::future::Future<Output = Result<crate::file::File, MagickError>>,
 {
-    let input_file = crate::tmp_file::tmp_file(None);
+    let input_file = tmp_dir.tmp_file(None);
     let input_file_str = input_file.to_str().ok_or(MagickError::Path)?;
     crate::store::file_store::safe_create_parent(&input_file)
         .await
@@ -52,6 +54,7 @@ where
 }
 
 pub(super) async fn thumbnail<S: Store + 'static>(
+    tmp_dir: &TmpDir,
     store: &S,
     identifier: &Arc<str>,
     input_format: ProcessableFormat,
@@ -65,6 +68,7 @@ pub(super) async fn thumbnail<S: Store + 'static>(
         .map_err(MagickError::Store)?;
 
     thumbnail_animation(
+        tmp_dir,
         input_format,
         format,
         quality,
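
The video thumbnail path above needs two temp files: the stored video is written out with its container extension so ffmpeg can pick a demuxer, and the frame is rendered to a second uuid-named output path. The exact arguments pict-rs passes are outside this diff; a generic single-frame extraction looks roughly like this (hedged illustration only — pict-rs drives ffmpeg through its `Process` wrapper with a timeout):

    use tokio::process::Command;

    async fn one_frame(input: &str, output: &str) -> std::io::Result<()> {
        // `-frames:v 1` stops after the first decoded video frame.
        let status = Command::new("ffmpeg")
            .args(["-i", input, "-frames:v", "1", output])
            .status()
            .await?;
        if status.success() {
            Ok(())
        } else {
            Err(std::io::Error::new(std::io::ErrorKind::Other, "ffmpeg failed"))
        }
    }
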
diff --git a/src/ingest.rs b/src/ingest.rs
index 1a0639c..df95a9b 100644
--- a/src/ingest.rs
+++ b/src/ingest.rs
@@ -86,6 +86,7 @@ where
     };
 
     crate::magick::process_image_async_read(
+        tmp_dir,
         validated_reader,
         magick_args,
         format,
@@ -109,7 +110,8 @@ where
         .await?;
 
     let bytes_stream = store.to_bytes(&identifier, None, None).await?;
-    let details = Details::from_bytes(media.process_timeout, bytes_stream.into_bytes()).await?;
+    let details =
+        Details::from_bytes(tmp_dir, media.process_timeout, bytes_stream.into_bytes()).await?;
 
     drop(permit);
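
From here on, the HTTP handlers in src/lib.rs receive the directory as `web::Data<ArcTmpDir>`. The registration site is not part of this diff; the usual actix-web wiring for an `Arc`-backed shared resource would look roughly like this (hypothetical harness — `ArcTmpDir = Arc<TmpDir>` matches the existing alias in src/tmp_file.rs, everything else is assumed):

    use std::sync::Arc;
    use actix_web::{web, App, HttpResponse, HttpServer};

    struct TmpDir;
    type ArcTmpDir = Arc<TmpDir>;

    async fn handler(tmp_dir: web::Data<ArcTmpDir>) -> HttpResponse {
        // web::Data<ArcTmpDir> dereferences to the shared Arc<TmpDir>.
        let _tmp_dir: &ArcTmpDir = tmp_dir.get_ref();
        HttpResponse::Ok().finish()
    }

    #[actix_web::main]
    async fn main() -> std::io::Result<()> {
        let tmp_dir: ArcTmpDir = Arc::new(TmpDir);
        HttpServer::new(move || {
            App::new()
                .app_data(web::Data::new(tmp_dir.clone()))
                .route("/", web::get().to(handler))
        })
        .bind(("127.0.0.1", 8080))?
        .run()
        .await
    }
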
diff --git a/src/lib.rs b/src/lib.rs
index 4556fa6..c92fd90 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -103,6 +103,7 @@ fn process_semaphore() -> &'static Semaphore {
 }
 
 async fn ensure_details<S: Store + 'static>(
+    tmp_dir: &TmpDir,
     repo: &ArcRepo,
     store: &S,
     config: &Configuration,
@@ -124,8 +125,12 @@ async fn ensure_details<S: Store + 'static>(
     tracing::debug!("generating new details from {:?}", identifier);
     let bytes_stream = store.to_bytes(&identifier, None, None).await?;
-    let new_details =
-        Details::from_bytes(config.media.process_timeout, bytes_stream.into_bytes()).await?;
+    let new_details = Details::from_bytes(
+        tmp_dir,
+        config.media.process_timeout,
+        bytes_stream.into_bytes(),
+    )
+    .await?;
     tracing::debug!("storing details for {:?}", identifier);
     repo.relate_details(&identifier, &new_details).await?;
     tracing::debug!("stored");
@@ -296,28 +301,31 @@ impl FormData for Import {
 #[tracing::instrument(name = "Uploaded files", skip(value, repo, store, config))]
 async fn upload<S: Store + 'static>(
     Multipart(Upload(value, _)): Multipart<Upload<S>>,
+    tmp_dir: web::Data<ArcTmpDir>,
     repo: web::Data<ArcRepo>,
     store: web::Data<S>,
     config: web::Data<Configuration>,
 ) -> Result<HttpResponse, Error> {
-    handle_upload(value, repo, store, config).await
+    handle_upload(value, tmp_dir, repo, store, config).await
 }
 
 /// Handle responding to successful uploads
 #[tracing::instrument(name = "Imported files", skip(value, repo, store, config))]
 async fn import<S: Store + 'static>(
     Multipart(Import(value, _)): Multipart<Import<S>>,
+    tmp_dir: web::Data<ArcTmpDir>,
     repo: web::Data<ArcRepo>,
     store: web::Data<S>,
     config: web::Data<Configuration>,
 ) -> Result<HttpResponse, Error> {
-    handle_upload(value, repo, store, config).await
+    handle_upload(value, tmp_dir, repo, store, config).await
 }
 
 /// Handle responding to successful uploads
 #[tracing::instrument(name = "Uploaded files", skip(value, repo, store, config))]
 async fn handle_upload<S: Store + 'static>(
     value: Value<Session>,
+    tmp_dir: web::Data<ArcTmpDir>,
     repo: web::Data<ArcRepo>,
     store: web::Data<S>,
     config: web::Data<Configuration>,
@@ -339,7 +347,7 @@ async fn handle_upload<S: Store + 'static>(
         tracing::debug!("Uploaded {} as {:?}", image.filename, alias);
 
         let delete_token = image.result.delete_token();
 
-        let details = ensure_details(&repo, &store, &config, alias).await?;
+        let details = ensure_details(&tmp_dir, &repo, &store, &config, alias).await?;
 
         files.push(serde_json::json!({
             "file": alias.to_string(),
@@ -468,6 +476,7 @@ struct ClaimQuery {
 /// Claim a backgrounded upload
 #[tracing::instrument(name = "Waiting on upload", skip_all)]
 async fn claim_upload<S: Store + 'static>(
+    tmp_dir: web::Data<ArcTmpDir>,
     repo: web::Data<ArcRepo>,
     store: web::Data<S>,
     config: web::Data<Configuration>,
@@ -487,7 +496,7 @@ async fn claim_upload<S: Store + 'static>(
 
     match upload_result {
         UploadResult::Success { alias, token } => {
-            let details = ensure_details(&repo, &store, &config, &alias).await?;
+            let details = ensure_details(&tmp_dir, &repo, &store, &config, &alias).await?;
 
             Ok(HttpResponse::Ok().json(&serde_json::json!({
                 "msg": "ok",
@@ -529,7 +538,7 @@ async fn ingest_inline<S: Store + 'static>(
 
     let alias = session.alias().expect("alias should exist").to_owned();
 
-    let details = ensure_details(repo, store, config, &alias).await?;
+    let details = ensure_details(tmp_dir, repo, store, config, &alias).await?;
 
     let delete_token = session.disarm();
@@ -925,9 +934,12 @@ async fn process<S: Store + 'static>(
             tracing::debug!("generating new details from {:?}", identifier);
             let bytes_stream = store.to_bytes(&identifier, None, None).await?;
-            let new_details =
-                Details::from_bytes(config.media.process_timeout, bytes_stream.into_bytes())
-                    .await?;
+            let new_details = Details::from_bytes(
+                &tmp_dir,
+                config.media.process_timeout,
+                bytes_stream.into_bytes(),
+            )
+            .await?;
             tracing::debug!("storing details for {:?}", identifier);
             repo.relate_details(&identifier, &new_details).await?;
             tracing::debug!("stored");
@@ -947,9 +959,10 @@ async fn process<S: Store + 'static>(
         return Err(UploadError::ReadOnly.into());
     }
 
-    let original_details = ensure_details(&repo, &store, &config, &alias).await?;
+    let original_details = ensure_details(&tmp_dir, &repo, &store, &config, &alias).await?;
 
     let (details, bytes) = generate::generate(
+        &tmp_dir,
        &repo,
        &store,
        &process_map,
@@ -1001,6 +1014,7 @@ async fn process_head<S: Store + 'static>(
     range: Option<web::Header<Range>>,
     web::Query(ProcessQuery { source, operations }): web::Query<ProcessQuery>,
     ext: web::Path<String>,
+    tmp_dir: web::Data<ArcTmpDir>,
     repo: web::Data<ArcRepo>,
     store: web::Data<S>,
     config: web::Data<Configuration>,
@@ -1045,9 +1059,12 @@ async fn process_head<S: Store + 'static>(
             tracing::debug!("generating new details from {:?}", identifier);
             let bytes_stream = store.to_bytes(&identifier, None, None).await?;
-            let new_details =
-                Details::from_bytes(config.media.process_timeout, bytes_stream.into_bytes())
-                    .await?;
+            let new_details = Details::from_bytes(
+                &tmp_dir,
+                config.media.process_timeout,
+                bytes_stream.into_bytes(),
+            )
+            .await?;
             tracing::debug!("storing details for {:?}", identifier);
             repo.relate_details(&identifier, &new_details).await?;
             tracing::debug!("stored");
@@ -1114,6 +1131,7 @@ async fn process_backgrounded<S: Store>(
 #[tracing::instrument(name = "Fetching query details", skip(repo, store, config))]
 async fn details_query<S: Store + 'static>(
     web::Query(alias_query): web::Query<AliasQuery>,
+    tmp_dir: web::Data<ArcTmpDir>,
     repo: web::Data<ArcRepo>,
     store: web::Data<S>,
     config: web::Data<Configuration>,
@@ -1130,27 +1148,36 @@ async fn details_query<S: Store + 'static>(
         }
     };
 
-    do_details(alias, repo, store, config).await
+    do_details(alias, tmp_dir, repo, store, config).await
 }
 
 /// Fetch file details
 #[tracing::instrument(name = "Fetching details", skip(repo, store, config))]
 async fn details<S: Store + 'static>(
     alias: web::Path<Serde<Alias>>,
+    tmp_dir: web::Data<ArcTmpDir>,
     repo: web::Data<ArcRepo>,
     store: web::Data<S>,
     config: web::Data<Configuration>,
 ) -> Result<HttpResponse, Error> {
-    do_details(Serde::into_inner(alias.into_inner()), repo, store, config).await
+    do_details(
+        Serde::into_inner(alias.into_inner()),
+        tmp_dir,
+        repo,
+        store,
+        config,
+    )
+    .await
 }
 
 async fn do_details<S: Store + 'static>(
     alias: Alias,
+    tmp_dir: web::Data<ArcTmpDir>,
     repo: web::Data<ArcRepo>,
     store: web::Data<S>,
     config: web::Data<Configuration>,
 ) -> Result<HttpResponse, Error> {
-    let details = ensure_details(&repo, &store, &config, &alias).await?;
+    let details = ensure_details(&tmp_dir, &repo, &store, &config, &alias).await?;
 
     Ok(HttpResponse::Ok().json(&details.into_api_details()))
 }
@@ -1192,7 +1219,7 @@ async fn serve_query<S: Store + 'static>(
         }
     };
 
-    do_serve(range, alias, repo, store, config).await
+    do_serve(range, alias, tmp_dir, repo, store, config).await
 }
 
 /// Serve files
@@ -1200,6 +1227,7 @@ async fn serve_query<S: Store + 'static>(
 async fn serve<S: Store + 'static>(
     range: Option<web::Header<Range>>,
     alias: web::Path<Serde<Alias>>,
+    tmp_dir: web::Data<ArcTmpDir>,
     repo: web::Data<ArcRepo>,
     store: web::Data<S>,
     config: web::Data<Configuration>,
@@ -1207,6 +1235,7 @@ async fn serve<S: Store + 'static>(
     do_serve(
         range,
         Serde::into_inner(alias.into_inner()),
+        tmp_dir,
         repo,
         store,
         config,
@@ -1217,6 +1246,7 @@ async fn serve<S: Store + 'static>(
 async fn do_serve<S: Store + 'static>(
     range: Option<web::Header<Range>>,
     alias: Alias,
+    tmp_dir: web::Data<ArcTmpDir>,
     repo: web::Data<ArcRepo>,
     store: web::Data<S>,
     config: web::Data<Configuration>,
@@ -1237,7 +1267,7 @@ async fn do_serve<S: Store + 'static>(
         return Ok(HttpResponse::NotFound().finish());
     };
 
-    let details = ensure_details(&repo, &store, &config, &alias).await?;
+    let details = ensure_details(&tmp_dir, &repo, &store, &config, &alias).await?;
 
     if let Some(public_url) = store.public_url(&identifier) {
         return Ok(HttpResponse::SeeOther()
@@ -1252,6 +1282,7 @@ async fn do_serve<S: Store + 'static>(
 async fn serve_query_head<S: Store + 'static>(
     range: Option<web::Header<Range>>,
     web::Query(alias_query): web::Query<AliasQuery>,
+    tmp_dir: web::Data<ArcTmpDir>,
     repo: web::Data<ArcRepo>,
     store: web::Data<S>,
     config: web::Data<Configuration>,
@@ -1266,13 +1297,14 @@ async fn serve_query_head<S: Store + 'static>(
         }
     };
 
-    do_serve_head(range, alias, repo, store, config).await
+    do_serve_head(range, alias, tmp_dir, repo, store, config).await
 }
 
 #[tracing::instrument(name = "Serving file headers", skip(repo, store, config))]
 async fn serve_head<S: Store + 'static>(
     range: Option<web::Header<Range>>,
     alias: web::Path<Serde<Alias>>,
+    tmp_dir: web::Data<ArcTmpDir>,
     repo: web::Data<ArcRepo>,
     store: web::Data<S>,
     config: web::Data<Configuration>,
@@ -1280,6 +1312,7 @@ async fn serve_head<S: Store + 'static>(
     do_serve_head(
         range,
         Serde::into_inner(alias.into_inner()),
+        tmp_dir,
         repo,
         store,
         config,
@@ -1290,6 +1323,7 @@ async fn serve_head<S: Store + 'static>(
 async fn do_serve_head<S: Store + 'static>(
     range: Option<web::Header<Range>>,
     alias: Alias,
+    tmp_dir: web::Data<ArcTmpDir>,
     repo: web::Data<ArcRepo>,
     store: web::Data<S>,
     config: web::Data<Configuration>,
@@ -1299,7 +1333,7 @@ async fn do_serve_head<S: Store + 'static>(
         return Ok(HttpResponse::NotFound().finish());
     };
 
-    let details = ensure_details(&repo, &store, &config, &alias).await?;
+    let details = ensure_details(&tmp_dir, &repo, &store, &config, &alias).await?;
 
     if let Some(public_url) = store.public_url(&identifier) {
         return Ok(HttpResponse::SeeOther()
@@ -1876,7 +1910,9 @@ async fn launch_object_store<S: Store + 'static>(
 }
 
 async fn migrate_inner<S1>(
+    tmp_dir: ArcTmpDir,
     repo: ArcRepo,
     client: ClientWithMiddleware,
     from: S1,
@@ -1892,7 +1928,16 @@ where
         config::primitives::Store::Filesystem(config::Filesystem { path }) => {
             let to = FileStore::build(path.clone(), repo.clone()).await?;
 
-            migrate_store(repo, from, to, skip_missing_files, timeout, concurrency).await?
+            migrate_store(
+                tmp_dir,
+                repo,
+                from,
+                to,
+                skip_missing_files,
+                timeout,
+                concurrency,
+            )
+            .await?
         }
         config::primitives::Store::ObjectStorage(config::primitives::ObjectStorage {
             endpoint,
@@ -1926,7 +1971,16 @@ where
                 .await?
                 .build(client);
 
-            migrate_store(repo, from, to, skip_missing_files, timeout, concurrency).await?
+            migrate_store(
+                tmp_dir,
+                repo,
+                from,
+                to,
+                skip_missing_files,
+                timeout,
+                concurrency,
+            )
+            .await?
         }
     }
 
@@ -2022,6 +2076,8 @@ impl PictRsConfiguration {
     pub async fn run(self) -> color_eyre::Result<()> {
         let PictRsConfiguration { config, operation } = self;
 
+        let tmp_dir = TmpDir::init().await?;
+
         let client = build_client()?;
 
         match operation {
@@ -2038,6 +2094,7 @@ impl PictRsConfiguration {
                 config::primitives::Store::Filesystem(config::Filesystem { path }) => {
                     let from = FileStore::build(path.clone(), repo.clone()).await?;
                     migrate_inner(
+                        tmp_dir,
                         repo,
                         client,
                         from,
@@ -2083,6 +2140,7 @@ impl PictRsConfiguration {
                         .build(client.clone());
 
                     migrate_inner(
+                        tmp_dir,
                         repo,
                         client,
                         from,
@@ -2112,8 +2170,6 @@ impl PictRsConfiguration {
             tracing::warn!("Launching in READ ONLY mode");
         }
 
-        let tmp_dir = TmpDir::init().await?;
-
         match config.store.clone() {
             config::Store::Filesystem(config::Filesystem { path }) => {
                 let arc_repo = repo.to_arc();
@@ -2124,6 +2180,7 @@ impl PictRsConfiguration {
                 if let Some(path) = config.old_repo_path() {
                     if let Some(old_repo) = repo_04::open(path)? {
                         repo::migrate_04(
+                            tmp_dir.clone(),
                             old_repo,
                             arc_repo.clone(),
                             store.clone(),
@@ -2187,6 +2244,7 @@ impl PictRsConfiguration {
                 if let Some(path) = config.old_repo_path() {
                     if let Some(old_repo) = repo_04::open(path)? {
                         repo::migrate_04(
+                            tmp_dir.clone(),
                             old_repo,
                             arc_repo.clone(),
                             store.clone(),
@@ -2215,8 +2273,6 @@ impl PictRsConfiguration {
             }
         }
 
-        self::tmp_file::remove_tmp_dir().await?;
-
         Ok(())
     }
 }
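
Two lifecycle changes land in src/lib.rs: `TmpDir::init()` moves to the top of `run()` so the `migrate_inner`/`migrate_04` paths share the same directory, and the trailing `self::tmp_file::remove_tmp_dir().await?` disappears. Cleanup is evidently left to the `impl Drop for TmpDir` that src/tmp_file.rs already has (visible as hunk context later in this diff); its body is outside the diff, but the obvious shape would be a synchronous, best-effort removal, assuming the struct holds its `PathBuf` as in the stand-in sketched earlier:

    impl Drop for TmpDir {
        fn drop(&mut self) {
            // Best-effort: errors during shutdown are not worth propagating.
            let _ = std::fs::remove_dir_all(&self.path);
        }
    }
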
diff --git a/src/magick.rs b/src/magick.rs
index 61113a2..79ed481 100644
--- a/src/magick.rs
+++ b/src/magick.rs
@@ -6,6 +6,7 @@ use crate::{
     process::{Process, ProcessError},
     read::BoxRead,
     store::Store,
+    tmp_file::TmpDir,
 };
 use tokio::io::AsyncRead;
 
@@ -90,6 +91,7 @@ impl MagickError {
 }
 
 async fn process_image<F, Fut>(
+    tmp_dir: &TmpDir,
     process_args: Vec<String>,
     input_format: ProcessableFormat,
     format: ProcessableFormat,
@@ -101,7 +103,7 @@ where
     F: FnOnce(crate::file::File) -> Fut,
     Fut: std::future::Future<Output = Result<crate::file::File, MagickError>>,
 {
-    let input_file = crate::tmp_file::tmp_file(None);
+    let input_file = tmp_dir.tmp_file(None);
     let input_file_str = input_file.to_str().ok_or(MagickError::Path)?;
     crate::store::file_store::safe_create_parent(&input_file)
         .await
@@ -141,7 +143,9 @@ where
     Ok(Box::pin(clean_reader))
 }
 
+#[allow(clippy::too_many_arguments)]
 pub(crate) async fn process_image_store_read<S: Store + 'static>(
+    tmp_dir: &TmpDir,
     store: &S,
     identifier: &Arc<str>,
     args: Vec<String>,
@@ -156,6 +160,7 @@ pub(crate) async fn process_image_store_read<S: Store + 'static>(
         .map_err(MagickError::Store)?;
 
     process_image(
+        tmp_dir,
         args,
         input_format,
         format,
@@ -173,6 +178,7 @@ pub(crate) async fn process_image_store_read<S: Store + 'static>(
 }
 
 pub(crate) async fn process_image_async_read<A: AsyncRead + Unpin + 'static>(
+    tmp_dir: &TmpDir,
     async_read: A,
     args: Vec<String>,
     input_format: ProcessableFormat,
@@ -181,6 +187,7 @@ pub(crate) async fn process_image_async_read<A: AsyncRead + Unpin + 'static>(
     timeout: u64,
 ) -> Result<BoxRead<'static>, MagickError> {
     process_image(
+        tmp_dir,
         args,
         input_format,
         format,
diff --git a/src/migrate_store.rs b/src/migrate_store.rs
index 6c07b57..94d8dda 100644
--- a/src/migrate_store.rs
+++ b/src/migrate_store.rs
@@ -14,9 +14,11 @@ use crate::{
     error::{Error, UploadError},
     repo::{ArcRepo, Hash},
     store::Store,
+    tmp_file::{ArcTmpDir, TmpDir},
 };
 
 pub(super) async fn migrate_store<S1, S2>(
+    tmp_dir: ArcTmpDir,
     repo: ArcRepo,
     from: S1,
     to: S2,
@@ -44,6 +46,7 @@ where
     let mut failure_count = 0;
 
     while let Err(e) = do_migrate_store(
+        tmp_dir.clone(),
         repo.clone(),
         from.clone(),
         to.clone(),
@@ -71,6 +74,7 @@ where
 }
 
 struct MigrateState<S1, S2> {
+    tmp_dir: ArcTmpDir,
     repo: ArcRepo,
     from: S1,
     to: S2,
@@ -85,6 +89,7 @@ struct MigrateState<S1, S2> {
 }
 
 async fn do_migrate_store<S1, S2>(
+    tmp_dir: ArcTmpDir,
     repo: ArcRepo,
     from: S1,
     to: S2,
@@ -114,6 +119,7 @@ where
     let mut stream = stream.into_streamer();
 
     let state = Rc::new(MigrateState {
+        tmp_dir: tmp_dir.clone(),
        repo: repo.clone(),
        from,
        to,
@@ -161,6 +167,7 @@ where
     S2: Store,
 {
     let MigrateState {
+        tmp_dir,
         repo,
         from,
         to,
@@ -223,7 +230,17 @@ where
 
     if let Some(identifier) = repo.motion_identifier(hash.clone()).await? {
         if !repo.is_migrated(&identifier).await? {
-            match migrate_file(repo, from, to, &identifier, *skip_missing_files, *timeout).await {
+            match migrate_file(
+                tmp_dir,
+                repo,
+                from,
+                to,
+                &identifier,
+                *skip_missing_files,
+                *timeout,
+            )
+            .await
+            {
                 Ok(new_identifier) => {
                     migrate_details(repo, &identifier, &new_identifier).await?;
                     repo.relate_motion_identifier(hash.clone(), &new_identifier)
@@ -252,7 +269,17 @@ where
 
     for (variant, identifier) in repo.variants(hash.clone()).await? {
         if !repo.is_migrated(&identifier).await? {
-            match migrate_file(repo, from, to, &identifier, *skip_missing_files, *timeout).await {
+            match migrate_file(
+                tmp_dir,
+                repo,
+                from,
+                to,
+                &identifier,
+                *skip_missing_files,
+                *timeout,
+            )
+            .await
+            {
                 Ok(new_identifier) => {
                     migrate_details(repo, &identifier, &new_identifier).await?;
                     repo.remove_variant(hash.clone(), variant.clone()).await?;
@@ -282,6 +309,7 @@ where
     }
 
     match migrate_file(
+        tmp_dir,
         repo,
         from,
         to,
@@ -340,6 +368,7 @@ where
 }
 
 async fn migrate_file<S1, S2>(
+    tmp_dir: &TmpDir,
     repo: &ArcRepo,
     from: &S1,
     to: &S2,
@@ -354,7 +383,7 @@ where
     let mut failure_count = 0;
 
     loop {
-        match do_migrate_file(repo, from, to, identifier, timeout).await {
+        match do_migrate_file(tmp_dir, repo, from, to, identifier, timeout).await {
             Ok(identifier) => return Ok(identifier),
             Err(MigrateError::From(e)) if e.is_not_found() && skip_missing_files => {
                 return Err(MigrateError::From(e));
@@ -383,6 +412,7 @@ enum MigrateError {
 }
 
 async fn do_migrate_file<S1, S2>(
+    tmp_dir: &TmpDir,
     repo: &ArcRepo,
     from: &S1,
     to: &S2,
@@ -412,7 +442,7 @@ where
         .await
         .map_err(From::from)
         .map_err(MigrateError::Details)?;
-    let new_details = Details::from_bytes(timeout, bytes_stream.into_bytes())
+    let new_details = Details::from_bytes(tmp_dir, timeout, bytes_stream.into_bytes())
         .await
         .map_err(MigrateError::Details)?;
     repo.relate_details(identifier, &new_details)
diff --git a/src/queue.rs b/src/queue.rs
index 06e8eb3..57ade78 100644
--- a/src/queue.rs
+++ b/src/queue.rs
@@ -297,6 +297,7 @@ where
     }
 }
 
+#[allow(clippy::too_many_arguments)]
 async fn process_image_jobs<S, F>(
     tmp_dir: &ArcTmpDir,
     repo: &ArcRepo,
diff --git a/src/queue/process.rs b/src/queue/process.rs
index 1f2e7d0..33f64b9 100644
--- a/src/queue/process.rs
+++ b/src/queue/process.rs
@@ -12,7 +12,7 @@ use crate::{
     repo::{Alias, ArcRepo, UploadId, UploadResult},
     serde_str::Serde,
     store::Store,
-    tmp_file::ArcTmpDir,
+    tmp_file::{ArcTmpDir, TmpDir},
 };
 use std::{path::PathBuf, sync::Arc};
 
@@ -55,6 +55,7 @@ where
             process_args,
         } => {
             generate(
+                tmp_dir,
                 repo,
                 store,
                 process_map,
@@ -110,7 +111,8 @@ impl Drop for UploadGuard {
     }
 }
 
-#[tracing::instrument(skip(repo, store, client, media))]
+#[allow(clippy::too_many_arguments)]
+#[tracing::instrument(skip(tmp_dir, repo, store, client, media))]
 async fn process_ingest<S>(
     tmp_dir: &ArcTmpDir,
     repo: &ArcRepo,
@@ -183,6 +185,7 @@ where
 #[allow(clippy::too_many_arguments)]
 #[tracing::instrument(skip(repo, store, process_map, process_path, process_args, config))]
 async fn generate<S: Store + 'static>(
+    tmp_dir: &TmpDir,
     repo: &ArcRepo,
     store: &S,
     process_map: &ProcessMap,
@@ -204,9 +207,10 @@ async fn generate<S: Store + 'static>(
         return Ok(());
     }
 
-    let original_details = crate::ensure_details(repo, store, config, &source).await?;
+    let original_details = crate::ensure_details(tmp_dir, repo, store, config, &source).await?;
 
     crate::generate::generate(
+        tmp_dir,
         repo,
         store,
         process_map,
diff --git a/src/repo/migrate.rs b/src/repo/migrate.rs
index 209c4ca..d67d773 100644
--- a/src/repo/migrate.rs
+++ b/src/repo/migrate.rs
@@ -13,6 +13,7 @@ use crate::{
         SledRepo as OldSledRepo,
     },
     store::Store,
+    tmp_file::{ArcTmpDir, TmpDir},
 };
 
 const GENERATOR_KEY: &str = "last-path";
@@ -67,6 +68,7 @@ pub(crate) async fn migrate_repo(old_repo: ArcRepo, new_repo: ArcRepo) -> Result<(), Error>
 
 #[tracing::instrument(skip_all)]
 pub(crate) async fn migrate_04<S: Store + 'static>(
+    tmp_dir: ArcTmpDir,
     old_repo: OldSledRepo,
     new_repo: ArcRepo,
     store: S,
@@ -99,6 +101,7 @@ pub(crate) async fn migrate_04<S: Store + 'static>(
     while let Some(res) = hash_stream.next().await {
         if let Ok(hash) = res {
             set.spawn_local(migrate_hash_04(
+                tmp_dir.clone(),
                 old_repo.clone(),
                 new_repo.clone(),
                 store.clone(),
@@ -169,6 +172,7 @@ async fn migrate_hash(old_repo: ArcRepo, new_repo: ArcRepo, hash: Hash) {
 }
 
 async fn migrate_hash_04<S: Store>(
+    tmp_dir: ArcTmpDir,
     old_repo: OldSledRepo,
     new_repo: ArcRepo,
     store: S,
@@ -177,8 +181,15 @@ async fn migrate_hash_04<S: Store>(
 ) {
     let mut hash_failures = 0;
 
-    while let Err(e) =
-        do_migrate_hash_04(&old_repo, &new_repo, &store, &config, old_hash.clone()).await
+    while let Err(e) = do_migrate_hash_04(
+        &tmp_dir,
+        &old_repo,
+        &new_repo,
+        &store,
+        &config,
+        old_hash.clone(),
+    )
+    .await
     {
         hash_failures += 1;
@@ -266,6 +277,7 @@ async fn do_migrate_hash(old_repo: &ArcRepo, new_repo: &ArcRepo, hash: Hash) -> Result<(), Error>
 
 #[tracing::instrument(skip_all)]
 async fn do_migrate_hash_04<S: Store>(
+    tmp_dir: &TmpDir,
     old_repo: &OldSledRepo,
     new_repo: &ArcRepo,
     store: &S,
@@ -279,7 +291,7 @@ async fn do_migrate_hash_04<S: Store>(
 
     let size = store.len(&identifier).await?;
 
-    let hash_details = set_details(old_repo, new_repo, store, config, &identifier).await?;
+    let hash_details = set_details(tmp_dir, old_repo, new_repo, store, config, &identifier).await?;
 
     let aliases = old_repo.aliases_for_hash(old_hash.clone()).await?;
     let variants = old_repo.variants(old_hash.clone()).await?;
@@ -309,7 +321,7 @@ async fn do_migrate_hash_04<S: Store>(
         .relate_motion_identifier(hash.clone(), &identifier)
         .await?;
 
-        set_details(old_repo, new_repo, store, config, &identifier).await?;
+        set_details(tmp_dir, old_repo, new_repo, store, config, &identifier).await?;
     }
 
     for (variant, identifier) in variants {
@@ -317,7 +329,7 @@ async fn do_migrate_hash_04<S: Store>(
         .relate_variant_identifier(hash.clone(), variant.clone(), &identifier)
         .await?;
 
-        set_details(old_repo, new_repo, store, config, &identifier).await?;
+        set_details(tmp_dir, old_repo, new_repo, store, config, &identifier).await?;
 
         new_repo.accessed_variant(hash.clone(), variant).await?;
     }
@@ -326,6 +338,7 @@ where
 }
 
 async fn set_details<S: Store>(
+    tmp_dir: &TmpDir,
     old_repo: &OldSledRepo,
     new_repo: &ArcRepo,
     store: &S,
@@ -335,7 +348,8 @@ async fn set_details<S: Store>(
     if let Some(details) = new_repo.details(identifier).await? {
         Ok(details)
     } else {
-        let details = fetch_or_generate_details(old_repo, store, config, identifier).await?;
+        let details =
+            fetch_or_generate_details(tmp_dir, old_repo, store, config, identifier).await?;
         new_repo.relate_details(identifier, &details).await?;
         Ok(details)
     }
@@ -355,6 +369,7 @@ fn details_semaphore() -> &'static Semaphore {
 
 #[tracing::instrument(skip_all)]
 async fn fetch_or_generate_details<S: Store>(
+    tmp_dir: &TmpDir,
     old_repo: &OldSledRepo,
     store: &S,
     config: &Configuration,
@@ -369,7 +384,7 @@ async fn fetch_or_generate_details<S: Store>(
         let bytes = bytes_stream.into_bytes();
 
         let guard = details_semaphore().acquire().await?;
-        let details = Details::from_bytes(config.media.process_timeout, bytes).await?;
+        let details = Details::from_bytes(tmp_dir, config.media.process_timeout, bytes).await?;
         drop(guard);
 
         Ok(details)
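
`fetch_or_generate_details` above serializes the expensive ffmpeg/magick work behind `details_semaphore()`, which appears here only as hunk context. That helper is the usual lazily-initialized global; a sketch with an assumed permit count (pict-rs sizes it elsewhere in this file):

    use std::sync::OnceLock;
    use tokio::sync::Semaphore;

    fn details_semaphore() -> &'static Semaphore {
        static SEMAPHORE: OnceLock<Semaphore> = OnceLock::new();
        // The permit count here is an assumption for this sketch.
        SEMAPHORE.get_or_init(|| Semaphore::new(4))
    }
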
diff --git a/src/tmp_file.rs b/src/tmp_file.rs
index ae4253c..cc772a7 100644
--- a/src/tmp_file.rs
+++ b/src/tmp_file.rs
@@ -1,7 +1,4 @@
-use std::{
-    path::PathBuf,
-    sync::{Arc, OnceLock},
-};
+use std::{path::PathBuf, sync::Arc};
 
 use tokio::io::AsyncRead;
 use uuid::Uuid;
@@ -34,16 +31,6 @@ impl Drop for TmpDir {
     }
 }
 
-static TMP_DIR: OnceLock<PathBuf> = OnceLock::new();
-
-fn tmp_dir() -> &'static PathBuf {
-    TMP_DIR.get_or_init(|| {
-        let dir = std::env::temp_dir().join(Uuid::new_v4().to_string());
-        std::fs::create_dir(&dir).unwrap();
-        dir
-    })
-}
-
 struct TmpFile(PathBuf);
 
 impl Drop for TmpFile {
@@ -61,18 +48,6 @@ pin_project_lite::pin_project! {
     }
 }
 
-pub(crate) fn tmp_file(ext: Option<&str>) -> PathBuf {
-    if let Some(ext) = ext {
-        tmp_dir().join(format!("{}{}", Uuid::new_v4(), ext))
-    } else {
-        tmp_dir().join(Uuid::new_v4().to_string())
-    }
-}
-
-pub(crate) async fn remove_tmp_dir() -> std::io::Result<()> {
-    tokio::fs::remove_dir_all(tmp_dir()).await
-}
-
 pub(crate) fn cleanup_tmpfile<R: AsyncRead>(reader: R, file: PathBuf) -> TmpFileCleanup<R> {
     TmpFileCleanup {
         inner: reader,
diff --git a/src/validate.rs b/src/validate.rs
index 95b294d..70f2247 100644
--- a/src/validate.rs
+++ b/src/validate.rs
@@ -71,7 +71,7 @@ pub(crate) async fn validate_bytes(
         width,
         height,
         frames,
-    } = crate::discover::discover_bytes(timeout, bytes.clone()).await?;
+    } = crate::discover::discover_bytes(tmp_dir, timeout, bytes.clone()).await?;
 
     match &input {
         InputFile::Image(input) => {
@@ -186,7 +186,8 @@ fn validate_animation(
     Ok(())
 }
 
-#[tracing::instrument(skip(bytes, validations))]
+#[allow(clippy::too_many_arguments)]
+#[tracing::instrument(skip(tmp_dir, bytes, validations))]
 async fn process_animation(
     tmp_dir: &TmpDir,
     bytes: Bytes,
@@ -244,7 +245,8 @@ fn validate_video(
     Ok(())
 }
 
-#[tracing::instrument(skip(bytes, validations))]
+#[allow(clippy::too_many_arguments)]
+#[tracing::instrument(skip(tmp_dir, bytes, validations))]
 async fn process_video(
     tmp_dir: &TmpDir,
     bytes: Bytes,
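
Putting the pieces together, the lifecycle this diff establishes is: create one uuid-named directory at startup, hand out uuid-named file paths from it while running, and let `Drop` reclaim the whole tree when the last handle goes away — no explicit `remove_tmp_dir()` call. A consolidated, hypothetical harness (the `init` signature is inferred from `TmpDir::init().await?` and the `tmp_dir.clone()` calls where `ArcTmpDir` is expected; `tmp_file` is copied from the removed free function):

    use std::{path::PathBuf, sync::Arc};
    use uuid::Uuid;

    struct TmpDir {
        path: PathBuf,
    }

    impl TmpDir {
        // Assumed constructor shape; only its call sites appear in this diff.
        async fn init() -> std::io::Result<Arc<Self>> {
            let path = std::env::temp_dir().join(Uuid::new_v4().to_string());
            tokio::fs::create_dir(&path).await?;
            Ok(Arc::new(TmpDir { path }))
        }

        fn tmp_file(&self, ext: Option<&str>) -> PathBuf {
            match ext {
                Some(ext) => self.path.join(format!("{}{}", Uuid::new_v4(), ext)),
                None => self.path.join(Uuid::new_v4().to_string()),
            }
        }
    }

    impl Drop for TmpDir {
        fn drop(&mut self) {
            // Replaces the removed `remove_tmp_dir()`: best-effort, synchronous.
            let _ = std::fs::remove_dir_all(&self.path);
        }
    }

    #[tokio::main]
    async fn main() -> std::io::Result<()> {
        let tmp_dir = TmpDir::init().await?;           // once, at startup
        let scratch = tmp_dir.tmp_file(Some(".webp")); // unique path per use
        println!("scratch: {}", scratch.display());
        Ok(())
        // directory removed when the last Arc<TmpDir> drops
    }
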