diff --git a/src/ingest.rs b/src/ingest.rs index 58e95ee..1a0639c 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -8,6 +8,7 @@ use crate::{ future::WithMetrics, repo::{Alias, ArcRepo, DeleteToken, Hash}, store::Store, + tmp_file::TmpDir, }; use actix_web::web::Bytes; use futures_core::Stream; @@ -47,6 +48,7 @@ where #[tracing::instrument(skip(repo, store, client, stream, media))] pub(crate) async fn ingest( + tmp_dir: &TmpDir, repo: &ArcRepo, store: &S, client: &ClientWithMiddleware, @@ -69,7 +71,7 @@ where tracing::trace!("Validating bytes"); let (input_type, validated_reader) = - crate::validate::validate_bytes(bytes, prescribed, media.process_timeout).await?; + crate::validate::validate_bytes(tmp_dir, bytes, prescribed, media.process_timeout).await?; let processed_reader = if let Some(operations) = media.preprocess_steps() { if let Some(format) = input_type.processable_format() { diff --git a/src/lib.rs b/src/lib.rs index 19f874a..4556fa6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -55,6 +55,7 @@ use std::{ time::{Duration, SystemTime}, }; use streem::IntoStreamer; +use tmp_file::{ArcTmpDir, TmpDir}; use tokio::sync::Semaphore; use tracing::Instrument; use tracing_actix_web::TracingLogger; @@ -139,9 +140,10 @@ impl FormData for Upload { type Error = Error; fn form(req: &HttpRequest) -> Form { - // Create a new Multipart Form validator - // - // This form is expecting a single array field, 'images' with at most 10 files in it + let tmp_dir = req + .app_data::>() + .expect("No TmpDir in request") + .clone(); let repo = req .app_data::>() .expect("No repo in request") @@ -159,6 +161,9 @@ impl FormData for Upload { .expect("No configuration in request") .clone(); + // Create a new Multipart Form validator + // + // This form is expecting a single array field, 'images' with at most 10 files in it Form::new() .max_files(config.server.max_file_count) .max_file_size(config.media.max_file_size * MEGABYTES) @@ -166,6 +171,7 @@ impl FormData for Upload { .field( "images", Field::array(Field::file(move |filename, _, stream| { + let tmp_dir = tmp_dir.clone(); let repo = repo.clone(); let store = store.clone(); let client = client.clone(); @@ -183,8 +189,16 @@ impl FormData for Upload { let stream = crate::stream::from_err(stream); - ingest::ingest(&repo, &**store, &client, stream, None, &config.media) - .await + ingest::ingest( + &tmp_dir, + &repo, + &**store, + &client, + stream, + None, + &config.media, + ) + .await } .instrument(span), ) @@ -204,6 +218,10 @@ impl FormData for Import { type Error = Error; fn form(req: &actix_web::HttpRequest) -> Form { + let tmp_dir = req + .app_data::>() + .expect("No TmpDir in request") + .clone(); let repo = req .app_data::>() .expect("No repo in request") @@ -231,6 +249,7 @@ impl FormData for Import { .field( "images", Field::array(Field::file(move |filename, _, stream| { + let tmp_dir = tmp_dir.clone(); let repo = repo.clone(); let store = store.clone(); let client = client.clone(); @@ -249,6 +268,7 @@ impl FormData for Import { let stream = crate::stream::from_err(stream); ingest::ingest( + &tmp_dir, &repo, &**store, &client, @@ -499,12 +519,13 @@ struct UrlQuery { async fn ingest_inline( stream: impl Stream> + 'static, + tmp_dir: &TmpDir, repo: &ArcRepo, store: &S, client: &ClientWithMiddleware, config: &Configuration, ) -> Result<(Alias, DeleteToken, Details), Error> { - let session = ingest::ingest(repo, store, client, stream, None, &config.media).await?; + let session = ingest::ingest(tmp_dir, repo, store, client, stream, None, &config.media).await?; let alias = session.alias().expect("alias should exist").to_owned(); @@ -519,6 +540,7 @@ async fn ingest_inline( #[tracing::instrument(name = "Downloading file", skip(client, repo, store, config))] async fn download( client: web::Data, + tmp_dir: web::Data, repo: web::Data, store: web::Data, config: web::Data, @@ -529,7 +551,7 @@ async fn download( if query.backgrounded { do_download_backgrounded(stream, repo, store).await } else { - do_download_inline(stream, repo, store, &client, config).await + do_download_inline(stream, &tmp_dir, repo, store, &client, config).await } } @@ -562,6 +584,7 @@ async fn download_stream( )] async fn do_download_inline( stream: impl Stream> + 'static, + tmp_dir: &TmpDir, repo: web::Data, store: web::Data, client: &ClientWithMiddleware, @@ -570,7 +593,7 @@ async fn do_download_inline( metrics::increment_counter!("pict-rs.files", "download" => "inline"); let (alias, delete_token, details) = - ingest_inline(stream, &repo, &store, client, &config).await?; + ingest_inline(stream, tmp_dir, &repo, &store, client, &config).await?; Ok(HttpResponse::Created().json(&serde_json::json!({ "msg": "ok", @@ -832,6 +855,7 @@ async fn process( range: Option>, web::Query(ProcessQuery { source, operations }): web::Query, ext: web::Path, + tmp_dir: web::Data, repo: web::Data, store: web::Data, client: web::Data, @@ -848,7 +872,8 @@ async fn process( } else if !config.server.read_only { let stream = download_stream(&client, proxy.as_str(), &config).await?; - let (alias, _, _) = ingest_inline(stream, &repo, &store, &client, &config).await?; + let (alias, _, _) = + ingest_inline(stream, &tmp_dir, &repo, &store, &client, &config).await?; repo.relate_url(proxy, alias.clone()).await?; @@ -1135,6 +1160,7 @@ async fn do_details( async fn serve_query( range: Option>, web::Query(alias_query): web::Query, + tmp_dir: web::Data, repo: web::Data, store: web::Data, client: web::Data, @@ -1148,7 +1174,8 @@ async fn serve_query( } else if !config.server.read_only { let stream = download_stream(&client, proxy.as_str(), &config).await?; - let (alias, _, _) = ingest_inline(stream, &repo, &store, &client, &config).await?; + let (alias, _, _) = + ingest_inline(stream, &tmp_dir, &repo, &store, &client, &config).await?; repo.relate_url(proxy, alias.clone()).await?; @@ -1735,6 +1762,7 @@ fn spawn_cleanup(repo: ArcRepo, config: &Configuration) { } fn spawn_workers( + tmp_dir: ArcTmpDir, repo: ArcRepo, store: S, client: ClientWithMiddleware, @@ -1749,6 +1777,7 @@ fn spawn_workers( config.clone(), )); crate::sync::spawn(queue::process_images( + tmp_dir, repo, store, client, @@ -1758,6 +1787,7 @@ fn spawn_workers( } async fn launch_file_store( + tmp_dir: ArcTmpDir, repo: ArcRepo, store: FileStore, client: ClientWithMiddleware, @@ -1771,6 +1801,7 @@ async fn launch_file_store( + tmp_dir: ArcTmpDir, repo: ArcRepo, store: ObjectStore, client: ClientWithMiddleware, @@ -1812,6 +1846,7 @@ async fn launch_object_store { let arc_repo = repo.to_arc(); @@ -2100,13 +2139,13 @@ impl PictRsConfiguration { match repo { Repo::Sled(sled_repo) => { - launch_file_store(arc_repo, store, client, config, move |sc| { + launch_file_store(tmp_dir, arc_repo, store, client, config, move |sc| { sled_extra_config(sc, sled_repo.clone()) }) .await?; } Repo::Postgres(_) => { - launch_file_store(arc_repo, store, client, config, |_| {}).await?; + launch_file_store(tmp_dir, arc_repo, store, client, config, |_| {}).await?; } } } @@ -2163,13 +2202,14 @@ impl PictRsConfiguration { match repo { Repo::Sled(sled_repo) => { - launch_object_store(arc_repo, store, client, config, move |sc| { + launch_object_store(tmp_dir, arc_repo, store, client, config, move |sc| { sled_extra_config(sc, sled_repo.clone()) }) .await?; } Repo::Postgres(_) => { - launch_object_store(arc_repo, store, client, config, |_| {}).await?; + launch_object_store(tmp_dir, arc_repo, store, client, config, |_| {}) + .await?; } } } diff --git a/src/queue.rs b/src/queue.rs index d7f4447..06e8eb3 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -4,9 +4,10 @@ use crate::{ error::{Error, UploadError}, formats::InputProcessableFormat, future::LocalBoxFuture, - repo::{Alias, DeleteToken, FullRepo, Hash, JobId, UploadId}, + repo::{Alias, ArcRepo, DeleteToken, Hash, JobId, UploadId}, serde_str::Serde, store::Store, + tmp_file::ArcTmpDir, }; use reqwest_middleware::ClientWithMiddleware; use std::{ @@ -60,7 +61,7 @@ enum Process { } pub(crate) async fn cleanup_alias( - repo: &Arc, + repo: &ArcRepo, alias: Alias, token: DeleteToken, ) -> Result<(), Error> { @@ -73,16 +74,13 @@ pub(crate) async fn cleanup_alias( Ok(()) } -pub(crate) async fn cleanup_hash(repo: &Arc, hash: Hash) -> Result<(), Error> { +pub(crate) async fn cleanup_hash(repo: &ArcRepo, hash: Hash) -> Result<(), Error> { let job = serde_json::to_value(Cleanup::Hash { hash }).map_err(UploadError::PushJob)?; repo.push(CLEANUP_QUEUE, job).await?; Ok(()) } -pub(crate) async fn cleanup_identifier( - repo: &Arc, - identifier: &Arc, -) -> Result<(), Error> { +pub(crate) async fn cleanup_identifier(repo: &ArcRepo, identifier: &Arc) -> Result<(), Error> { let job = serde_json::to_value(Cleanup::Identifier { identifier: identifier.to_string(), }) @@ -92,7 +90,7 @@ pub(crate) async fn cleanup_identifier( } async fn cleanup_variants( - repo: &Arc, + repo: &ArcRepo, hash: Hash, variant: Option, ) -> Result<(), Error> { @@ -102,26 +100,26 @@ async fn cleanup_variants( Ok(()) } -pub(crate) async fn cleanup_outdated_proxies(repo: &Arc) -> Result<(), Error> { +pub(crate) async fn cleanup_outdated_proxies(repo: &ArcRepo) -> Result<(), Error> { let job = serde_json::to_value(Cleanup::OutdatedProxies).map_err(UploadError::PushJob)?; repo.push(CLEANUP_QUEUE, job).await?; Ok(()) } -pub(crate) async fn cleanup_outdated_variants(repo: &Arc) -> Result<(), Error> { +pub(crate) async fn cleanup_outdated_variants(repo: &ArcRepo) -> Result<(), Error> { let job = serde_json::to_value(Cleanup::OutdatedVariants).map_err(UploadError::PushJob)?; repo.push(CLEANUP_QUEUE, job).await?; Ok(()) } -pub(crate) async fn cleanup_all_variants(repo: &Arc) -> Result<(), Error> { +pub(crate) async fn cleanup_all_variants(repo: &ArcRepo) -> Result<(), Error> { let job = serde_json::to_value(Cleanup::AllVariants).map_err(UploadError::PushJob)?; repo.push(CLEANUP_QUEUE, job).await?; Ok(()) } pub(crate) async fn queue_ingest( - repo: &Arc, + repo: &ArcRepo, identifier: &Arc, upload_id: UploadId, declared_alias: Option, @@ -137,7 +135,7 @@ pub(crate) async fn queue_ingest( } pub(crate) async fn queue_generate( - repo: &Arc, + repo: &ArcRepo, target_format: InputProcessableFormat, source: Alias, process_path: PathBuf, @@ -154,22 +152,20 @@ pub(crate) async fn queue_generate( Ok(()) } -pub(crate) async fn process_cleanup( - repo: Arc, - store: S, - config: Configuration, -) { +pub(crate) async fn process_cleanup(repo: ArcRepo, store: S, config: Configuration) { process_jobs(&repo, &store, &config, CLEANUP_QUEUE, cleanup::perform).await } pub(crate) async fn process_images( - repo: Arc, + tmp_dir: ArcTmpDir, + repo: ArcRepo, store: S, client: ClientWithMiddleware, process_map: ProcessMap, config: Configuration, ) { process_image_jobs( + &tmp_dir, &repo, &store, &client, @@ -182,7 +178,7 @@ pub(crate) async fn process_images( } async fn process_jobs( - repo: &Arc, + repo: &ArcRepo, store: &S, config: &Configuration, queue: &'static str, @@ -190,7 +186,7 @@ async fn process_jobs( ) where S: Store, for<'a> F: Fn( - &'a Arc, + &'a ArcRepo, &'a S, &'a Configuration, serde_json::Value, @@ -249,7 +245,7 @@ impl Drop for MetricsGuard { } async fn job_loop( - repo: &Arc, + repo: &ArcRepo, store: &S, config: &Configuration, worker_id: uuid::Uuid, @@ -259,7 +255,7 @@ async fn job_loop( where S: Store, for<'a> F: Fn( - &'a Arc, + &'a ArcRepo, &'a S, &'a Configuration, serde_json::Value, @@ -302,7 +298,8 @@ where } async fn process_image_jobs( - repo: &Arc, + tmp_dir: &ArcTmpDir, + repo: &ArcRepo, store: &S, client: &ClientWithMiddleware, process_map: &ProcessMap, @@ -312,7 +309,8 @@ async fn process_image_jobs( ) where S: Store, for<'a> F: Fn( - &'a Arc, + &'a ArcTmpDir, + &'a ArcRepo, &'a S, &'a ClientWithMiddleware, &'a ProcessMap, @@ -325,6 +323,7 @@ async fn process_image_jobs( loop { let res = image_job_loop( + tmp_dir, repo, store, client, @@ -353,7 +352,8 @@ async fn process_image_jobs( #[allow(clippy::too_many_arguments)] async fn image_job_loop( - repo: &Arc, + tmp_dir: &ArcTmpDir, + repo: &ArcRepo, store: &S, client: &ClientWithMiddleware, process_map: &ProcessMap, @@ -365,7 +365,8 @@ async fn image_job_loop( where S: Store, for<'a> F: Fn( - &'a Arc, + &'a ArcTmpDir, + &'a ArcRepo, &'a S, &'a ClientWithMiddleware, &'a ProcessMap, @@ -389,7 +390,7 @@ where queue, worker_id, job_id, - (callback)(repo, store, client, process_map, config, job), + (callback)(tmp_dir, repo, store, client, process_map, config, job), ) }) .instrument(span) @@ -409,7 +410,7 @@ where } async fn heartbeat( - repo: &Arc, + repo: &ArcRepo, queue: &'static str, worker_id: uuid::Uuid, job_id: JobId, diff --git a/src/queue/process.rs b/src/queue/process.rs index bf8204b..1f2e7d0 100644 --- a/src/queue/process.rs +++ b/src/queue/process.rs @@ -12,10 +12,12 @@ use crate::{ repo::{Alias, ArcRepo, UploadId, UploadResult}, serde_str::Serde, store::Store, + tmp_file::ArcTmpDir, }; use std::{path::PathBuf, sync::Arc}; pub(super) fn perform<'a, S>( + tmp_dir: &'a ArcTmpDir, repo: &'a ArcRepo, store: &'a S, client: &'a ClientWithMiddleware, @@ -35,6 +37,7 @@ where declared_alias, } => { process_ingest( + tmp_dir, repo, store, client, @@ -109,6 +112,7 @@ impl Drop for UploadGuard { #[tracing::instrument(skip(repo, store, client, media))] async fn process_ingest( + tmp_dir: &ArcTmpDir, repo: &ArcRepo, store: &S, client: &ClientWithMiddleware, @@ -123,6 +127,7 @@ where let guard = UploadGuard::guard(upload_id); let fut = async { + let tmp_dir = tmp_dir.clone(); let ident = unprocessed_identifier.clone(); let store2 = store.clone(); let repo = repo.clone(); @@ -132,9 +137,16 @@ where let error_boundary = crate::sync::spawn(async move { let stream = crate::stream::from_err(store2.to_stream(&ident, None, None).await?); - let session = - crate::ingest::ingest(&repo, &store2, &client, stream, declared_alias, &media) - .await?; + let session = crate::ingest::ingest( + &tmp_dir, + &repo, + &store2, + &client, + stream, + declared_alias, + &media, + ) + .await?; Ok(session) as Result }) diff --git a/src/tmp_file.rs b/src/tmp_file.rs index f9e3d3b..ae4253c 100644 --- a/src/tmp_file.rs +++ b/src/tmp_file.rs @@ -1,7 +1,39 @@ -use std::{path::PathBuf, sync::OnceLock}; +use std::{ + path::PathBuf, + sync::{Arc, OnceLock}, +}; use tokio::io::AsyncRead; use uuid::Uuid; +pub(crate) type ArcTmpDir = Arc; + +#[derive(Debug)] +pub(crate) struct TmpDir { + path: PathBuf, +} + +impl TmpDir { + pub(crate) async fn init() -> std::io::Result> { + let path = std::env::temp_dir().join(Uuid::new_v4().to_string()); + tokio::fs::create_dir(&path).await?; + Ok(Arc::new(TmpDir { path })) + } + + pub(crate) fn tmp_file(&self, ext: Option<&str>) -> PathBuf { + if let Some(ext) = ext { + self.path.join(format!("{}{}", Uuid::new_v4(), ext)) + } else { + self.path.join(Uuid::new_v4().to_string()) + } + } +} + +impl Drop for TmpDir { + fn drop(&mut self) { + std::fs::remove_dir_all(&self.path).expect("Removed directory"); + } +} + static TMP_DIR: OnceLock = OnceLock::new(); fn tmp_dir() -> &'static PathBuf { diff --git a/src/validate.rs b/src/validate.rs index fdf6d13..95b294d 100644 --- a/src/validate.rs +++ b/src/validate.rs @@ -11,6 +11,7 @@ use crate::{ InternalFormat, Validations, }, read::BoxRead, + tmp_file::TmpDir, }; use actix_web::web::Bytes; @@ -56,6 +57,7 @@ const MEGABYTES: usize = 1024 * 1024; #[tracing::instrument(skip_all)] pub(crate) async fn validate_bytes( + tmp_dir: &TmpDir, bytes: Bytes, validations: Validations<'_>, timeout: u64, @@ -73,13 +75,22 @@ pub(crate) async fn validate_bytes( match &input { InputFile::Image(input) => { - let (format, read) = - process_image(bytes, *input, width, height, validations.image, timeout).await?; + let (format, read) = process_image( + tmp_dir, + bytes, + *input, + width, + height, + validations.image, + timeout, + ) + .await?; Ok((format, read)) } InputFile::Animation(input) => { let (format, read) = process_animation( + tmp_dir, bytes, *input, width, @@ -94,6 +105,7 @@ pub(crate) async fn validate_bytes( } InputFile::Video(input) => { let (format, read) = process_video( + tmp_dir, bytes, *input, width, @@ -111,6 +123,7 @@ pub(crate) async fn validate_bytes( #[tracing::instrument(skip(bytes, validations))] async fn process_image( + tmp_dir: &TmpDir, bytes: Bytes, input: ImageInput, width: u16, @@ -139,7 +152,7 @@ async fn process_image( let read = if needs_transcode { let quality = validations.quality_for(format); - magick::convert_image(input.format, format, quality, timeout, bytes).await? + magick::convert_image(tmp_dir, input.format, format, quality, timeout, bytes).await? } else { exiftool::clear_metadata_bytes_read(bytes, timeout)? }; @@ -175,6 +188,7 @@ fn validate_animation( #[tracing::instrument(skip(bytes, validations))] async fn process_animation( + tmp_dir: &TmpDir, bytes: Bytes, input: AnimationFormat, width: u16, @@ -193,7 +207,7 @@ async fn process_animation( let read = if needs_transcode { let quality = validations.quality_for(format); - magick::convert_animation(input, format, quality, timeout, bytes).await? + magick::convert_animation(tmp_dir, input, format, quality, timeout, bytes).await? } else { exiftool::clear_metadata_bytes_read(bytes, timeout)? }; @@ -232,6 +246,7 @@ fn validate_video( #[tracing::instrument(skip(bytes, validations))] async fn process_video( + tmp_dir: &TmpDir, bytes: Bytes, input: InputVideoFormat, width: u16, @@ -250,7 +265,7 @@ async fn process_video( let crf = validations.crf_for(width, height); - let read = ffmpeg::transcode_bytes(input, output, crf, timeout, bytes).await?; + let read = ffmpeg::transcode_bytes(tmp_dir, input, output, crf, timeout, bytes).await?; Ok((InternalFormat::Video(output.format.internal_format()), read)) } diff --git a/src/validate/ffmpeg.rs b/src/validate/ffmpeg.rs index 3b1c8ae..da1bd9a 100644 --- a/src/validate/ffmpeg.rs +++ b/src/validate/ffmpeg.rs @@ -5,16 +5,18 @@ use crate::{ formats::{InputVideoFormat, OutputVideo}, process::Process, read::BoxRead, + tmp_file::TmpDir, }; pub(super) async fn transcode_bytes( + tmp_dir: &TmpDir, input_format: InputVideoFormat, output_format: OutputVideo, crf: u8, timeout: u64, bytes: Bytes, ) -> Result, FfMpegError> { - let input_file = crate::tmp_file::tmp_file(None); + let input_file = tmp_dir.tmp_file(None); let input_file_str = input_file.to_str().ok_or(FfMpegError::Path)?; crate::store::file_store::safe_create_parent(&input_file) .await @@ -29,7 +31,7 @@ pub(super) async fn transcode_bytes( .map_err(FfMpegError::Write)?; tmp_one.close().await.map_err(FfMpegError::CloseFile)?; - let output_file = crate::tmp_file::tmp_file(None); + let output_file = tmp_dir.tmp_file(None); let output_file_str = output_file.to_str().ok_or(FfMpegError::Path)?; transcode_files( diff --git a/src/validate/magick.rs b/src/validate/magick.rs index 530fa96..b087674 100644 --- a/src/validate/magick.rs +++ b/src/validate/magick.rs @@ -5,9 +5,11 @@ use crate::{ magick::MagickError, process::Process, read::BoxRead, + tmp_file::TmpDir, }; pub(super) async fn convert_image( + tmp_dir: &TmpDir, input: ImageFormat, output: ImageFormat, quality: Option, @@ -15,6 +17,7 @@ pub(super) async fn convert_image( bytes: Bytes, ) -> Result, MagickError> { convert( + tmp_dir, input.magick_format(), output.magick_format(), false, @@ -26,6 +29,7 @@ pub(super) async fn convert_image( } pub(super) async fn convert_animation( + tmp_dir: &TmpDir, input: AnimationFormat, output: AnimationFormat, quality: Option, @@ -33,6 +37,7 @@ pub(super) async fn convert_animation( bytes: Bytes, ) -> Result, MagickError> { convert( + tmp_dir, input.magick_format(), output.magick_format(), true, @@ -44,6 +49,7 @@ pub(super) async fn convert_animation( } async fn convert( + tmp_dir: &TmpDir, input: &'static str, output: &'static str, coalesce: bool, @@ -51,7 +57,7 @@ async fn convert( timeout: u64, bytes: Bytes, ) -> Result, MagickError> { - let input_file = crate::tmp_file::tmp_file(None); + let input_file = tmp_dir.tmp_file(None); let input_file_str = input_file.to_str().ok_or(MagickError::Path)?; crate::store::file_store::safe_create_parent(&input_file)