Start working on per-server temporary directory

This commit is contained in:
asonix 2023-10-06 19:42:24 -05:00
parent 727eac408e
commit fff4afe105
8 changed files with 166 additions and 56 deletions

View File

@ -8,6 +8,7 @@ use crate::{
future::WithMetrics, future::WithMetrics,
repo::{Alias, ArcRepo, DeleteToken, Hash}, repo::{Alias, ArcRepo, DeleteToken, Hash},
store::Store, store::Store,
tmp_file::TmpDir,
}; };
use actix_web::web::Bytes; use actix_web::web::Bytes;
use futures_core::Stream; use futures_core::Stream;
@ -47,6 +48,7 @@ where
#[tracing::instrument(skip(repo, store, client, stream, media))] #[tracing::instrument(skip(repo, store, client, stream, media))]
pub(crate) async fn ingest<S>( pub(crate) async fn ingest<S>(
tmp_dir: &TmpDir,
repo: &ArcRepo, repo: &ArcRepo,
store: &S, store: &S,
client: &ClientWithMiddleware, client: &ClientWithMiddleware,
@ -69,7 +71,7 @@ where
tracing::trace!("Validating bytes"); tracing::trace!("Validating bytes");
let (input_type, validated_reader) = let (input_type, validated_reader) =
crate::validate::validate_bytes(bytes, prescribed, media.process_timeout).await?; crate::validate::validate_bytes(tmp_dir, bytes, prescribed, media.process_timeout).await?;
let processed_reader = if let Some(operations) = media.preprocess_steps() { let processed_reader = if let Some(operations) = media.preprocess_steps() {
if let Some(format) = input_type.processable_format() { if let Some(format) = input_type.processable_format() {

View File

@ -55,6 +55,7 @@ use std::{
time::{Duration, SystemTime}, time::{Duration, SystemTime},
}; };
use streem::IntoStreamer; use streem::IntoStreamer;
use tmp_file::{ArcTmpDir, TmpDir};
use tokio::sync::Semaphore; use tokio::sync::Semaphore;
use tracing::Instrument; use tracing::Instrument;
use tracing_actix_web::TracingLogger; use tracing_actix_web::TracingLogger;
@ -139,9 +140,10 @@ impl<S: Store + 'static> FormData for Upload<S> {
type Error = Error; type Error = Error;
fn form(req: &HttpRequest) -> Form<Self::Item, Self::Error> { fn form(req: &HttpRequest) -> Form<Self::Item, Self::Error> {
// Create a new Multipart Form validator let tmp_dir = req
// .app_data::<web::Data<ArcTmpDir>>()
// This form is expecting a single array field, 'images' with at most 10 files in it .expect("No TmpDir in request")
.clone();
let repo = req let repo = req
.app_data::<web::Data<ArcRepo>>() .app_data::<web::Data<ArcRepo>>()
.expect("No repo in request") .expect("No repo in request")
@ -159,6 +161,9 @@ impl<S: Store + 'static> FormData for Upload<S> {
.expect("No configuration in request") .expect("No configuration in request")
.clone(); .clone();
// Create a new Multipart Form validator
//
// This form is expecting a single array field, 'images' with at most 10 files in it
Form::new() Form::new()
.max_files(config.server.max_file_count) .max_files(config.server.max_file_count)
.max_file_size(config.media.max_file_size * MEGABYTES) .max_file_size(config.media.max_file_size * MEGABYTES)
@ -166,6 +171,7 @@ impl<S: Store + 'static> FormData for Upload<S> {
.field( .field(
"images", "images",
Field::array(Field::file(move |filename, _, stream| { Field::array(Field::file(move |filename, _, stream| {
let tmp_dir = tmp_dir.clone();
let repo = repo.clone(); let repo = repo.clone();
let store = store.clone(); let store = store.clone();
let client = client.clone(); let client = client.clone();
@ -183,7 +189,15 @@ impl<S: Store + 'static> FormData for Upload<S> {
let stream = crate::stream::from_err(stream); let stream = crate::stream::from_err(stream);
ingest::ingest(&repo, &**store, &client, stream, None, &config.media) ingest::ingest(
&tmp_dir,
&repo,
&**store,
&client,
stream,
None,
&config.media,
)
.await .await
} }
.instrument(span), .instrument(span),
@ -204,6 +218,10 @@ impl<S: Store + 'static> FormData for Import<S> {
type Error = Error; type Error = Error;
fn form(req: &actix_web::HttpRequest) -> Form<Self::Item, Self::Error> { fn form(req: &actix_web::HttpRequest) -> Form<Self::Item, Self::Error> {
let tmp_dir = req
.app_data::<web::Data<ArcTmpDir>>()
.expect("No TmpDir in request")
.clone();
let repo = req let repo = req
.app_data::<web::Data<ArcRepo>>() .app_data::<web::Data<ArcRepo>>()
.expect("No repo in request") .expect("No repo in request")
@ -231,6 +249,7 @@ impl<S: Store + 'static> FormData for Import<S> {
.field( .field(
"images", "images",
Field::array(Field::file(move |filename, _, stream| { Field::array(Field::file(move |filename, _, stream| {
let tmp_dir = tmp_dir.clone();
let repo = repo.clone(); let repo = repo.clone();
let store = store.clone(); let store = store.clone();
let client = client.clone(); let client = client.clone();
@ -249,6 +268,7 @@ impl<S: Store + 'static> FormData for Import<S> {
let stream = crate::stream::from_err(stream); let stream = crate::stream::from_err(stream);
ingest::ingest( ingest::ingest(
&tmp_dir,
&repo, &repo,
&**store, &**store,
&client, &client,
@ -499,12 +519,13 @@ struct UrlQuery {
async fn ingest_inline<S: Store + 'static>( async fn ingest_inline<S: Store + 'static>(
stream: impl Stream<Item = Result<web::Bytes, Error>> + 'static, stream: impl Stream<Item = Result<web::Bytes, Error>> + 'static,
tmp_dir: &TmpDir,
repo: &ArcRepo, repo: &ArcRepo,
store: &S, store: &S,
client: &ClientWithMiddleware, client: &ClientWithMiddleware,
config: &Configuration, config: &Configuration,
) -> Result<(Alias, DeleteToken, Details), Error> { ) -> Result<(Alias, DeleteToken, Details), Error> {
let session = ingest::ingest(repo, store, client, stream, None, &config.media).await?; let session = ingest::ingest(tmp_dir, repo, store, client, stream, None, &config.media).await?;
let alias = session.alias().expect("alias should exist").to_owned(); let alias = session.alias().expect("alias should exist").to_owned();
@ -519,6 +540,7 @@ async fn ingest_inline<S: Store + 'static>(
#[tracing::instrument(name = "Downloading file", skip(client, repo, store, config))] #[tracing::instrument(name = "Downloading file", skip(client, repo, store, config))]
async fn download<S: Store + 'static>( async fn download<S: Store + 'static>(
client: web::Data<ClientWithMiddleware>, client: web::Data<ClientWithMiddleware>,
tmp_dir: web::Data<ArcTmpDir>,
repo: web::Data<ArcRepo>, repo: web::Data<ArcRepo>,
store: web::Data<S>, store: web::Data<S>,
config: web::Data<Configuration>, config: web::Data<Configuration>,
@ -529,7 +551,7 @@ async fn download<S: Store + 'static>(
if query.backgrounded { if query.backgrounded {
do_download_backgrounded(stream, repo, store).await do_download_backgrounded(stream, repo, store).await
} else { } else {
do_download_inline(stream, repo, store, &client, config).await do_download_inline(stream, &tmp_dir, repo, store, &client, config).await
} }
} }
@ -562,6 +584,7 @@ async fn download_stream(
)] )]
async fn do_download_inline<S: Store + 'static>( async fn do_download_inline<S: Store + 'static>(
stream: impl Stream<Item = Result<web::Bytes, Error>> + 'static, stream: impl Stream<Item = Result<web::Bytes, Error>> + 'static,
tmp_dir: &TmpDir,
repo: web::Data<ArcRepo>, repo: web::Data<ArcRepo>,
store: web::Data<S>, store: web::Data<S>,
client: &ClientWithMiddleware, client: &ClientWithMiddleware,
@ -570,7 +593,7 @@ async fn do_download_inline<S: Store + 'static>(
metrics::increment_counter!("pict-rs.files", "download" => "inline"); metrics::increment_counter!("pict-rs.files", "download" => "inline");
let (alias, delete_token, details) = let (alias, delete_token, details) =
ingest_inline(stream, &repo, &store, client, &config).await?; ingest_inline(stream, tmp_dir, &repo, &store, client, &config).await?;
Ok(HttpResponse::Created().json(&serde_json::json!({ Ok(HttpResponse::Created().json(&serde_json::json!({
"msg": "ok", "msg": "ok",
@ -832,6 +855,7 @@ async fn process<S: Store + 'static>(
range: Option<web::Header<Range>>, range: Option<web::Header<Range>>,
web::Query(ProcessQuery { source, operations }): web::Query<ProcessQuery>, web::Query(ProcessQuery { source, operations }): web::Query<ProcessQuery>,
ext: web::Path<String>, ext: web::Path<String>,
tmp_dir: web::Data<ArcTmpDir>,
repo: web::Data<ArcRepo>, repo: web::Data<ArcRepo>,
store: web::Data<S>, store: web::Data<S>,
client: web::Data<ClientWithMiddleware>, client: web::Data<ClientWithMiddleware>,
@ -848,7 +872,8 @@ async fn process<S: Store + 'static>(
} else if !config.server.read_only { } else if !config.server.read_only {
let stream = download_stream(&client, proxy.as_str(), &config).await?; let stream = download_stream(&client, proxy.as_str(), &config).await?;
let (alias, _, _) = ingest_inline(stream, &repo, &store, &client, &config).await?; let (alias, _, _) =
ingest_inline(stream, &tmp_dir, &repo, &store, &client, &config).await?;
repo.relate_url(proxy, alias.clone()).await?; repo.relate_url(proxy, alias.clone()).await?;
@ -1135,6 +1160,7 @@ async fn do_details<S: Store + 'static>(
async fn serve_query<S: Store + 'static>( async fn serve_query<S: Store + 'static>(
range: Option<web::Header<Range>>, range: Option<web::Header<Range>>,
web::Query(alias_query): web::Query<AliasQuery>, web::Query(alias_query): web::Query<AliasQuery>,
tmp_dir: web::Data<ArcTmpDir>,
repo: web::Data<ArcRepo>, repo: web::Data<ArcRepo>,
store: web::Data<S>, store: web::Data<S>,
client: web::Data<ClientWithMiddleware>, client: web::Data<ClientWithMiddleware>,
@ -1148,7 +1174,8 @@ async fn serve_query<S: Store + 'static>(
} else if !config.server.read_only { } else if !config.server.read_only {
let stream = download_stream(&client, proxy.as_str(), &config).await?; let stream = download_stream(&client, proxy.as_str(), &config).await?;
let (alias, _, _) = ingest_inline(stream, &repo, &store, &client, &config).await?; let (alias, _, _) =
ingest_inline(stream, &tmp_dir, &repo, &store, &client, &config).await?;
repo.relate_url(proxy, alias.clone()).await?; repo.relate_url(proxy, alias.clone()).await?;
@ -1735,6 +1762,7 @@ fn spawn_cleanup(repo: ArcRepo, config: &Configuration) {
} }
fn spawn_workers<S>( fn spawn_workers<S>(
tmp_dir: ArcTmpDir,
repo: ArcRepo, repo: ArcRepo,
store: S, store: S,
client: ClientWithMiddleware, client: ClientWithMiddleware,
@ -1749,6 +1777,7 @@ fn spawn_workers<S>(
config.clone(), config.clone(),
)); ));
crate::sync::spawn(queue::process_images( crate::sync::spawn(queue::process_images(
tmp_dir,
repo, repo,
store, store,
client, client,
@ -1758,6 +1787,7 @@ fn spawn_workers<S>(
} }
async fn launch_file_store<F: Fn(&mut web::ServiceConfig) + Send + Clone + 'static>( async fn launch_file_store<F: Fn(&mut web::ServiceConfig) + Send + Clone + 'static>(
tmp_dir: ArcTmpDir,
repo: ArcRepo, repo: ArcRepo,
store: FileStore, store: FileStore,
client: ClientWithMiddleware, client: ClientWithMiddleware,
@ -1771,6 +1801,7 @@ async fn launch_file_store<F: Fn(&mut web::ServiceConfig) + Send + Clone + 'stat
spawn_cleanup(repo.clone(), &config); spawn_cleanup(repo.clone(), &config);
HttpServer::new(move || { HttpServer::new(move || {
let tmp_dir = tmp_dir.clone();
let client = client.clone(); let client = client.clone();
let store = store.clone(); let store = store.clone();
let repo = repo.clone(); let repo = repo.clone();
@ -1778,6 +1809,7 @@ async fn launch_file_store<F: Fn(&mut web::ServiceConfig) + Send + Clone + 'stat
let extra_config = extra_config.clone(); let extra_config = extra_config.clone();
spawn_workers( spawn_workers(
tmp_dir.clone(),
repo.clone(), repo.clone(),
store.clone(), store.clone(),
client.clone(), client.clone(),
@ -1791,6 +1823,7 @@ async fn launch_file_store<F: Fn(&mut web::ServiceConfig) + Send + Clone + 'stat
.wrap(Metrics) .wrap(Metrics)
.wrap(Payload::new()) .wrap(Payload::new())
.app_data(web::Data::new(process_map.clone())) .app_data(web::Data::new(process_map.clone()))
.app_data(web::Data::new(tmp_dir))
.configure(move |sc| configure_endpoints(sc, repo, store, config, client, extra_config)) .configure(move |sc| configure_endpoints(sc, repo, store, config, client, extra_config))
}) })
.bind(address)? .bind(address)?
@ -1799,6 +1832,7 @@ async fn launch_file_store<F: Fn(&mut web::ServiceConfig) + Send + Clone + 'stat
} }
async fn launch_object_store<F: Fn(&mut web::ServiceConfig) + Send + Clone + 'static>( async fn launch_object_store<F: Fn(&mut web::ServiceConfig) + Send + Clone + 'static>(
tmp_dir: ArcTmpDir,
repo: ArcRepo, repo: ArcRepo,
store: ObjectStore, store: ObjectStore,
client: ClientWithMiddleware, client: ClientWithMiddleware,
@ -1812,6 +1846,7 @@ async fn launch_object_store<F: Fn(&mut web::ServiceConfig) + Send + Clone + 'st
spawn_cleanup(repo.clone(), &config); spawn_cleanup(repo.clone(), &config);
HttpServer::new(move || { HttpServer::new(move || {
let tmp_dir = tmp_dir.clone();
let client = client.clone(); let client = client.clone();
let store = store.clone(); let store = store.clone();
let repo = repo.clone(); let repo = repo.clone();
@ -1819,6 +1854,7 @@ async fn launch_object_store<F: Fn(&mut web::ServiceConfig) + Send + Clone + 'st
let extra_config = extra_config.clone(); let extra_config = extra_config.clone();
spawn_workers( spawn_workers(
tmp_dir.clone(),
repo.clone(), repo.clone(),
store.clone(), store.clone(),
client.clone(), client.clone(),
@ -1832,6 +1868,7 @@ async fn launch_object_store<F: Fn(&mut web::ServiceConfig) + Send + Clone + 'st
.wrap(Metrics) .wrap(Metrics)
.wrap(Payload::new()) .wrap(Payload::new())
.app_data(web::Data::new(process_map.clone())) .app_data(web::Data::new(process_map.clone()))
.app_data(web::Data::new(tmp_dir))
.configure(move |sc| configure_endpoints(sc, repo, store, config, client, extra_config)) .configure(move |sc| configure_endpoints(sc, repo, store, config, client, extra_config))
}) })
.bind(address)? .bind(address)?
@ -2075,6 +2112,8 @@ impl PictRsConfiguration {
tracing::warn!("Launching in READ ONLY mode"); tracing::warn!("Launching in READ ONLY mode");
} }
let tmp_dir = TmpDir::init().await?;
match config.store.clone() { match config.store.clone() {
config::Store::Filesystem(config::Filesystem { path }) => { config::Store::Filesystem(config::Filesystem { path }) => {
let arc_repo = repo.to_arc(); let arc_repo = repo.to_arc();
@ -2100,13 +2139,13 @@ impl PictRsConfiguration {
match repo { match repo {
Repo::Sled(sled_repo) => { Repo::Sled(sled_repo) => {
launch_file_store(arc_repo, store, client, config, move |sc| { launch_file_store(tmp_dir, arc_repo, store, client, config, move |sc| {
sled_extra_config(sc, sled_repo.clone()) sled_extra_config(sc, sled_repo.clone())
}) })
.await?; .await?;
} }
Repo::Postgres(_) => { Repo::Postgres(_) => {
launch_file_store(arc_repo, store, client, config, |_| {}).await?; launch_file_store(tmp_dir, arc_repo, store, client, config, |_| {}).await?;
} }
} }
} }
@ -2163,13 +2202,14 @@ impl PictRsConfiguration {
match repo { match repo {
Repo::Sled(sled_repo) => { Repo::Sled(sled_repo) => {
launch_object_store(arc_repo, store, client, config, move |sc| { launch_object_store(tmp_dir, arc_repo, store, client, config, move |sc| {
sled_extra_config(sc, sled_repo.clone()) sled_extra_config(sc, sled_repo.clone())
}) })
.await?; .await?;
} }
Repo::Postgres(_) => { Repo::Postgres(_) => {
launch_object_store(arc_repo, store, client, config, |_| {}).await?; launch_object_store(tmp_dir, arc_repo, store, client, config, |_| {})
.await?;
} }
} }
} }

View File

@ -4,9 +4,10 @@ use crate::{
error::{Error, UploadError}, error::{Error, UploadError},
formats::InputProcessableFormat, formats::InputProcessableFormat,
future::LocalBoxFuture, future::LocalBoxFuture,
repo::{Alias, DeleteToken, FullRepo, Hash, JobId, UploadId}, repo::{Alias, ArcRepo, DeleteToken, Hash, JobId, UploadId},
serde_str::Serde, serde_str::Serde,
store::Store, store::Store,
tmp_file::ArcTmpDir,
}; };
use reqwest_middleware::ClientWithMiddleware; use reqwest_middleware::ClientWithMiddleware;
use std::{ use std::{
@ -60,7 +61,7 @@ enum Process {
} }
pub(crate) async fn cleanup_alias( pub(crate) async fn cleanup_alias(
repo: &Arc<dyn FullRepo>, repo: &ArcRepo,
alias: Alias, alias: Alias,
token: DeleteToken, token: DeleteToken,
) -> Result<(), Error> { ) -> Result<(), Error> {
@ -73,16 +74,13 @@ pub(crate) async fn cleanup_alias(
Ok(()) Ok(())
} }
pub(crate) async fn cleanup_hash(repo: &Arc<dyn FullRepo>, hash: Hash) -> Result<(), Error> { pub(crate) async fn cleanup_hash(repo: &ArcRepo, hash: Hash) -> Result<(), Error> {
let job = serde_json::to_value(Cleanup::Hash { hash }).map_err(UploadError::PushJob)?; let job = serde_json::to_value(Cleanup::Hash { hash }).map_err(UploadError::PushJob)?;
repo.push(CLEANUP_QUEUE, job).await?; repo.push(CLEANUP_QUEUE, job).await?;
Ok(()) Ok(())
} }
pub(crate) async fn cleanup_identifier( pub(crate) async fn cleanup_identifier(repo: &ArcRepo, identifier: &Arc<str>) -> Result<(), Error> {
repo: &Arc<dyn FullRepo>,
identifier: &Arc<str>,
) -> Result<(), Error> {
let job = serde_json::to_value(Cleanup::Identifier { let job = serde_json::to_value(Cleanup::Identifier {
identifier: identifier.to_string(), identifier: identifier.to_string(),
}) })
@ -92,7 +90,7 @@ pub(crate) async fn cleanup_identifier(
} }
async fn cleanup_variants( async fn cleanup_variants(
repo: &Arc<dyn FullRepo>, repo: &ArcRepo,
hash: Hash, hash: Hash,
variant: Option<String>, variant: Option<String>,
) -> Result<(), Error> { ) -> Result<(), Error> {
@ -102,26 +100,26 @@ async fn cleanup_variants(
Ok(()) Ok(())
} }
pub(crate) async fn cleanup_outdated_proxies(repo: &Arc<dyn FullRepo>) -> Result<(), Error> { pub(crate) async fn cleanup_outdated_proxies(repo: &ArcRepo) -> Result<(), Error> {
let job = serde_json::to_value(Cleanup::OutdatedProxies).map_err(UploadError::PushJob)?; let job = serde_json::to_value(Cleanup::OutdatedProxies).map_err(UploadError::PushJob)?;
repo.push(CLEANUP_QUEUE, job).await?; repo.push(CLEANUP_QUEUE, job).await?;
Ok(()) Ok(())
} }
pub(crate) async fn cleanup_outdated_variants(repo: &Arc<dyn FullRepo>) -> Result<(), Error> { pub(crate) async fn cleanup_outdated_variants(repo: &ArcRepo) -> Result<(), Error> {
let job = serde_json::to_value(Cleanup::OutdatedVariants).map_err(UploadError::PushJob)?; let job = serde_json::to_value(Cleanup::OutdatedVariants).map_err(UploadError::PushJob)?;
repo.push(CLEANUP_QUEUE, job).await?; repo.push(CLEANUP_QUEUE, job).await?;
Ok(()) Ok(())
} }
pub(crate) async fn cleanup_all_variants(repo: &Arc<dyn FullRepo>) -> Result<(), Error> { pub(crate) async fn cleanup_all_variants(repo: &ArcRepo) -> Result<(), Error> {
let job = serde_json::to_value(Cleanup::AllVariants).map_err(UploadError::PushJob)?; let job = serde_json::to_value(Cleanup::AllVariants).map_err(UploadError::PushJob)?;
repo.push(CLEANUP_QUEUE, job).await?; repo.push(CLEANUP_QUEUE, job).await?;
Ok(()) Ok(())
} }
pub(crate) async fn queue_ingest( pub(crate) async fn queue_ingest(
repo: &Arc<dyn FullRepo>, repo: &ArcRepo,
identifier: &Arc<str>, identifier: &Arc<str>,
upload_id: UploadId, upload_id: UploadId,
declared_alias: Option<Alias>, declared_alias: Option<Alias>,
@ -137,7 +135,7 @@ pub(crate) async fn queue_ingest(
} }
pub(crate) async fn queue_generate( pub(crate) async fn queue_generate(
repo: &Arc<dyn FullRepo>, repo: &ArcRepo,
target_format: InputProcessableFormat, target_format: InputProcessableFormat,
source: Alias, source: Alias,
process_path: PathBuf, process_path: PathBuf,
@ -154,22 +152,20 @@ pub(crate) async fn queue_generate(
Ok(()) Ok(())
} }
pub(crate) async fn process_cleanup<S: Store>( pub(crate) async fn process_cleanup<S: Store>(repo: ArcRepo, store: S, config: Configuration) {
repo: Arc<dyn FullRepo>,
store: S,
config: Configuration,
) {
process_jobs(&repo, &store, &config, CLEANUP_QUEUE, cleanup::perform).await process_jobs(&repo, &store, &config, CLEANUP_QUEUE, cleanup::perform).await
} }
pub(crate) async fn process_images<S: Store + 'static>( pub(crate) async fn process_images<S: Store + 'static>(
repo: Arc<dyn FullRepo>, tmp_dir: ArcTmpDir,
repo: ArcRepo,
store: S, store: S,
client: ClientWithMiddleware, client: ClientWithMiddleware,
process_map: ProcessMap, process_map: ProcessMap,
config: Configuration, config: Configuration,
) { ) {
process_image_jobs( process_image_jobs(
&tmp_dir,
&repo, &repo,
&store, &store,
&client, &client,
@ -182,7 +178,7 @@ pub(crate) async fn process_images<S: Store + 'static>(
} }
async fn process_jobs<S, F>( async fn process_jobs<S, F>(
repo: &Arc<dyn FullRepo>, repo: &ArcRepo,
store: &S, store: &S,
config: &Configuration, config: &Configuration,
queue: &'static str, queue: &'static str,
@ -190,7 +186,7 @@ async fn process_jobs<S, F>(
) where ) where
S: Store, S: Store,
for<'a> F: Fn( for<'a> F: Fn(
&'a Arc<dyn FullRepo>, &'a ArcRepo,
&'a S, &'a S,
&'a Configuration, &'a Configuration,
serde_json::Value, serde_json::Value,
@ -249,7 +245,7 @@ impl Drop for MetricsGuard {
} }
async fn job_loop<S, F>( async fn job_loop<S, F>(
repo: &Arc<dyn FullRepo>, repo: &ArcRepo,
store: &S, store: &S,
config: &Configuration, config: &Configuration,
worker_id: uuid::Uuid, worker_id: uuid::Uuid,
@ -259,7 +255,7 @@ async fn job_loop<S, F>(
where where
S: Store, S: Store,
for<'a> F: Fn( for<'a> F: Fn(
&'a Arc<dyn FullRepo>, &'a ArcRepo,
&'a S, &'a S,
&'a Configuration, &'a Configuration,
serde_json::Value, serde_json::Value,
@ -302,7 +298,8 @@ where
} }
async fn process_image_jobs<S, F>( async fn process_image_jobs<S, F>(
repo: &Arc<dyn FullRepo>, tmp_dir: &ArcTmpDir,
repo: &ArcRepo,
store: &S, store: &S,
client: &ClientWithMiddleware, client: &ClientWithMiddleware,
process_map: &ProcessMap, process_map: &ProcessMap,
@ -312,7 +309,8 @@ async fn process_image_jobs<S, F>(
) where ) where
S: Store, S: Store,
for<'a> F: Fn( for<'a> F: Fn(
&'a Arc<dyn FullRepo>, &'a ArcTmpDir,
&'a ArcRepo,
&'a S, &'a S,
&'a ClientWithMiddleware, &'a ClientWithMiddleware,
&'a ProcessMap, &'a ProcessMap,
@ -325,6 +323,7 @@ async fn process_image_jobs<S, F>(
loop { loop {
let res = image_job_loop( let res = image_job_loop(
tmp_dir,
repo, repo,
store, store,
client, client,
@ -353,7 +352,8 @@ async fn process_image_jobs<S, F>(
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
async fn image_job_loop<S, F>( async fn image_job_loop<S, F>(
repo: &Arc<dyn FullRepo>, tmp_dir: &ArcTmpDir,
repo: &ArcRepo,
store: &S, store: &S,
client: &ClientWithMiddleware, client: &ClientWithMiddleware,
process_map: &ProcessMap, process_map: &ProcessMap,
@ -365,7 +365,8 @@ async fn image_job_loop<S, F>(
where where
S: Store, S: Store,
for<'a> F: Fn( for<'a> F: Fn(
&'a Arc<dyn FullRepo>, &'a ArcTmpDir,
&'a ArcRepo,
&'a S, &'a S,
&'a ClientWithMiddleware, &'a ClientWithMiddleware,
&'a ProcessMap, &'a ProcessMap,
@ -389,7 +390,7 @@ where
queue, queue,
worker_id, worker_id,
job_id, job_id,
(callback)(repo, store, client, process_map, config, job), (callback)(tmp_dir, repo, store, client, process_map, config, job),
) )
}) })
.instrument(span) .instrument(span)
@ -409,7 +410,7 @@ where
} }
async fn heartbeat<Fut>( async fn heartbeat<Fut>(
repo: &Arc<dyn FullRepo>, repo: &ArcRepo,
queue: &'static str, queue: &'static str,
worker_id: uuid::Uuid, worker_id: uuid::Uuid,
job_id: JobId, job_id: JobId,

View File

@ -12,10 +12,12 @@ use crate::{
repo::{Alias, ArcRepo, UploadId, UploadResult}, repo::{Alias, ArcRepo, UploadId, UploadResult},
serde_str::Serde, serde_str::Serde,
store::Store, store::Store,
tmp_file::ArcTmpDir,
}; };
use std::{path::PathBuf, sync::Arc}; use std::{path::PathBuf, sync::Arc};
pub(super) fn perform<'a, S>( pub(super) fn perform<'a, S>(
tmp_dir: &'a ArcTmpDir,
repo: &'a ArcRepo, repo: &'a ArcRepo,
store: &'a S, store: &'a S,
client: &'a ClientWithMiddleware, client: &'a ClientWithMiddleware,
@ -35,6 +37,7 @@ where
declared_alias, declared_alias,
} => { } => {
process_ingest( process_ingest(
tmp_dir,
repo, repo,
store, store,
client, client,
@ -109,6 +112,7 @@ impl Drop for UploadGuard {
#[tracing::instrument(skip(repo, store, client, media))] #[tracing::instrument(skip(repo, store, client, media))]
async fn process_ingest<S>( async fn process_ingest<S>(
tmp_dir: &ArcTmpDir,
repo: &ArcRepo, repo: &ArcRepo,
store: &S, store: &S,
client: &ClientWithMiddleware, client: &ClientWithMiddleware,
@ -123,6 +127,7 @@ where
let guard = UploadGuard::guard(upload_id); let guard = UploadGuard::guard(upload_id);
let fut = async { let fut = async {
let tmp_dir = tmp_dir.clone();
let ident = unprocessed_identifier.clone(); let ident = unprocessed_identifier.clone();
let store2 = store.clone(); let store2 = store.clone();
let repo = repo.clone(); let repo = repo.clone();
@ -132,8 +137,15 @@ where
let error_boundary = crate::sync::spawn(async move { let error_boundary = crate::sync::spawn(async move {
let stream = crate::stream::from_err(store2.to_stream(&ident, None, None).await?); let stream = crate::stream::from_err(store2.to_stream(&ident, None, None).await?);
let session = let session = crate::ingest::ingest(
crate::ingest::ingest(&repo, &store2, &client, stream, declared_alias, &media) &tmp_dir,
&repo,
&store2,
&client,
stream,
declared_alias,
&media,
)
.await?; .await?;
Ok(session) as Result<Session, Error> Ok(session) as Result<Session, Error>

View File

@ -1,7 +1,39 @@
use std::{path::PathBuf, sync::OnceLock}; use std::{
path::PathBuf,
sync::{Arc, OnceLock},
};
use tokio::io::AsyncRead; use tokio::io::AsyncRead;
use uuid::Uuid; use uuid::Uuid;
/// Shared handle to the per-server temporary directory.
pub(crate) type ArcTmpDir = Arc<TmpDir>;

/// A uniquely-named temporary directory owned by this server instance.
///
/// Created via [`TmpDir::init`]; the directory and its contents are removed
/// from disk when the last `Arc<TmpDir>` is dropped (see the `Drop` impl).
#[derive(Debug)]
pub(crate) struct TmpDir {
    // Absolute path of the directory, created under the OS temp dir.
    path: PathBuf,
}
impl TmpDir {
    /// Creates a fresh, uniquely-named directory under the OS temp dir and
    /// returns it wrapped in an `Arc` so it can be shared across workers.
    ///
    /// # Errors
    ///
    /// Returns any I/O error produced while creating the directory.
    pub(crate) async fn init() -> std::io::Result<Arc<Self>> {
        let unique = Uuid::new_v4().to_string();
        let path = std::env::temp_dir().join(unique);

        tokio::fs::create_dir(&path).await?;

        Ok(Arc::new(TmpDir { path }))
    }

    /// Returns a new unique path inside this temporary directory.
    ///
    /// When `ext` is provided it is appended verbatim to the generated name
    /// (the caller supplies any separator, e.g. a leading dot).
    /// No file is created on disk; only the path is produced.
    pub(crate) fn tmp_file(&self, ext: Option<&str>) -> PathBuf {
        let name = Uuid::new_v4();

        let file_name = match ext {
            Some(ext) => format!("{}{}", name, ext),
            None => name.to_string(),
        };

        self.path.join(file_name)
    }
}
impl Drop for TmpDir {
    /// Best-effort removal of the temporary directory and its contents.
    ///
    /// The original implementation used `.expect(...)`, which panics if the
    /// removal fails (e.g. the directory was already deleted externally).
    /// Panicking inside `drop` is dangerous: if the drop runs while another
    /// panic is unwinding, the process aborts outright. Cleanup failure here
    /// is not worth crashing the server, so the error is reported instead.
    fn drop(&mut self) {
        if let Err(e) = std::fs::remove_dir_all(&self.path) {
            eprintln!(
                "Failed to remove temporary directory {}: {e}",
                self.path.display()
            );
        }
    }
}
static TMP_DIR: OnceLock<PathBuf> = OnceLock::new(); static TMP_DIR: OnceLock<PathBuf> = OnceLock::new();
fn tmp_dir() -> &'static PathBuf { fn tmp_dir() -> &'static PathBuf {

View File

@ -11,6 +11,7 @@ use crate::{
InternalFormat, Validations, InternalFormat, Validations,
}, },
read::BoxRead, read::BoxRead,
tmp_file::TmpDir,
}; };
use actix_web::web::Bytes; use actix_web::web::Bytes;
@ -56,6 +57,7 @@ const MEGABYTES: usize = 1024 * 1024;
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
pub(crate) async fn validate_bytes( pub(crate) async fn validate_bytes(
tmp_dir: &TmpDir,
bytes: Bytes, bytes: Bytes,
validations: Validations<'_>, validations: Validations<'_>,
timeout: u64, timeout: u64,
@ -73,13 +75,22 @@ pub(crate) async fn validate_bytes(
match &input { match &input {
InputFile::Image(input) => { InputFile::Image(input) => {
let (format, read) = let (format, read) = process_image(
process_image(bytes, *input, width, height, validations.image, timeout).await?; tmp_dir,
bytes,
*input,
width,
height,
validations.image,
timeout,
)
.await?;
Ok((format, read)) Ok((format, read))
} }
InputFile::Animation(input) => { InputFile::Animation(input) => {
let (format, read) = process_animation( let (format, read) = process_animation(
tmp_dir,
bytes, bytes,
*input, *input,
width, width,
@ -94,6 +105,7 @@ pub(crate) async fn validate_bytes(
} }
InputFile::Video(input) => { InputFile::Video(input) => {
let (format, read) = process_video( let (format, read) = process_video(
tmp_dir,
bytes, bytes,
*input, *input,
width, width,
@ -111,6 +123,7 @@ pub(crate) async fn validate_bytes(
#[tracing::instrument(skip(bytes, validations))] #[tracing::instrument(skip(bytes, validations))]
async fn process_image( async fn process_image(
tmp_dir: &TmpDir,
bytes: Bytes, bytes: Bytes,
input: ImageInput, input: ImageInput,
width: u16, width: u16,
@ -139,7 +152,7 @@ async fn process_image(
let read = if needs_transcode { let read = if needs_transcode {
let quality = validations.quality_for(format); let quality = validations.quality_for(format);
magick::convert_image(input.format, format, quality, timeout, bytes).await? magick::convert_image(tmp_dir, input.format, format, quality, timeout, bytes).await?
} else { } else {
exiftool::clear_metadata_bytes_read(bytes, timeout)? exiftool::clear_metadata_bytes_read(bytes, timeout)?
}; };
@ -175,6 +188,7 @@ fn validate_animation(
#[tracing::instrument(skip(bytes, validations))] #[tracing::instrument(skip(bytes, validations))]
async fn process_animation( async fn process_animation(
tmp_dir: &TmpDir,
bytes: Bytes, bytes: Bytes,
input: AnimationFormat, input: AnimationFormat,
width: u16, width: u16,
@ -193,7 +207,7 @@ async fn process_animation(
let read = if needs_transcode { let read = if needs_transcode {
let quality = validations.quality_for(format); let quality = validations.quality_for(format);
magick::convert_animation(input, format, quality, timeout, bytes).await? magick::convert_animation(tmp_dir, input, format, quality, timeout, bytes).await?
} else { } else {
exiftool::clear_metadata_bytes_read(bytes, timeout)? exiftool::clear_metadata_bytes_read(bytes, timeout)?
}; };
@ -232,6 +246,7 @@ fn validate_video(
#[tracing::instrument(skip(bytes, validations))] #[tracing::instrument(skip(bytes, validations))]
async fn process_video( async fn process_video(
tmp_dir: &TmpDir,
bytes: Bytes, bytes: Bytes,
input: InputVideoFormat, input: InputVideoFormat,
width: u16, width: u16,
@ -250,7 +265,7 @@ async fn process_video(
let crf = validations.crf_for(width, height); let crf = validations.crf_for(width, height);
let read = ffmpeg::transcode_bytes(input, output, crf, timeout, bytes).await?; let read = ffmpeg::transcode_bytes(tmp_dir, input, output, crf, timeout, bytes).await?;
Ok((InternalFormat::Video(output.format.internal_format()), read)) Ok((InternalFormat::Video(output.format.internal_format()), read))
} }

View File

@ -5,16 +5,18 @@ use crate::{
formats::{InputVideoFormat, OutputVideo}, formats::{InputVideoFormat, OutputVideo},
process::Process, process::Process,
read::BoxRead, read::BoxRead,
tmp_file::TmpDir,
}; };
pub(super) async fn transcode_bytes( pub(super) async fn transcode_bytes(
tmp_dir: &TmpDir,
input_format: InputVideoFormat, input_format: InputVideoFormat,
output_format: OutputVideo, output_format: OutputVideo,
crf: u8, crf: u8,
timeout: u64, timeout: u64,
bytes: Bytes, bytes: Bytes,
) -> Result<BoxRead<'static>, FfMpegError> { ) -> Result<BoxRead<'static>, FfMpegError> {
let input_file = crate::tmp_file::tmp_file(None); let input_file = tmp_dir.tmp_file(None);
let input_file_str = input_file.to_str().ok_or(FfMpegError::Path)?; let input_file_str = input_file.to_str().ok_or(FfMpegError::Path)?;
crate::store::file_store::safe_create_parent(&input_file) crate::store::file_store::safe_create_parent(&input_file)
.await .await
@ -29,7 +31,7 @@ pub(super) async fn transcode_bytes(
.map_err(FfMpegError::Write)?; .map_err(FfMpegError::Write)?;
tmp_one.close().await.map_err(FfMpegError::CloseFile)?; tmp_one.close().await.map_err(FfMpegError::CloseFile)?;
let output_file = crate::tmp_file::tmp_file(None); let output_file = tmp_dir.tmp_file(None);
let output_file_str = output_file.to_str().ok_or(FfMpegError::Path)?; let output_file_str = output_file.to_str().ok_or(FfMpegError::Path)?;
transcode_files( transcode_files(

View File

@ -5,9 +5,11 @@ use crate::{
magick::MagickError, magick::MagickError,
process::Process, process::Process,
read::BoxRead, read::BoxRead,
tmp_file::TmpDir,
}; };
pub(super) async fn convert_image( pub(super) async fn convert_image(
tmp_dir: &TmpDir,
input: ImageFormat, input: ImageFormat,
output: ImageFormat, output: ImageFormat,
quality: Option<u8>, quality: Option<u8>,
@ -15,6 +17,7 @@ pub(super) async fn convert_image(
bytes: Bytes, bytes: Bytes,
) -> Result<BoxRead<'static>, MagickError> { ) -> Result<BoxRead<'static>, MagickError> {
convert( convert(
tmp_dir,
input.magick_format(), input.magick_format(),
output.magick_format(), output.magick_format(),
false, false,
@ -26,6 +29,7 @@ pub(super) async fn convert_image(
} }
pub(super) async fn convert_animation( pub(super) async fn convert_animation(
tmp_dir: &TmpDir,
input: AnimationFormat, input: AnimationFormat,
output: AnimationFormat, output: AnimationFormat,
quality: Option<u8>, quality: Option<u8>,
@ -33,6 +37,7 @@ pub(super) async fn convert_animation(
bytes: Bytes, bytes: Bytes,
) -> Result<BoxRead<'static>, MagickError> { ) -> Result<BoxRead<'static>, MagickError> {
convert( convert(
tmp_dir,
input.magick_format(), input.magick_format(),
output.magick_format(), output.magick_format(),
true, true,
@ -44,6 +49,7 @@ pub(super) async fn convert_animation(
} }
async fn convert( async fn convert(
tmp_dir: &TmpDir,
input: &'static str, input: &'static str,
output: &'static str, output: &'static str,
coalesce: bool, coalesce: bool,
@ -51,7 +57,7 @@ async fn convert(
timeout: u64, timeout: u64,
bytes: Bytes, bytes: Bytes,
) -> Result<BoxRead<'static>, MagickError> { ) -> Result<BoxRead<'static>, MagickError> {
let input_file = crate::tmp_file::tmp_file(None); let input_file = tmp_dir.tmp_file(None);
let input_file_str = input_file.to_str().ok_or(MagickError::Path)?; let input_file_str = input_file.to_str().ok_or(MagickError::Path)?;
crate::store::file_store::safe_create_parent(&input_file) crate::store::file_store::safe_create_parent(&input_file)