diff --git a/src/backgrounded.rs b/src/backgrounded.rs
index eb95556..1585afc 100644
--- a/src/backgrounded.rs
+++ b/src/backgrounded.rs
@@ -3,6 +3,7 @@ use std::sync::Arc;
 use crate::{
     error::Error,
     repo::{ArcRepo, UploadId},
+    state::State,
     store::Store,
 };
 use actix_web::web::Bytes;
@@ -30,23 +31,23 @@ impl Backgrounded {
         self.identifier.as_ref()
     }
 
-    pub(crate) async fn proxy<S, P>(repo: ArcRepo, store: S, stream: P) -> Result<Self, Error>
+    pub(crate) async fn proxy<S, P>(state: &State<S>, stream: P) -> Result<Self, Error>
     where
         S: Store,
         P: Stream<Item = Result<Bytes, Error>> + 'static,
     {
         let mut this = Self {
-            repo,
+            repo: state.repo.clone(),
            identifier: None,
            upload_id: None,
        };
 
-        this.do_proxy(store, stream).await?;
+        this.do_proxy(&state.store, stream).await?;
 
        Ok(this)
    }
 
-    async fn do_proxy<S, P>(&mut self, store: S, stream: P) -> Result<(), Error>
+    async fn do_proxy<S, P>(&mut self, store: &S, stream: P) -> Result<(), Error>
     where
         S: Store,
         P: Stream<Item = Result<Bytes, Error>> + 'static,
diff --git a/src/details.rs b/src/details.rs
index 5c0647e..c2fd9fc 100644
--- a/src/details.rs
+++ b/src/details.rs
@@ -4,6 +4,7 @@ use crate::{
     formats::{InternalFormat, InternalVideoFormat},
     magick::PolicyDir,
     serde_str::Serde,
+    state::State,
     tmp_file::TmpDir,
 };
 use actix_web::web;
@@ -81,18 +82,13 @@ impl Details {
     }
 
     #[tracing::instrument(level = "debug", skip_all)]
-    pub(crate) async fn from_bytes(
-        tmp_dir: &TmpDir,
-        policy_dir: &PolicyDir,
-        timeout: u64,
-        input: web::Bytes,
-    ) -> Result<Self, Error> {
+    pub(crate) async fn from_bytes<S>(state: &State<S>, input: web::Bytes) -> Result<Self, Error> {
         let Discovery {
             input,
             width,
             height,
             frames,
-        } = crate::discover::discover_bytes(tmp_dir, policy_dir, timeout, input).await?;
+        } = crate::discover::discover_bytes(state, input).await?;
 
         Ok(Details::from_parts(
             input.internal_format(),
diff --git a/src/discover.rs b/src/discover.rs
index 1c4736c..005f76e 100644
--- a/src/discover.rs
+++ b/src/discover.rs
@@ -4,7 +4,7 @@ mod magick;
 
 use actix_web::web::Bytes;
 
-use crate::{formats::InputFile, magick::PolicyDir, tmp_file::TmpDir};
+use crate::{formats::InputFile, magick::PolicyDir, state::State, tmp_file::TmpDir};
 
 #[derive(Debug, PartialEq, Eq)]
 pub(crate) struct Discovery {
@@ -27,18 +27,16 @@ pub(crate) enum DiscoverError {
 }
 
 #[tracing::instrument(level = "trace", skip_all)]
-pub(crate) async fn discover_bytes(
-    tmp_dir: &TmpDir,
-    policy_dir: &PolicyDir,
-    timeout: u64,
+pub(crate) async fn discover_bytes<S>(
+    state: &State<S>,
     bytes: Bytes,
 ) -> Result<Discovery, Error> {
-    let discovery = ffmpeg::discover_bytes(tmp_dir, timeout, bytes.clone()).await?;
+    let discovery = ffmpeg::discover_bytes(state, bytes.clone()).await?;
+
+    let discovery = magick::confirm_bytes(state, discovery, bytes.clone()).await?;
 
     let discovery =
-        magick::confirm_bytes(tmp_dir, policy_dir, discovery, timeout, bytes.clone()).await?;
-
-    let discovery = exiftool::check_reorient(discovery, timeout, bytes).await?;
+        exiftool::check_reorient(discovery, bytes, state.config.media.process_timeout).await?;
 
     Ok(discovery)
 }
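The patch threads a single `State<S>` through these call sites but never includes `src/state.rs` itself. Below is a minimal sketch of the shape the rest of the diff implies: the field names match the literal `State { config, tmp_dir, policy_dir, repo, store, client }` constructions that appear later in `migrate_inner`, while the module paths, visibility, and `Clone` derive are assumptions (the derive is suggested by the many `state.clone()` calls).

```rust
// Sketch only -- src/state.rs is not part of this diff.
// Field names mirror the `State { .. }` literals constructed in migrate_inner;
// paths for Configuration and ClientWithMiddleware are assumed.
use reqwest_middleware::ClientWithMiddleware;

use crate::{config::Configuration, magick::ArcPolicyDir, repo::ArcRepo, tmp_file::ArcTmpDir};

#[derive(Clone)]
pub(crate) struct State<S> {
    pub(super) config: Configuration,
    pub(super) tmp_dir: ArcTmpDir,
    pub(super) policy_dir: ArcPolicyDir,
    pub(super) repo: ArcRepo,
    pub(super) store: S,
    pub(super) client: ClientWithMiddleware,
}
```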
diff --git a/src/discover/exiftool.rs b/src/discover/exiftool.rs
index 253810f..d8ea421 100644
--- a/src/discover/exiftool.rs
+++ b/src/discover/exiftool.rs
@@ -16,8 +16,8 @@ pub(super) async fn check_reorient(
         height,
         frames,
     }: Discovery,
-    timeout: u64,
     bytes: Bytes,
+    timeout: u64,
 ) -> Result<Discovery, ExifError> {
     let input = match input {
         InputFile::Image(ImageInput { format, .. }) => {
diff --git a/src/discover/ffmpeg.rs b/src/discover/ffmpeg.rs
index e92425b..004870e 100644
--- a/src/discover/ffmpeg.rs
+++ b/src/discover/ffmpeg.rs
@@ -10,6 +10,7 @@ use crate::{
         Mp4AudioCodec, Mp4Codec, WebmAlphaCodec, WebmAudioCodec, WebmCodec,
     },
     process::Process,
+    state::State,
     tmp_file::TmpDir,
 };
 use actix_web::web::Bytes;
@@ -158,12 +159,11 @@ struct Flags {
 }
 
 #[tracing::instrument(skip_all)]
-pub(super) async fn discover_bytes(
-    tmp_dir: &TmpDir,
-    timeout: u64,
+pub(super) async fn discover_bytes<S>(
+    state: &State<S>,
     bytes: Bytes,
 ) -> Result<Option<Discovery>, FfMpegError> {
-    discover_file(tmp_dir, timeout, move |mut file| {
+    discover_file(state, move |mut file| {
         let bytes = bytes.clone();
 
         async move {
@@ -191,16 +191,12 @@ async fn allows_alpha(pixel_format: &str, timeout: u64) -> Result<bool, FfMpegError> {
 
-async fn discover_file(
-    tmp_dir: &TmpDir,
-    timeout: u64,
-    f: F,
-) -> Result<Option<Discovery>, FfMpegError>
+async fn discover_file<S, F, Fut>(state: &State<S>, f: F) -> Result<Option<Discovery>, FfMpegError>
 where
     F: FnOnce(crate::file::File) -> Fut,
     Fut: std::future::Future<Output = Result<Discovery, FfMpegError>>,
 {
-    let input_file = tmp_dir.tmp_file(None);
+    let input_file = state.tmp_dir.tmp_file(None);
     crate::store::file_store::safe_create_parent(&input_file)
         .await
         .map_err(FfMpegError::CreateDir)?;
@@ -226,7 +222,7 @@ where
             input_file.as_os_str(),
         ],
         &[],
-        timeout,
+        state.config.media.process_timeout,
     )?
     .read()
     .into_vec()
@@ -250,7 +246,7 @@ where
         ..
     }) = &mut discovery.input
     {
-        *alpha = allows_alpha(&pixel_format, timeout).await?;
+        *alpha = allows_alpha(&pixel_format, state.config.media.process_timeout).await?;
     }
 }
diff --git a/src/discover/magick.rs b/src/discover/magick.rs
index 0d1ee7c..92e458b 100644
--- a/src/discover/magick.rs
+++ b/src/discover/magick.rs
@@ -8,6 +8,7 @@ use crate::{
     formats::{AnimationFormat, ImageFormat, ImageInput, InputFile},
     magick::{MagickError, PolicyDir, MAGICK_CONFIGURE_PATH, MAGICK_TEMPORARY_PATH},
     process::Process,
+    state::State,
     tmp_file::TmpDir,
 };
 
@@ -31,11 +32,9 @@ struct Geometry {
 }
 
 #[tracing::instrument(skip_all)]
-pub(super) async fn confirm_bytes(
-    tmp_dir: &TmpDir,
-    policy_dir: &PolicyDir,
+pub(super) async fn confirm_bytes<S>(
+    state: &State<S>,
     discovery: Option<Discovery>,
-    timeout: u64,
     bytes: Bytes,
 ) -> Result<Discovery, MagickError> {
     match discovery {
@@ -51,7 +50,7 @@ pub(super) async fn confirm_bytes(
             }
         }
 
-        discover_file(tmp_dir, policy_dir, timeout, move |mut file| async move {
+        discover_file(state, move |mut file| async move {
             file.write_from_bytes(bytes)
                 .await
                 .map_err(MagickError::Write)?;
@@ -62,22 +61,18 @@ pub(super) async fn confirm_bytes(
 }
 
 #[tracing::instrument(level = "debug", skip_all)]
-async fn discover_file(
-    tmp_dir: &TmpDir,
-    policy_dir: &PolicyDir,
-    timeout: u64,
-    f: F,
-) -> Result<Discovery, MagickError>
+async fn discover_file<S, F, Fut>(state: &State<S>, f: F) -> Result<Discovery, MagickError>
 where
     F: FnOnce(crate::file::File) -> Fut,
     Fut: std::future::Future<Output = Result<crate::file::File, MagickError>>,
 {
-    let temporary_path = tmp_dir
+    let temporary_path = state
+        .tmp_dir
         .tmp_folder()
         .await
         .map_err(MagickError::CreateTemporaryDirectory)?;
 
-    let input_file = tmp_dir.tmp_file(None);
+    let input_file = state.tmp_dir.tmp_file(None);
     crate::store::file_store::safe_create_parent(&input_file)
         .await
         .map_err(MagickError::CreateDir)?;
@@ -90,7 +85,7 @@ where
 
     let envs = [
         (MAGICK_TEMPORARY_PATH, temporary_path.as_os_str()),
-        (MAGICK_CONFIGURE_PATH, policy_dir.as_os_str()),
+        (MAGICK_CONFIGURE_PATH, state.policy_dir.as_os_str()),
     ];
 
     let res = Process::run(
@@ -102,7 +97,7 @@ where
             "JSON:".as_ref(),
         ],
         &envs,
-        timeout,
+        state.config.media.process_timeout,
     )?
     .read()
     .into_string()
diff --git a/src/generate.rs b/src/generate.rs
index 79129b7..ee5f2f2 100644
--- a/src/generate.rs
+++ b/src/generate.rs
@@ -143,13 +143,7 @@ async fn process(
 
     drop(permit);
 
-    let details = Details::from_bytes(
-        &state.tmp_dir,
-        &state.policy_dir,
-        &state.config.media.process_timeout,
-        bytes.clone(),
-    )
-    .await?;
+    let details = Details::from_bytes(state, bytes.clone()).await?;
 
     let identifier = state
         .store
@@ -214,14 +208,8 @@ where
 
         let stream = state.store.to_stream(&identifier, None, None).await?;
 
-        let reader = magick::thumbnail(
-            state,
-            stream,
-            processable_format,
-            ProcessableFormat::Image(thumbnail_format),
-            config.media.image.quality_for(thumbnail_format),
-        )
-        .await?;
+        let reader =
+            magick::thumbnail(state, stream, processable_format, thumbnail_format).await?;
 
         (reader, thumbnail_format.media_type())
     } else {
@@ -234,14 +222,12 @@ where
     };
 
     let reader = ffmpeg::thumbnail(
-        state.tmp_dir,
-        state.store.clone(),
+        &state,
         identifier,
         original_details
             .video_format()
             .unwrap_or(InternalVideoFormat::Mp4),
         thumbnail_format,
-        state.config.media.process_timeout,
     )
     .await?;
diff --git a/src/generate/ffmpeg.rs b/src/generate/ffmpeg.rs
index ab73097..aec518d 100644
--- a/src/generate/ffmpeg.rs
+++ b/src/generate/ffmpeg.rs
@@ -6,6 +6,7 @@ use crate::{
     ffmpeg::FfMpegError,
     formats::InternalVideoFormat,
     process::{Process, ProcessRead},
+    state::State,
     store::Store,
     tmp_file::TmpDir,
 };
@@ -50,21 +51,19 @@ impl ThumbnailFormat {
     }
 }
 
-#[tracing::instrument(skip(tmp_dir, store, timeout))]
+#[tracing::instrument(skip(state))]
 pub(super) async fn thumbnail<S>(
-    tmp_dir: &TmpDir,
-    store: S,
+    state: &State<S>,
     from: Arc<str>,
     input_format: InternalVideoFormat,
     format: ThumbnailFormat,
-    timeout: u64,
 ) -> Result<ProcessRead, FfMpegError> {
-    let input_file = tmp_dir.tmp_file(Some(input_format.file_extension()));
+    let input_file = state.tmp_dir.tmp_file(Some(input_format.file_extension()));
     crate::store::file_store::safe_create_parent(&input_file)
         .await
         .map_err(FfMpegError::CreateDir)?;
 
-    let output_file = tmp_dir.tmp_file(Some(format.to_file_extension()));
+    let output_file = state.tmp_dir.tmp_file(Some(format.to_file_extension()));
     crate::store::file_store::safe_create_parent(&output_file)
         .await
         .map_err(FfMpegError::CreateDir)?;
@@ -72,7 +71,8 @@ pub(super) async fn thumbnail(
     let mut tmp_one = crate::file::File::create(&input_file)
         .await
         .map_err(FfMpegError::CreateFile)?;
-    let stream = store
+    let stream = state
+        .store
         .to_stream(&from, None, None)
         .await
         .map_err(FfMpegError::Store)?;
@@ -99,7 +99,7 @@ pub(super) async fn thumbnail(
             output_file.as_os_str(),
         ],
         &[],
-        timeout,
+        state.config.media.process_timeout,
     )?;
 
     let res = process.wait().await;
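Every signature change above follows the same pattern: positional resources (`tmp_dir`, `store`, `timeout`, ...) collapse into one `&State<S>` parameter, and the leaf reads what it needs (`state.config.media.process_timeout`) at the point of use. A toy, self-contained illustration of the pattern — none of these names are pict-rs APIs:

```rust
// Toy stand-ins, not pict-rs types: the point is the calling convention.
struct Config {
    process_timeout: u64,
}

struct State<S> {
    config: Config,
    store: S,
}

// Before: every shared resource is one more positional argument, and adding
// a new one means touching every caller on the path.
fn thumbnail_before<S>(_store: &S, timeout: u64) -> u64 {
    timeout
}

// After: one parameter; the leaf reads the timeout where it's used, so
// adding a field to State changes no signatures in between.
fn thumbnail_after<S>(state: &State<S>) -> u64 {
    state.config.process_timeout
}

fn main() {
    let state = State { config: Config { process_timeout: 30 }, store: () };
    assert_eq!(thumbnail_before(&state.store, 30), thumbnail_after(&state));
}
```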
diff --git a/src/generate/magick.rs b/src/generate/magick.rs
index d722d57..3b03a2a 100644
--- a/src/generate/magick.rs
+++ b/src/generate/magick.rs
@@ -3,7 +3,7 @@ use std::ffi::OsStr;
 use actix_web::web::Bytes;
 
 use crate::{
-    formats::ProcessableFormat,
+    formats::{ImageFormat, ProcessableFormat},
     magick::{MagickError, PolicyDir, MAGICK_CONFIGURE_PATH, MAGICK_TEMPORARY_PATH},
     process::{Process, ProcessRead},
     state::State,
@@ -14,14 +14,16 @@ use crate::{
 async fn thumbnail_animation<S, F, Fut>(
     state: &State<S>,
     input_format: ProcessableFormat,
-    format: ProcessableFormat,
-    quality: Option<u8>,
+    thumbnail_format: ImageFormat,
     write_file: F,
 ) -> Result<ProcessRead, MagickError>
 where
     F: FnOnce(crate::file::File) -> Fut,
     Fut: std::future::Future<Output = Result<crate::file::File, MagickError>>,
 {
+    let format = ProcessableFormat::Image(thumbnail_format);
+    let quality = state.config.media.image.quality_for(thumbnail_format);
+
     let temporary_path = state
         .tmp_dir
         .tmp_folder()
         .await
         .map_err(MagickError::CreateTemporaryDirectory)?;
@@ -77,14 +79,12 @@ pub(super) async fn thumbnail(
     state: &State<S>,
     stream: LocalBoxStream<'static, std::io::Result<Bytes>>,
     input_format: ProcessableFormat,
-    format: ProcessableFormat,
-    quality: Option<u8>,
+    thumbnail_format: ImageFormat,
 ) -> Result<ProcessRead, MagickError> {
     thumbnail_animation(
         state,
         input_format,
-        format,
-        quality,
+        thumbnail_format,
         |mut tmp_file| async move {
             tmp_file
                 .write_from_stream(stream)
diff --git a/src/ingest.rs b/src/ingest.rs
index d140fe7..12bcf2b 100644
--- a/src/ingest.rs
+++ b/src/ingest.rs
@@ -107,9 +107,10 @@ where
     let (hash_state, identifier) = process_read
         .with_stdout(|stdout| async move {
             let hasher_reader = Hasher::new(stdout);
-            let state = hasher_reader.state();
+            let hash_state = hasher_reader.state();
 
-            store
+            state
+                .store
                 .save_async_read(hasher_reader, input_type.media_type())
                 .await
                 .map(move |identifier| (hash_state, identifier))
         })
         .await??;
 
@@ -117,13 +118,7 @@ where
     let bytes_stream = state.store.to_bytes(&identifier, None, None).await?;
-    let details = Details::from_bytes(
-        tmp_dir,
-        policy_dir,
-        media.process_timeout,
-        bytes_stream.into_bytes(),
-    )
-    .await?;
+    let details = Details::from_bytes(state, bytes_stream.into_bytes()).await?;
 
     drop(permit);
@@ -153,7 +148,7 @@ where
     let reader = Box::pin(tokio_util::io::StreamReader::new(stream));
 
     let hasher_reader = Hasher::new(reader);
-    let state = hasher_reader.state();
+    let hash_state = hasher_reader.state();
 
     let input_type = InternalFormat::Image(crate::formats::ImageFormat::Png);
 
@@ -164,7 +159,7 @@ where
 
     let details = Details::danger_dummy(input_type);
 
-    Ok((input_type, identifier, details, state))
+    Ok((input_type, identifier, details, hash_state))
 }
 
 #[allow(clippy::too_many_arguments)]
@@ -192,7 +187,7 @@ where
     };
 
     if let Some(endpoint) = &state.config.media.external_validation {
-        let stream = store.to_stream(&identifier, None, None).await?;
+        let stream = state.store.to_stream(&identifier, None, None).await?;
 
         let response = state
             .client
diff --git a/src/lib.rs b/src/lib.rs
index 2afa6be..277c9b2 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -138,13 +138,7 @@ async fn ensure_details_identifier(
     tracing::debug!("generating new details from {:?}", identifier);
     let bytes_stream = state.store.to_bytes(identifier, None, None).await?;
-    let new_details = Details::from_bytes(
-        &state.tmp_dir,
-        &state.policy_dir,
-        state.config.media.process_timeout,
-        bytes_stream.into_bytes(),
-    )
-    .await?;
+    let new_details = Details::from_bytes(state, bytes_stream.into_bytes()).await?;
     tracing::debug!("storing details for {:?}", identifier);
     state.repo.relate_details(identifier, &new_details).await?;
     tracing::debug!("stored");
@@ -351,7 +345,7 @@ impl FormData for BackgroundedUpload {
 
                     let stream = crate::stream::from_err(stream);
 
-                    Backgrounded::proxy(&state.repo, &state.store, stream).await
+                    Backgrounded::proxy(&state, stream).await
                 }
                 .instrument(span),
             )
@@ -539,12 +533,12 @@ async fn do_download_backgrounded(
 ) -> Result<HttpResponse, Error> {
     metrics::counter!("pict-rs.files", "download" => "background").increment(1);
 
-    let backgrounded = Backgrounded::proxy((**repo).clone(), (**store).clone(), stream).await?;
+    let backgrounded = Backgrounded::proxy(&state, stream).await?;
 
     let upload_id = backgrounded.upload_id().expect("Upload ID exists");
     let identifier = backgrounded.identifier().expect("Identifier exists");
 
-    queue::queue_ingest(&repo, identifier, upload_id, None).await?;
+    queue::queue_ingest(&state.repo, identifier, upload_id, None).await?;
 
     backgrounded.disarm();
@@ -611,7 +605,8 @@ async fn page(
     for hash in &page.hashes {
         let hex = hash.to_hex();
 
-        let aliases = repo
+        let aliases = state
+            .repo
             .aliases_for_hash(hash.clone())
             .await?
             .into_iter()
@@ -794,7 +789,7 @@ async fn process(
         ProcessSource::Proxy { proxy } => {
             let alias = if let Some(alias) = state.repo.related(proxy.clone()).await? {
                 alias
-            } else if !config.server.read_only {
+            } else if !state.config.server.read_only {
                 let stream = download_stream(proxy.as_str(), &state).await?;
 
                 let (alias, _, _) = ingest_inline(stream, &state).await?;
@@ -836,7 +831,10 @@ async fn process(
             .await?;
     }
 
-    let identifier_opt = repo.variant_identifier(hash.clone(), path_string).await?;
+    let identifier_opt = state
+        .repo
+        .variant_identifier(hash.clone(), path_string)
+        .await?;
 
     if let Some(identifier) = identifier_opt {
         let details = ensure_details_identifier(&state, &identifier).await?;
@@ -850,7 +848,7 @@ async fn process(
         return ranged_file_resp(&state.store, identifier, range, details, not_found).await;
     }
 
-    if config.server.read_only {
+    if state.config.server.read_only {
         return Err(UploadError::ReadOnly.into());
     }
@@ -930,7 +928,9 @@ async fn process_head(
     };
 
     if !state.config.server.read_only {
-        repo.accessed_variant(hash.clone(), path_string.clone())
+        state
+            .repo
+            .accessed_variant(hash.clone(), path_string.clone())
             .await?;
     }
@@ -959,7 +959,7 @@ async fn process_backgrounded(
     web::Query(ProcessQuery { source, operations }): web::Query<ProcessQuery>,
     ext: web::Path<String>,
-    state: web::Data>,
+    state: web::Data>,
 ) -> Result<HttpResponse, Error> {
     let source = match source {
         ProcessSource::Alias { alias } | ProcessSource::Source { src: alias } => {
@@ -1123,7 +1123,7 @@ async fn do_serve(
 
 async fn serve_query_head(
     range: Option<web::Header<Range>>,
     web::Query(alias_query): web::Query<AliasQuery>,
-    state: web::Data>,
+    state: web::Data>,
 ) -> Result<HttpResponse, Error> {
     let alias = match alias_query {
         AliasQuery::Alias { alias } => Serde::into_inner(alias),
@@ -1547,10 +1547,12 @@ fn build_client() -> Result<ClientWithMiddleware, Error> {
 
 fn configure_endpoints<S, F>(
     config: &mut web::ServiceConfig,
     state: State<S>,
+    process_map: ProcessMap,
     extra_config: F,
 ) {
     config
-        .app_data(web::Data::new(state))
+        .app_data(web::Data::new(state.clone()))
+        .app_data(web::Data::new(process_map.clone()))
         .route("/healthz", web::get().to(healthz::<S>))
         .service(
             web::scope("/image")
@@ -1613,9 +1615,7 @@ fn configure_endpoints(
         )
         .service(
             web::scope("/internal")
-                .wrap(Internal(
-                    state.config.server.api_key.as_ref().map(|s| s.to_owned()),
-                ))
+                .wrap(Internal(state.config.server.api_key.clone()))
                .service(web::resource("/import").route(web::post().to(import::<S>)))
                .service(web::resource("/variants").route(web::delete().to(clean_variants::<S>)))
                .service(web::resource("/purge").route(web::post().to(purge::<S>)))
@@ -1623,13 +1623,13 @@ fn configure_endpoints(
                 .service(web::resource("/aliases").route(web::get().to(aliases::<S>)))
                 .service(web::resource("/identifier").route(web::get().to(identifier::<S>)))
                 .service(web::resource("/set_not_found").route(web::post().to(set_not_found::<S>)))
-                .service(web::resource("/hashes").route(web::get().to(page)))
+                .service(web::resource("/hashes").route(web::get().to(page::<S>)))
                 .service(web::resource("/prune_missing").route(web::post().to(prune_missing::<S>)))
                 .configure(extra_config),
         );
 }
 
-fn spawn_cleanup(state: State) {
+fn spawn_cleanup(state: State) {
     if state.config.server.read_only {
         return;
     }
@@ -1668,34 +1668,21 @@ where
 }
 
 async fn launch_file_store<F>(
-    tmp_dir: ArcTmpDir,
-    policy_dir: ArcPolicyDir,
-    repo: ArcRepo,
-    store: FileStore,
-    client: ClientWithMiddleware,
-    config: Configuration,
+    state: State<FileStore>,
     extra_config: F,
 ) -> color_eyre::Result<()> {
     let process_map = ProcessMap::new();
 
-    let address = config.server.address;
+    let address = state.config.server.address;
+
+    let tls = Tls::from_config(&state.config);
 
     spawn_cleanup(state.clone());
 
-    let tls = Tls::from_config(&config);
-
-    let state = State {
-        config,
-        tmp_dir,
-        policy_dir,
-        repo,
-        store,
-        client,
-    };
-
     let server = HttpServer::new(move || {
         let extra_config = extra_config.clone();
         let state = state.clone();
+        let process_map = process_map.clone();
 
         spawn_workers(state.clone(), process_map.clone());
@@ -1704,8 +1691,9 @@
 async fn launch_object_store<F>(
-    tmp_dir: ArcTmpDir,
-    policy_dir: ArcPolicyDir,
-    repo: ArcRepo,
-    store: ObjectStore,
-    client: ClientWithMiddleware,
-    config: Configuration,
+    state: State<ObjectStore>,
     extra_config: F,
 ) -> color_eyre::Result<()> {
     let process_map = ProcessMap::new();
 
-    let address = config.server.address;
+    let address = state.config.server.address;
 
-    let tls = Tls::from_config(&config);
-
-    let state = State {
-        config: config.clone(),
-        tmp_dir: tmp_dir.clone(),
-        policy_dir: policy_dir.clone(),
-        repo: repo.clone(),
-        store: store.clone(),
-        client: client.clone(),
-    };
+    let tls = Tls::from_config(&state.config);
 
     spawn_cleanup(state.clone());
 
     let server = HttpServer::new(move || {
         let extra_config = extra_config.clone();
         let state = state.clone();
+        let process_map = process_map.clone();
 
         spawn_workers(state.clone(), process_map.clone());
@@ -1784,8 +1759,9 @@ async fn launch_object_store(
+    config: Configuration,
     tmp_dir: ArcTmpDir,
     policy_dir: ArcPolicyDir,
     repo: ArcRepo,
@@ -1832,7 +1809,6 @@ async fn migrate_inner(
     from: S1,
     to: config::primitives::Store,
     skip_missing_files: bool,
-    timeout: u64,
     concurrency: usize,
 ) -> color_eyre::Result<()>
 where
 {
     match to {
         config::primitives::Store::Filesystem(config::Filesystem { path }) => {
-            let to = FileStore::build(path.clone(), repo.clone()).await?;
+            let store = FileStore::build(path.clone(), repo.clone()).await?;
 
-            migrate_store(
+            let to = State {
+                config,
                 tmp_dir,
                 policy_dir,
                 repo,
-                from,
-                to,
-                skip_missing_files,
-                timeout,
-                concurrency,
-            )
-            .await?
+                store,
+                client,
+            };
+
+            migrate_store(from, to, skip_missing_files, concurrency).await?
         }
         config::primitives::Store::ObjectStorage(config::primitives::ObjectStorage {
             endpoint,
@@ -1866,7 +1841,7 @@ where
             client_timeout,
             public_endpoint,
         }) => {
-            let to = ObjectStore::build(
+            let store = ObjectStore::build(
                 endpoint.clone(),
                 bucket_name,
                 if use_path_style {
@@ -1884,19 +1859,18 @@ where
                 repo.clone(),
             )
             .await?
-            .build(client);
+            .build(client.clone());
 
-            migrate_store(
+            let to = State {
+                config,
                 tmp_dir,
                 policy_dir,
                 repo,
-                from,
-                to,
-                skip_missing_files,
-                timeout,
-                concurrency,
-            )
-            .await?
+                store,
+                client,
+            };
+
+            migrate_store(from, to, skip_missing_files, concurrency).await?
         }
     }
 
@@ -2066,6 +2040,7 @@ impl PictRsConfiguration {
             config::primitives::Store::Filesystem(config::Filesystem { path }) => {
                 let from = FileStore::build(path.clone(), repo.clone()).await?;
 
                 migrate_inner(
+                    config,
                     tmp_dir,
                     policy_dir,
                     repo,
@@ -2073,7 +2048,6 @@ impl PictRsConfiguration {
                     from,
                     to,
                     skip_missing_files,
-                    config.media.process_timeout,
                     concurrency,
                 )
                 .await?;
@@ -2113,6 +2087,7 @@ impl PictRsConfiguration {
                     .build(client.clone());
 
                 migrate_inner(
+                    config,
                     tmp_dir,
                     policy_dir,
                     repo,
@@ -2120,7 +2095,6 @@ impl PictRsConfiguration {
                     from,
                     to,
                     skip_missing_files,
-                    config.media.process_timeout,
                     concurrency,
                 )
                 .await?;
@@ -2150,18 +2124,19 @@ impl PictRsConfiguration {
 
                 let store = FileStore::build(path, arc_repo.clone()).await?;
 
+                let state = State {
+                    tmp_dir: tmp_dir.clone(),
+                    policy_dir: policy_dir.clone(),
+                    repo: arc_repo.clone(),
+                    store: store.clone(),
+                    config: config.clone(),
+                    client: client.clone(),
+                };
+
                 if arc_repo.get("migrate-0.4").await?.is_none() {
                     if let Some(path) = config.old_repo_path() {
                         if let Some(old_repo) = repo_04::open(path)? {
-                            repo::migrate_04(
-                                tmp_dir.clone(),
-                                policy_dir.clone(),
-                                old_repo,
-                                arc_repo.clone(),
-                                store.clone(),
-                                config.clone(),
-                            )
-                            .await?;
+                            repo::migrate_04(old_repo, state.clone()).await?;
                             arc_repo
                                 .set("migrate-0.4", Arc::from(b"migrated".to_vec()))
                                 .await?;
@@ -2171,28 +2146,13 @@ impl PictRsConfiguration {
 
                 match repo {
                     Repo::Sled(sled_repo) => {
-                        launch_file_store(
-                            tmp_dir.clone(),
-                            policy_dir.clone(),
-                            arc_repo,
-                            store,
-                            client,
-                            config,
-                            move |sc| sled_extra_config(sc, sled_repo.clone()),
-                        )
+                        launch_file_store(state, move |sc| {
+                            sled_extra_config(sc, sled_repo.clone())
+                        })
                        .await?;
                    }
                    Repo::Postgres(_) => {
-                        launch_file_store(
-                            tmp_dir.clone(),
-                            policy_dir.clone(),
-                            arc_repo,
-                            store,
-                            client,
-                            config,
-                            |_| {},
-                        )
-                        .await?;
+                        launch_file_store(state, |_| {}).await?;
                    }
                }
            }
@@ -2230,18 +2190,19 @@ impl PictRsConfiguration {
                     .await?
                     .build(client.clone());
 
+                let state = State {
+                    tmp_dir: tmp_dir.clone(),
+                    policy_dir: policy_dir.clone(),
+                    repo: arc_repo.clone(),
+                    store: store.clone(),
+                    config: config.clone(),
+                    client: client.clone(),
+                };
+
                 if arc_repo.get("migrate-0.4").await?.is_none() {
                     if let Some(path) = config.old_repo_path() {
                         if let Some(old_repo) = repo_04::open(path)? {
-                            repo::migrate_04(
-                                tmp_dir.clone(),
-                                policy_dir.clone(),
-                                old_repo,
-                                arc_repo.clone(),
-                                store.clone(),
-                                config.clone(),
-                            )
-                            .await?;
+                            repo::migrate_04(old_repo, state.clone()).await?;
                             arc_repo
                                 .set("migrate-0.4", Arc::from(b"migrated".to_vec()))
                                 .await?;
@@ -2251,28 +2212,13 @@ impl PictRsConfiguration {
 
                 match repo {
                     Repo::Sled(sled_repo) => {
-                        launch_object_store(
-                            tmp_dir.clone(),
-                            policy_dir.clone(),
-                            arc_repo,
-                            store,
-                            client,
-                            config,
-                            move |sc| sled_extra_config(sc, sled_repo.clone()),
-                        )
+                        launch_object_store(state, move |sc| {
+                            sled_extra_config(sc, sled_repo.clone())
+                        })
                        .await?;
                    }
                    Repo::Postgres(_) => {
-                        launch_object_store(
-                            tmp_dir.clone(),
-                            policy_dir.clone(),
-                            arc_repo,
-                            store,
-                            client,
-                            config,
-                            |_| {},
-                        )
-                        .await?;
+                        launch_object_store(state, |_| {}).await?;
                    }
                }
            }
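With this change the launch functions receive a prebuilt `State` and clone it into the per-worker factory closure instead of assembling it inside. A compact sketch of that ownership pattern, assuming actix-web 4; `AppState` and `index` are hypothetical stand-ins, not pict-rs items:

```rust
// Sketch of the launch pattern, assuming actix-web 4 is a dependency.
use actix_web::{web, App, HttpServer, Responder};

#[derive(Clone)]
struct AppState {
    greeting: &'static str,
}

async fn index(state: web::Data<AppState>) -> impl Responder {
    state.greeting
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    // Built once before the server starts, like State<S> above.
    let state = AppState { greeting: "hello" };

    HttpServer::new(move || {
        // The factory closure runs once per worker; each worker gets a
        // cheap clone registered as shared app data.
        App::new()
            .app_data(web::Data::new(state.clone()))
            .route("/", web::get().to(index))
    })
    .bind(("127.0.0.1", 8080))?
    .run()
    .await
}
```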
tracing::warn!("Continuing previous migration of {initial_repo_size} total hashes"); @@ -122,15 +102,12 @@ where } // Hashes are read in a consistent order - let stream = std::pin::pin!(repo.hashes()); + let stream = std::pin::pin!(to.repo.hashes()); let mut stream = stream.into_streamer(); let state = Rc::new(MigrateState { - tmp_dir: tmp_dir.clone(), - policy_dir: policy_dir.clone(), - repo: repo.clone(), from, - to, + to: to.clone(), continuing_migration, skip_missing_files, initial_repo_size, @@ -138,7 +115,6 @@ where pct: AtomicU64::new(initial_repo_size / 100), index: AtomicU64::new(0), started_at: Instant::now(), - timeout, }); let mut joinset = tokio::task::JoinSet::new(); @@ -165,7 +141,7 @@ where } // clean up the migration table to avoid interfering with future migrations - repo.clear().await?; + to.repo.clear().await?; tracing::warn!("Migration completed successfully"); @@ -179,9 +155,6 @@ where S2: Store, { let MigrateState { - tmp_dir, - policy_dir, - repo, from, to, continuing_migration, @@ -191,24 +164,23 @@ where pct, index, started_at, - timeout, } = state; let current_index = index.fetch_add(1, Ordering::Relaxed); - let original_identifier = match repo.identifier(hash.clone()).await { + let original_identifier = match to.repo.identifier(hash.clone()).await { Ok(Some(identifier)) => identifier, Ok(None) => { tracing::warn!( "Original File identifier for hash {hash:?} is missing, queue cleanup task", ); - crate::queue::cleanup_hash(repo, hash.clone()).await?; + crate::queue::cleanup_hash(&to.repo, hash.clone()).await?; return Ok(()); } Err(e) => return Err(e.into()), }; - if repo.is_migrated(&original_identifier).await? { + if to.repo.is_migrated(&original_identifier).await? { // migrated original for hash - this means we can skip return Ok(()); } @@ -241,26 +213,16 @@ where } } - if let Some(identifier) = repo.motion_identifier(hash.clone()).await? { - if !repo.is_migrated(&identifier).await? { - match migrate_file( - tmp_dir, - policy_dir, - repo, - from, - to, - &identifier, - *skip_missing_files, - *timeout, - ) - .await - { + if let Some(identifier) = to.repo.motion_identifier(hash.clone()).await? { + if !to.repo.is_migrated(&identifier).await? { + match migrate_file(from, to, &identifier, *skip_missing_files).await { Ok(new_identifier) => { - migrate_details(repo, &identifier, &new_identifier).await?; - repo.relate_motion_identifier(hash.clone(), &new_identifier) + migrate_details(&to.repo, &identifier, &new_identifier).await?; + to.repo + .relate_motion_identifier(hash.clone(), &new_identifier) .await?; - repo.mark_migrated(&identifier, &new_identifier).await?; + to.repo.mark_migrated(&identifier, &new_identifier).await?; } Err(MigrateError::From(e)) if e.is_not_found() && *skip_missing_files => { tracing::warn!("Skipping motion file for hash {hash:?}"); @@ -281,28 +243,20 @@ where } } - for (variant, identifier) in repo.variants(hash.clone()).await? { - if !repo.is_migrated(&identifier).await? { - match migrate_file( - tmp_dir, - policy_dir, - repo, - from, - to, - &identifier, - *skip_missing_files, - *timeout, - ) - .await - { + for (variant, identifier) in to.repo.variants(hash.clone()).await? { + if !to.repo.is_migrated(&identifier).await? 
+            match migrate_file(from, to, &identifier, *skip_missing_files).await {
                 Ok(new_identifier) => {
-                    migrate_details(repo, &identifier, &new_identifier).await?;
-                    repo.remove_variant(hash.clone(), variant.clone()).await?;
-                    let _ = repo
+                    migrate_details(&to.repo, &identifier, &new_identifier).await?;
+                    to.repo
+                        .remove_variant(hash.clone(), variant.clone())
+                        .await?;
+                    let _ = to
+                        .repo
                         .relate_variant_identifier(hash.clone(), variant, &new_identifier)
                         .await?;
-                    repo.mark_migrated(&identifier, &new_identifier).await?;
+                    to.repo.mark_migrated(&identifier, &new_identifier).await?;
                 }
                 Err(MigrateError::From(e)) if e.is_not_found() && *skip_missing_files => {
                     tracing::warn!("Skipping variant {variant} for hash {hash:?}",);
@@ -323,23 +277,14 @@ where
         }
     }
 
-    match migrate_file(
-        tmp_dir,
-        policy_dir,
-        repo,
-        from,
-        to,
-        &original_identifier,
-        *skip_missing_files,
-        *timeout,
-    )
-    .await
-    {
+    match migrate_file(from, to, &original_identifier, *skip_missing_files).await {
         Ok(new_identifier) => {
-            migrate_details(repo, &original_identifier, &new_identifier).await?;
-            repo.update_identifier(hash.clone(), &new_identifier)
+            migrate_details(&to.repo, &original_identifier, &new_identifier).await?;
+            to.repo
+                .update_identifier(hash.clone(), &new_identifier)
                 .await?;
-            repo.mark_migrated(&original_identifier, &new_identifier)
+            to.repo
+                .mark_migrated(&original_identifier, &new_identifier)
                 .await?;
         }
         Err(MigrateError::From(e)) if e.is_not_found() && *skip_missing_files => {
@@ -385,14 +330,10 @@ where
 
 #[allow(clippy::too_many_arguments)]
 async fn migrate_file<S1, S2>(
-    tmp_dir: &TmpDir,
-    policy_dir: &PolicyDir,
-    repo: &ArcRepo,
     from: &S1,
-    to: &S2,
+    to: &State<S2>,
     identifier: &Arc<str>,
     skip_missing_files: bool,
-    timeout: u64,
 ) -> Result<Arc<str>, MigrateError>
 where
     S1: Store,
@@ -403,7 +344,7 @@ where
     loop {
         tracing::trace!("migrate_file: looping");
 
-        match do_migrate_file(tmp_dir, policy_dir, repo, from, to, identifier, timeout).await {
+        match do_migrate_file(from, to, identifier).await {
             Ok(identifier) => return Ok(identifier),
             Err(MigrateError::From(e)) if e.is_not_found() && skip_missing_files => {
                 return Err(MigrateError::From(e));
@@ -432,13 +373,9 @@ enum MigrateError {
 }
 
 async fn do_migrate_file<S1, S2>(
-    tmp_dir: &TmpDir,
-    policy_dir: &PolicyDir,
-    repo: &ArcRepo,
     from: &S1,
-    to: &S2,
+    to: &State<S2>,
     identifier: &Arc<str>,
-    timeout: u64,
 ) -> Result<Arc<str>, MigrateError>
 where
     S1: Store,
@@ -449,7 +386,8 @@ where
         .await
         .map_err(MigrateError::From)?;
 
-    let details_opt = repo
+    let details_opt = to
+        .repo
         .details(identifier)
         .await
         .map_err(Error::from)
@@ -463,11 +401,11 @@ where
             .await
             .map_err(From::from)
             .map_err(MigrateError::Details)?;
-        let new_details =
-            Details::from_bytes(tmp_dir, policy_dir, timeout, bytes_stream.into_bytes())
-                .await
-                .map_err(MigrateError::Details)?;
-        repo.relate_details(identifier, &new_details)
+        let new_details = Details::from_bytes(to, bytes_stream.into_bytes())
+            .await
+            .map_err(MigrateError::Details)?;
+        to.repo
+            .relate_details(identifier, &new_details)
             .await
             .map_err(Error::from)
             .map_err(MigrateError::Details)?;
@@ -475,6 +413,7 @@ where
     };
 
     let new_identifier = to
+        .store
         .save_stream(stream, details.media_type())
         .await
         .map_err(MigrateError::To)?;
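A notable shape in this file: the migration destination is no longer a bare store. `to: State<S2>` carries both the new store and the repo doing the bookkeeping, while the source stays a plain `S1`. Below is a self-contained, synchronous sketch of that shape and of the bounded retry loop around each file; every type here is an illustrative stand-in, not the real async pict-rs code:

```rust
// Illustrative stand-ins only: a simplified echo of migrate_file's retry
// loop, where the destination bundles repo + store.
struct Repo;

impl Repo {
    fn mark_migrated(&self, _old: &str, _new: &str) {}
}

struct State<S> {
    repo: Repo,
    store: S,
}

trait Store {
    fn load(&self, identifier: &str) -> Result<Vec<u8>, String>;
    fn save(&self, bytes: &[u8]) -> Result<String, String>;
}

fn migrate_file<S1: Store, S2: Store>(
    from: &S1,
    to: &State<S2>,
    identifier: &str,
) -> Result<String, String> {
    let mut failure_count = 0;

    loop {
        let attempt = from
            .load(identifier)
            .and_then(|bytes| to.store.save(&bytes));

        match attempt {
            Ok(new_identifier) => {
                // bookkeeping lives on the destination's repo
                to.repo.mark_migrated(identifier, &new_identifier);
                return Ok(new_identifier);
            }
            // mirror the real code's cutoff: give up after repeated failures
            Err(e) if failure_count >= 10 => return Err(e),
            Err(_) => failure_count += 1,
        }
    }
}
```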
diff --git a/src/queue/process.rs b/src/queue/process.rs
index e2d2c9f..bdef92a 100644
--- a/src/queue/process.rs
+++ b/src/queue/process.rs
@@ -116,6 +116,7 @@ where
     let guard = UploadGuard::guard(upload_id);
 
     let fut = async {
+        let ident = unprocessed_identifier.clone();
         let state2 = state.clone();
 
         let current_span = Span::current();
diff --git a/src/repo/migrate.rs b/src/repo/migrate.rs
index 702498e..f2b6efa 100644
--- a/src/repo/migrate.rs
+++ b/src/repo/migrate.rs
@@ -16,6 +16,7 @@ use crate::{
         AliasRepo as _, HashRepo as _, IdentifierRepo as _, SettingsRepo as _,
         SledRepo as OldSledRepo,
     },
+    state::State,
     store::Store,
     tmp_file::{ArcTmpDir, TmpDir},
 };
@@ -80,23 +81,19 @@ pub(crate) async fn migrate_repo(old_repo: ArcRepo, new_repo: ArcRepo) -> Result
 
 #[tracing::instrument(skip_all)]
 pub(crate) async fn migrate_04<S>(
-    tmp_dir: ArcTmpDir,
-    policy_dir: ArcPolicyDir,
     old_repo: OldSledRepo,
-    new_repo: ArcRepo,
-    store: S,
-    config: Configuration,
+    state: State<S>,
 ) -> Result<(), Error> {
     tracing::info!("Running checks");
     if let Err(e) = old_repo.health_check().await {
         tracing::warn!("Old repo is not configured correctly");
         return Err(e.into());
     }
-    if let Err(e) = new_repo.health_check().await {
+    if let Err(e) = state.repo.health_check().await {
         tracing::warn!("New repo is not configured correctly");
         return Err(e.into());
     }
-    if let Err(e) = store.health_check().await {
+    if let Err(e) = state.store.health_check().await {
         tracing::warn!("Store is not configured correctly");
         return Err(e.into());
     }
@@ -116,19 +113,15 @@ pub(crate) async fn migrate_04(
 
         if let Ok(hash) = res {
             set.spawn_local(migrate_hash_04(
-                tmp_dir.clone(),
-                policy_dir.clone(),
                 old_repo.clone(),
-                new_repo.clone(),
-                store.clone(),
-                config.clone(),
+                state.clone(),
                 hash.clone(),
             ));
         } else {
             tracing::warn!("Failed to read hash, skipping");
         }
 
-        while set.len() >= config.upgrade.concurrency {
+        while set.len() >= state.config.upgrade.concurrency {
             tracing::trace!("migrate_04: join looping");
 
             if set.join_next().await.is_some() {
@@ -156,13 +149,15 @@ pub(crate) async fn migrate_04(
     }
 
     if let Some(generator_state) = old_repo.get(GENERATOR_KEY).await? {
-        new_repo
+        state
+            .repo
             .set(GENERATOR_KEY, generator_state.to_vec().into())
             .await?;
     }
 
     if let Some(generator_state) = old_repo.get(crate::NOT_FOUND_KEY).await? {
-        new_repo
+        state
+            .repo
             .set(crate::NOT_FOUND_KEY, generator_state.to_vec().into())
             .await?;
     }
@@ -193,28 +188,10 @@ async fn migrate_hash(old_repo: ArcRepo, new_repo: ArcRepo, hash: Hash) {
     }
 }
 
-async fn migrate_hash_04(
-    tmp_dir: ArcTmpDir,
-    policy_dir: ArcPolicyDir,
-    old_repo: OldSledRepo,
-    new_repo: ArcRepo,
-    store: S,
-    config: Configuration,
-    old_hash: sled::IVec,
-) {
+async fn migrate_hash_04<S: Store>(old_repo: OldSledRepo, state: State<S>, old_hash: sled::IVec) {
     let mut hash_failures = 0;
 
-    while let Err(e) = timed_migrate_hash_04(
-        &tmp_dir,
-        &policy_dir,
-        &old_repo,
-        &new_repo,
-        &store,
-        &config,
-        old_hash.clone(),
-    )
-    .await
-    {
+    while let Err(e) = timed_migrate_hash_04(&old_repo, &state, old_hash.clone()).await {
         hash_failures += 1;
 
         if hash_failures > 10 {
@@ -300,19 +277,13 @@ async fn do_migrate_hash(old_repo: &ArcRepo, new_repo: &ArcRepo, hash: Hash) ->
 }
 
 async fn timed_migrate_hash_04<S: Store>(
-    tmp_dir: &TmpDir,
-    policy_dir: &PolicyDir,
     old_repo: &OldSledRepo,
-    new_repo: &ArcRepo,
-    store: &S,
-    config: &Configuration,
+    state: &State<S>,
     old_hash: sled::IVec,
 ) -> Result<(), Error> {
     tokio::time::timeout(
-        Duration::from_secs(config.media.external_validation_timeout * 6),
-        do_migrate_hash_04(
-            tmp_dir, policy_dir, old_repo, new_repo, store, config, old_hash,
-        ),
+        Duration::from_secs(state.config.media.process_timeout * 6),
+        do_migrate_hash_04(old_repo, state, old_hash),
     )
     .await
     .map_err(|_| UploadError::ProcessTimeout)?
@@ -320,12 +291,8 @@ async fn timed_migrate_hash_04(
 
 #[tracing::instrument(skip_all)]
 async fn do_migrate_hash_04<S: Store>(
-    tmp_dir: &TmpDir,
-    policy_dir: &PolicyDir,
     old_repo: &OldSledRepo,
-    new_repo: &ArcRepo,
-    store: &S,
-    config: &Configuration,
+    state: &State<S>,
     old_hash: sled::IVec,
 ) -> Result<(), Error> {
     let Some(identifier) = old_repo.identifier(old_hash.clone()).await? else {
@@ -333,18 +300,9 @@ async fn do_migrate_hash_04(
         return Ok(());
     };
 
-    let size = store.len(&identifier).await?;
+    let size = state.store.len(&identifier).await?;
 
-    let hash_details = set_details(
-        tmp_dir,
-        policy_dir,
-        old_repo,
-        new_repo,
-        store,
-        config,
-        &identifier,
-    )
-    .await?;
+    let hash_details = set_details(old_repo, state, &identifier).await?;
 
     let aliases = old_repo.aliases_for_hash(old_hash.clone()).await?;
     let variants = old_repo.variants(old_hash.clone()).await?;
@@ -354,7 +312,8 @@ async fn do_migrate_hash_04(
 
     let hash = Hash::new(hash, size, hash_details.internal_format());
 
-    let _ = new_repo
+    let _ = state
+        .repo
         .create_hash_with_timestamp(hash.clone(), &identifier, hash_details.created_at())
         .await?;
@@ -364,66 +323,45 @@
         .await?
         .unwrap_or_else(DeleteToken::generate);
 
-        let _ = new_repo
+        let _ = state
+            .repo
             .create_alias(&alias, &delete_token, hash.clone())
             .await?;
     }
 
     if let Some(identifier) = motion_identifier {
-        new_repo
+        state
+            .repo
             .relate_motion_identifier(hash.clone(), &identifier)
             .await?;
 
-        set_details(
-            tmp_dir,
-            policy_dir,
-            old_repo,
-            new_repo,
-            store,
-            config,
-            &identifier,
-        )
-        .await?;
+        set_details(old_repo, state, &identifier).await?;
     }
 
     for (variant, identifier) in variants {
-        let _ = new_repo
+        let _ = state
+            .repo
             .relate_variant_identifier(hash.clone(), variant.clone(), &identifier)
             .await?;
 
-        set_details(
-            tmp_dir,
-            policy_dir,
-            old_repo,
-            new_repo,
-            store,
-            config,
-            &identifier,
-        )
-        .await?;
+        set_details(old_repo, state, &identifier).await?;
 
-        new_repo.accessed_variant(hash.clone(), variant).await?;
+        state.repo.accessed_variant(hash.clone(), variant).await?;
     }
 
     Ok(())
 }
 
 async fn set_details<S: Store>(
-    tmp_dir: &TmpDir,
-    policy_dir: &PolicyDir,
     old_repo: &OldSledRepo,
-    new_repo: &ArcRepo,
-    store: &S,
-    config: &Configuration,
+    state: &State<S>,
     identifier: &Arc<str>,
 ) -> Result<Details, Error> {
-    if let Some(details) = new_repo.details(identifier).await? {
+    if let Some(details) = state.repo.details(identifier).await? {
         Ok(details)
     } else {
-        let details =
-            fetch_or_generate_details(tmp_dir, policy_dir, old_repo, store, config, identifier)
-                .await?;
-        new_repo.relate_details(identifier, &details).await?;
+        let details = fetch_or_generate_details(old_repo, state, identifier).await?;
+        state.repo.relate_details(identifier, &details).await?;
         Ok(details)
     }
 }
@@ -442,11 +380,8 @@ fn details_semaphore() -> &'static Semaphore {
 
 #[tracing::instrument(skip_all)]
 async fn fetch_or_generate_details<S: Store>(
-    tmp_dir: &TmpDir,
-    policy_dir: &PolicyDir,
     old_repo: &OldSledRepo,
-    store: &S,
-    config: &Configuration,
+    state: &State<S>,
     identifier: &Arc<str>,
 ) -> Result<Details, Error> {
     let details_opt = old_repo.details(identifier.clone()).await?;
 
@@ -454,12 +389,11 @@
     if let Some(details) = details_opt {
         Ok(details)
     } else {
-        let bytes_stream = store.to_bytes(identifier, None, None).await?;
+        let bytes_stream = state.store.to_bytes(identifier, None, None).await?;
         let bytes = bytes_stream.into_bytes();
 
         let guard = details_semaphore().acquire().await?;
-        let details =
-            Details::from_bytes(tmp_dir, policy_dir, config.media.process_timeout, bytes).await?;
+        let details = Details::from_bytes(state, bytes).await?;
         drop(guard);
 
         Ok(details)
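One behavioral change hides in `timed_migrate_hash_04`: the per-hash budget is now derived from `media.process_timeout * 6` where it used to come from `external_validation_timeout`. The wrapper pattern itself, as a minimal tokio sketch — `do_work` is a hypothetical stand-in for the per-hash migration body:

```rust
use std::time::Duration;

// Hypothetical stand-in for do_migrate_hash_04's body.
async fn do_work() {}

async fn timed(process_timeout: u64) -> Result<(), &'static str> {
    // tokio::time::timeout resolves to Err(Elapsed) if the inner future
    // doesn't complete within the budget.
    tokio::time::timeout(Duration::from_secs(process_timeout * 6), do_work())
        .await
        .map_err(|_| "process timed out")
}

#[tokio::main]
async fn main() {
    assert!(timed(1).await.is_ok());
}
```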
diff --git a/src/validate.rs b/src/validate.rs
index bed66e7..e4a6f29 100644
--- a/src/validate.rs
+++ b/src/validate.rs
@@ -71,7 +71,7 @@ pub(crate) async fn validate_bytes(
         width,
         height,
         frames,
-    } = crate::discover::discover_bytes(tmp_dir, policy_dir, timeout, bytes.clone()).await?;
+    } = crate::discover::discover_bytes(state, bytes.clone()).await?;
 
     match &input {
         InputFile::Image(input) => {
@@ -127,7 +127,7 @@ async fn process_image(
             magick::convert_image(state, input.format, format, quality, bytes).await?
         } else {
-            exiftool::clear_metadata_bytes_read(bytes, timeout)?
+            exiftool::clear_metadata_bytes_read(bytes, state.config.media.process_timeout)?
         };
 
     Ok((InternalFormat::Image(format), process_read))
@@ -160,7 +160,7 @@ fn validate_animation(
 }
 
 #[tracing::instrument(skip(state, bytes))]
-async fn process_animation(
+async fn process_animation(
     state: &State,
     bytes: Bytes,
     input: AnimationFormat,
diff --git a/src/validate/magick.rs b/src/validate/magick.rs
index edb4bbe..65fa367 100644
--- a/src/validate/magick.rs
+++ b/src/validate/magick.rs
@@ -6,6 +6,7 @@ use crate::{
     formats::{AnimationFormat, ImageFormat},
     magick::{MagickError, PolicyDir, MAGICK_CONFIGURE_PATH, MAGICK_TEMPORARY_PATH},
     process::{Process, ProcessRead},
+    state::State,
     tmp_file::TmpDir,
 };
 
@@ -40,12 +41,12 @@ pub(super) async fn convert_animation(
         output.magick_format(),
         true,
         quality,
-        timeout,
+        bytes,
     )
     .await
 }
 
-async fn convert(
     state: &State,
     input: &'static str,
     output: &'static str,