diff --git a/src/config.rs b/src/config.rs
index f32bc28..23ed003 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -54,10 +54,15 @@ impl ConfigSource {
     }
 }
 
+pub struct PictRsConfiguration {
+    pub(crate) config: Configuration,
+    pub(crate) operation: Operation,
+}
+
 pub(crate) fn configure_without_clap<P: AsRef<Path>, T: serde::Serialize, Q: AsRef<Path>>(
     source: ConfigSource<T, P>,
     save_to: Option<Q>,
-) -> color_eyre::Result<(Configuration, Operation)> {
+) -> color_eyre::Result<PictRsConfiguration> {
     let config = Config::builder().add_source(config::Config::try_from(&Defaults::default())?);
 
     let config = match source {
@@ -83,10 +88,10 @@
         std::fs::write(save_to, output)?;
     }
 
-    Ok((config, operation))
+    Ok(PictRsConfiguration { config, operation })
 }
 
-pub(crate) fn configure() -> color_eyre::Result<(Configuration, Operation)> {
+pub(crate) fn configure() -> color_eyre::Result<PictRsConfiguration> {
     let Output {
         config_format,
         operation,
@@ -118,5 +123,5 @@ pub(crate) fn configure() -> color_eyre::Result<(Configuration, Operation)> {
         std::fs::write(save_to, output)?;
     }
 
-    Ok((config, operation))
+    Ok(PictRsConfiguration { config, operation })
 }
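The config.rs change above replaces the (Configuration, Operation) tuple with a named PictRsConfiguration, which becomes the single handle the rest of this diff threads through the application. A minimal sketch of the resulting entry-point flow, assuming only the public API introduced in this diff (build_default, install_tracing, run; wired up in src/lib.rs and src/main.rs below):

    use pict_rs::PictRsConfiguration;

    #[actix_rt::main]
    async fn main() -> color_eyre::Result<()> {
        // Configuration is built exactly once and passed by value,
        // instead of being materialized lazily through a global static.
        let pict_rs = PictRsConfiguration::build_default()?;
        pict_rs.install_tracing()?;
        pict_rs.run().await
    }
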
diff --git a/src/generate.rs b/src/generate.rs
index 2771c6b..983fa59 100644
--- a/src/generate.rs
+++ b/src/generate.rs
@@ -24,7 +24,7 @@ pub(crate) async fn generate<R: FullRepo, S: Store + 'static>(
     thumbnail_args: Vec<String>,
     input_format: Option<InternalVideoFormat>,
     thumbnail_format: Option<ThumbnailFormat>,
-    media: &'static crate::config::Media,
+    media: &crate::config::Media,
     hash: R::Bytes,
 ) -> Result<(Details, Bytes), Error> {
     let process_fut = process(
@@ -58,7 +58,7 @@ async fn process<R: FullRepo, S: Store + 'static>(
     thumbnail_args: Vec<String>,
     input_format: Option<InternalVideoFormat>,
     thumbnail_format: Option<ThumbnailFormat>,
-    media: &'static crate::config::Media,
+    media: &crate::config::Media,
     hash: R::Bytes,
 ) -> Result<(Details, Bytes), Error> {
     let permit = crate::PROCESS_SEMAPHORE.acquire().await;
diff --git a/src/ingest.rs b/src/ingest.rs
index 32253b1..1c4fd53 100644
--- a/src/ingest.rs
+++ b/src/ingest.rs
@@ -46,7 +46,7 @@ pub(crate) async fn ingest<R, S>(
     store: &S,
     stream: impl Stream<Item = Result<Bytes, Error>> + Unpin + 'static,
     declared_alias: Option<Alias>,
-    media: &'static crate::config::Media,
+    media: &crate::config::Media,
 ) -> Result<Session<R, S>, Error>
 where
     R: FullRepo + 'static,
diff --git a/src/lib.rs b/src/lib.rs
index 6255568..a9b9a3a 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -39,7 +39,7 @@ use futures_util::{
     stream::{empty, once},
     Stream, StreamExt, TryStreamExt,
 };
-use once_cell::sync::{Lazy, OnceCell};
+use once_cell::sync::Lazy;
 use repo::sled::SledRepo;
 use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
 use reqwest_tracing::TracingMiddleware;
@@ -79,7 +79,7 @@ use self::{
     stream::{StreamLimit, StreamTimeout},
 };
 
-pub use self::config::ConfigSource;
+pub use self::config::{ConfigSource, PictRsConfiguration};
 
 const MEGABYTES: usize = 1024 * 1024;
 const MINUTES: u32 = 60;
@@ -88,21 +88,6 @@ const DAYS: u32 = 24 * HOURS;
 
 const NOT_FOUND_KEY: &str = "404-alias";
 
-static DO_CONFIG: OnceCell<(Configuration, Operation)> = OnceCell::new();
-static CONFIG: Lazy<Configuration> = Lazy::new(|| {
-    DO_CONFIG
-        .get_or_try_init(config::configure)
-        .expect("Failed to configure")
-        .0
-        .clone()
-});
-static OPERATION: Lazy<Operation> = Lazy::new(|| {
-    DO_CONFIG
-        .get_or_try_init(config::configure)
-        .expect("Failed to configure")
-        .1
-        .clone()
-});
 static PROCESS_SEMAPHORE: Lazy<Semaphore> = Lazy::new(|| {
     tracing::trace_span!(parent: None, "Initialize semaphore")
         .in_scope(|| Semaphore::new(num_cpus::get().saturating_sub(1).max(1)))
@@ -111,6 +96,7 @@ static PROCESS_SEMAPHORE: Lazy<Semaphore> = Lazy::new(|| {
 async fn ensure_details<R: FullRepo, S: Store + 'static>(
     repo: &R,
     store: &S,
+    config: &Configuration,
     alias: &Alias,
 ) -> Result<Details, Error> {
     let Some(identifier) = repo.identifier_from_alias::<S::Identifier>(alias).await? else {
@@ -129,7 +115,7 @@ async fn ensure_details<R: FullRepo, S: Store + 'static>(
         tracing::debug!("details exist");
         Ok(details)
     } else {
-        if CONFIG.server.read_only {
+        if config.server.read_only {
             return Err(UploadError::ReadOnly.into());
         }
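With the statics gone, ensure_details takes &Configuration explicitly, and HTTP handlers obtain it from actix-web's app data once configure_endpoints registers it (see below). A hedged sketch of that extraction pattern; `some_handler` is illustrative, not part of pict-rs:

    use actix_web::{web, HttpResponse};

    // web::Data<T> wraps an Arc<T>, so handing the configuration to every
    // handler is a cheap pointer clone, not a deep copy. `Configuration`
    // here stands for pict-rs's crate::config::Configuration.
    async fn some_handler(config: web::Data<Configuration>) -> HttpResponse {
        if config.server.read_only {
            return HttpResponse::Forbidden().finish();
        }
        HttpResponse::Ok().finish()
    }
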
@@ -160,16 +146,21 @@ impl<R: FullRepo, S: Store + 'static> FormData for Upload<R, S> {
             .app_data::<web::Data<S>>()
             .expect("No store in request")
             .clone();
+        let config = req
+            .app_data::<web::Data<Configuration>>()
+            .expect("No configuration in request")
+            .clone();
 
         Form::new()
-            .max_files(CONFIG.server.max_file_count)
-            .max_file_size(CONFIG.media.max_file_size * MEGABYTES)
+            .max_files(config.server.max_file_count)
+            .max_file_size(config.media.max_file_size * MEGABYTES)
             .transform_error(transform_error)
             .field(
                 "images",
                 Field::array(Field::file(move |filename, _, stream| {
                     let repo = repo.clone();
                     let store = store.clone();
+                    let config = config.clone();
 
                     let span = tracing::info_span!("file-upload", ?filename);
 
@@ -177,11 +168,11 @@ impl<R: FullRepo, S: Store + 'static> FormData for Upload<R, S> {
                     Box::pin(
                         async move {
-                            if CONFIG.server.read_only {
+                            if config.server.read_only {
                                 return Err(UploadError::ReadOnly.into());
                             }
 
-                            ingest::ingest(&**repo, &**store, stream, None, &CONFIG.media).await
+                            ingest::ingest(&**repo, &**store, stream, None, &config.media).await
                         }
                         .instrument(span),
                     )
@@ -209,19 +200,24 @@ impl<R: FullRepo, S: Store + 'static> FormData for Import<R, S> {
             .app_data::<web::Data<S>>()
             .expect("No store in request")
             .clone();
+        let config = req
+            .app_data::<web::Data<Configuration>>()
+            .expect("No configuration in request")
+            .clone();
 
         // Create a new Multipart Form validator for internal imports
         //
         // This form is expecting a single array field, 'images' with at most 10 files in it
         Form::new()
-            .max_files(CONFIG.server.max_file_count)
-            .max_file_size(CONFIG.media.max_file_size * MEGABYTES)
+            .max_files(config.server.max_file_count)
+            .max_file_size(config.media.max_file_size * MEGABYTES)
             .transform_error(transform_error)
             .field(
                 "images",
                 Field::array(Field::file(move |filename, _, stream| {
                     let repo = repo.clone();
                     let store = store.clone();
+                    let config = config.clone();
 
                     let span = tracing::info_span!("file-import", ?filename);
 
@@ -229,7 +225,7 @@ impl<R: FullRepo, S: Store + 'static> FormData for Import<R, S> {
                     Box::pin(
                         async move {
-                            if CONFIG.server.read_only {
+                            if config.server.read_only {
                                 return Err(UploadError::ReadOnly.into());
                             }
 
@@ -238,7 +234,7 @@ impl<R: FullRepo, S: Store + 'static> FormData for Import<R, S> {
                                 &**store,
                                 stream,
                                 Some(Alias::from_existing(&filename)),
-                                &CONFIG.media,
+                                &config.media,
                             )
                             .await
                         }
@@ -257,31 +253,34 @@ impl<R: FullRepo, S: Store + 'static> FormData for Import<R, S> {
 }
 
 /// Handle responding to successful uploads
-#[tracing::instrument(name = "Uploaded files", skip(value, repo, store))]
+#[tracing::instrument(name = "Uploaded files", skip(value, repo, store, config))]
 async fn upload<R: FullRepo, S: Store + 'static>(
     Multipart(Upload(value)): Multipart<Upload<R, S>>,
     repo: web::Data<R>,
     store: web::Data<S>,
+    config: web::Data<Configuration>,
 ) -> Result<HttpResponse, Error> {
-    handle_upload(value, repo, store).await
+    handle_upload(value, repo, store, config).await
 }
 
 /// Handle responding to successful uploads
-#[tracing::instrument(name = "Imported files", skip(value, repo, store))]
+#[tracing::instrument(name = "Imported files", skip(value, repo, store, config))]
 async fn import<R: FullRepo, S: Store + 'static>(
     Multipart(Import(value)): Multipart<Import<R, S>>,
     repo: web::Data<R>,
     store: web::Data<S>,
+    config: web::Data<Configuration>,
 ) -> Result<HttpResponse, Error> {
-    handle_upload(value, repo, store).await
+    handle_upload(value, repo, store, config).await
 }
 
 /// Handle responding to successful uploads
-#[tracing::instrument(name = "Uploaded files", skip(value, repo, store))]
+#[tracing::instrument(name = "Uploaded files", skip(value, repo, store, config))]
 async fn handle_upload<R: FullRepo, S: Store + 'static>(
     value: Value<Session<R, S>>,
     repo: web::Data<R>,
     store: web::Data<S>,
+    config: web::Data<Configuration>,
 ) -> Result<HttpResponse, Error> {
     let images = value
         .map()
@@ -300,7 +299,7 @@ async fn handle_upload<R: FullRepo, S: Store + 'static>(
         tracing::debug!("Uploaded {} as {:?}", image.filename, alias);
 
         let delete_token = image.result.delete_token().await?;
 
-        let details = ensure_details(&repo, &store, alias).await?;
+        let details = ensure_details(&repo, &store, &config, alias).await?;
 
         files.push(serde_json::json!({
             "file": alias.to_string(),
@@ -338,10 +337,16 @@ impl<R: FullRepo, S: Store + 'static> FormData for BackgroundedUpload<R, S> {
             .app_data::<web::Data<S>>()
             .expect("No store in request")
             .clone();
+        let config = req
+            .app_data::<web::Data<Configuration>>()
+            .expect("No configuration in request")
+            .clone();
+
+        let read_only = config.server.read_only;
 
         Form::new()
-            .max_files(CONFIG.server.max_file_count)
-            .max_file_size(CONFIG.media.max_file_size * MEGABYTES)
+            .max_files(config.server.max_file_count)
+            .max_file_size(config.media.max_file_size * MEGABYTES)
             .transform_error(transform_error)
             .field(
                 "images",
@@ -355,7 +360,7 @@ impl<R: FullRepo, S: Store + 'static> FormData for BackgroundedUpload<R, S> {
                     Box::pin(
                         async move {
-                            if CONFIG.server.read_only {
+                            if read_only {
                                 return Err(UploadError::ReadOnly.into());
                             }
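Unlike Upload and Import, the BackgroundedUpload callback above only needs one bit of the configuration, so the diff copies config.server.read_only into a local before the move closure. A standalone illustration of that pattern (the types are stand-ins, not pict-rs's):

    struct Server { read_only: bool }
    struct Config { server: Server }

    fn make_callback(config: &Config) -> impl Fn() -> Result<(), &'static str> {
        // Capture the Copy bool instead of cloning the whole Config
        // into every invocation of the closure.
        let read_only = config.server.read_only;
        move || {
            if read_only {
                Err("read only mode")
            } else {
                Ok(())
            }
        }
    }
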
@@ -427,6 +432,7 @@ struct ClaimQuery {
 async fn claim_upload<R: FullRepo, S: Store + 'static>(
     repo: web::Data<R>,
     store: web::Data<S>,
+    config: web::Data<Configuration>,
     query: web::Query<ClaimQuery>,
 ) -> Result<HttpResponse, Error> {
     let upload_id = Serde::into_inner(query.into_inner().upload_id);
@@ -438,7 +444,7 @@ async fn claim_upload<R: FullRepo, S: Store + 'static>(
     match upload_result {
         UploadResult::Success { alias, token } => {
-            let details = ensure_details(&repo, &store, &alias).await?;
+            let details = ensure_details(&repo, &store, &config, &alias).await?;
 
             Ok(HttpResponse::Ok().json(&serde_json::json!({
                 "msg": "ok",
@@ -469,14 +475,15 @@ struct UrlQuery {
 }
 
 /// download an image from a URL
-#[tracing::instrument(name = "Downloading file", skip(client, repo, store))]
+#[tracing::instrument(name = "Downloading file", skip(client, repo, store, config))]
 async fn download<R: FullRepo + 'static, S: Store + 'static>(
     client: web::Data<ClientWithMiddleware>,
     repo: web::Data<R>,
     store: web::Data<S>,
+    config: web::Data<Configuration>,
     query: web::Query<UrlQuery>,
 ) -> Result<HttpResponse, Error> {
-    if CONFIG.server.read_only {
+    if config.server.read_only {
         return Err(UploadError::ReadOnly.into());
     }
 
@@ -489,27 +496,28 @@ async fn download<R: FullRepo + 'static, S: Store + 'static>(
     let stream = res
         .bytes_stream()
         .map_err(Error::from)
-        .limit((CONFIG.media.max_file_size * MEGABYTES) as u64);
+        .limit((config.media.max_file_size * MEGABYTES) as u64);
 
     if query.backgrounded {
         do_download_backgrounded(stream, repo, store).await
     } else {
-        do_download_inline(stream, repo, store).await
+        do_download_inline(stream, repo, store, config).await
     }
 }
 
-#[tracing::instrument(name = "Downloading file inline", skip(stream, repo, store))]
+#[tracing::instrument(name = "Downloading file inline", skip(stream, repo, store, config))]
 async fn do_download_inline<R: FullRepo + 'static, S: Store + 'static>(
     stream: impl Stream<Item = Result<web::Bytes, Error>> + Unpin + 'static,
     repo: web::Data<R>,
     store: web::Data<S>,
+    config: web::Data<Configuration>,
 ) -> Result<HttpResponse, Error> {
-    let mut session = ingest::ingest(&repo, &store, stream, None, &CONFIG.media).await?;
+    let mut session = ingest::ingest(&repo, &store, stream, None, &config.media).await?;
 
     let alias = session.alias().expect("alias should exist").to_owned();
     let delete_token = session.delete_token().await?;
 
-    let details = ensure_details(&repo, &store, &alias).await?;
+    let details = ensure_details(&repo, &store, &config, &alias).await?;
 
     session.disarm();
 
@@ -553,9 +561,10 @@ async fn do_download_backgrounded<R: FullRepo + 'static, S: Store + 'static>(
 #[tracing::instrument(name = "Deleting file", skip(repo))]
 async fn delete<R: FullRepo>(
     repo: web::Data<R>,
+    config: web::Data<Configuration>,
     path_entries: web::Path<(String, String)>,
 ) -> Result<HttpResponse, Error> {
-    if CONFIG.server.read_only {
+    if config.server.read_only {
         return Err(UploadError::ReadOnly.into());
     }
 
@@ -572,6 +581,7 @@ async fn delete<R: FullRepo>(
 
 type ProcessQuery = Vec<(String, String)>;
 
 fn prepare_process(
+    config: &Configuration,
     query: web::Query<ProcessQuery>,
     ext: &str,
 ) -> Result<(InputProcessableFormat, Alias, PathBuf, Vec<String>), Error> {
@@ -596,7 +606,7 @@ fn prepare_process(
     let operations = operations
         .into_iter()
-        .filter(|(k, _)| CONFIG.media.filters.contains(&k.to_lowercase()))
+        .filter(|(k, _)| config.media.filters.contains(&k.to_lowercase()))
         .collect::<Vec<_>>();
 
     let format = ext
@@ -609,13 +619,14 @@ fn prepare_process(
     Ok((format, alias, thumbnail_path, thumbnail_args))
 }
 
-#[tracing::instrument(name = "Fetching derived details", skip(repo))]
+#[tracing::instrument(name = "Fetching derived details", skip(repo, config))]
 async fn process_details<R: FullRepo, S: Store>(
     query: web::Query<ProcessQuery>,
     ext: web::Path<String>,
     repo: web::Data<R>,
+    config: web::Data<Configuration>,
 ) -> Result<HttpResponse, Error> {
-    let (_, alias, thumbnail_path, _) = prepare_process(query, ext.as_str())?;
+    let (_, alias, thumbnail_path, _) = prepare_process(&config, query, ext.as_str())?;
 
     let Some(hash) = repo.hash(&alias).await? else {
         // Invalid alias
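prepare_process now receives &Configuration as its first argument and checks requested operations against config.media.filters rather than the global CONFIG. A reduced, self-contained sketch of that whitelist step (the collection type is illustrative; pict-rs's actual filter set may differ):

    use std::collections::BTreeSet;

    // Keep only the query operations whose lowercased name is enabled.
    fn whitelist_operations(
        operations: Vec<(String, String)>,
        filters: &BTreeSet<String>,
    ) -> Vec<(String, String)> {
        operations
            .into_iter()
            .filter(|(k, _)| filters.contains(&k.to_lowercase()))
            .collect()
    }
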
@@ -655,16 +666,21 @@ async fn not_found_hash<R: FullRepo>(repo: &R) -> Result<Option<(
 async fn process<R: FullRepo, S: Store + 'static>(
     range: Option<web::Header<Range>>,
     query: web::Query<ProcessQuery>,
     ext: web::Path<String>,
     repo: web::Data<R>,
     store: web::Data<S>,
+    config: web::Data<Configuration>,
     process_map: web::Data<ProcessMap>,
 ) -> Result<HttpResponse, Error> {
-    let (format, alias, thumbnail_path, thumbnail_args) = prepare_process(query, ext.as_str())?;
+    let (format, alias, thumbnail_path, thumbnail_args) =
+        prepare_process(&config, query, ext.as_str())?;
 
     let path_string = thumbnail_path.to_string_lossy().to_string();
 
@@ -695,7 +711,7 @@ async fn process<R: FullRepo, S: Store + 'static>(
         tracing::debug!("details exist");
         details
     } else {
-        if CONFIG.server.read_only {
+        if config.server.read_only {
             return Err(UploadError::ReadOnly.into());
         }
 
@@ -716,11 +732,11 @@ async fn process<R: FullRepo, S: Store + 'static>(
         return ranged_file_resp(&store, identifier, range, details, not_found).await;
     }
 
-    if CONFIG.server.read_only {
+    if config.server.read_only {
         return Err(UploadError::ReadOnly.into());
     }
 
-    let original_details = ensure_details(&repo, &store, &alias).await?;
+    let original_details = ensure_details(&repo, &store, &config, &alias).await?;
 
     let (details, bytes) = generate::generate(
         &repo,
@@ -732,7 +748,7 @@ async fn process<R: FullRepo, S: Store + 'static>(
         thumbnail_args,
         original_details.video_format(),
         None,
-        &CONFIG.media,
+        &config.media,
         hash,
     )
     .await?;
@@ -774,15 +790,16 @@ async fn process<R: FullRepo, S: Store + 'static>(
     ))
 }
 
-#[tracing::instrument(name = "Serving processed image headers", skip(repo, store))]
+#[tracing::instrument(name = "Serving processed image headers", skip(repo, store, config))]
 async fn process_head<R: FullRepo, S: Store + 'static>(
     range: Option<web::Header<Range>>,
     query: web::Query<ProcessQuery>,
     ext: web::Path<String>,
     repo: web::Data<R>,
     store: web::Data<S>,
+    config: web::Data<Configuration>,
 ) -> Result<HttpResponse, Error> {
-    let (_, alias, thumbnail_path, _) = prepare_process(query, ext.as_str())?;
+    let (_, alias, thumbnail_path, _) = prepare_process(&config, query, ext.as_str())?;
 
     let path_string = thumbnail_path.to_string_lossy().to_string();
     let Some(hash) = repo.hash(&alias).await? else {
@@ -807,7 +824,7 @@ async fn process_head<R: FullRepo, S: Store + 'static>(
         tracing::debug!("details exist");
         details
     } else {
-        if CONFIG.server.read_only {
+        if config.server.read_only {
             return Err(UploadError::ReadOnly.into());
         }
 
@@ -837,8 +854,10 @@ async fn process_backgrounded<R: FullRepo, S: Store>(
     query: web::Query<ProcessQuery>,
     ext: web::Path<String>,
     repo: web::Data<R>,
+    config: web::Data<Configuration>,
 ) -> Result<HttpResponse, Error> {
-    let (target_format, source, process_path, process_args) = prepare_process(query, ext.as_str())?;
+    let (target_format, source, process_path, process_args) =
+        prepare_process(&config, query, ext.as_str())?;
 
     let path_string = process_path.to_string_lossy().to_string();
     let Some(hash) = repo.hash(&source).await? else {
@@ -854,7 +873,7 @@ async fn process_backgrounded<R: FullRepo, S: Store>(
         return Ok(HttpResponse::Accepted().finish());
     }
 
-    if CONFIG.server.read_only {
+    if config.server.read_only {
         return Err(UploadError::ReadOnly.into());
     }
 
@@ -864,26 +883,28 @@ async fn process_backgrounded<R: FullRepo, S: Store>(
 }
 
 /// Fetch file details
-#[tracing::instrument(name = "Fetching details", skip(repo, store))]
+#[tracing::instrument(name = "Fetching details", skip(repo, store, config))]
 async fn details<R: FullRepo, S: Store + 'static>(
     alias: web::Path<Serde<Alias>>,
     repo: web::Data<R>,
     store: web::Data<S>,
+    config: web::Data<Configuration>,
 ) -> Result<HttpResponse, Error> {
     let alias = alias.into_inner();
 
-    let details = ensure_details(&repo, &store, &alias).await?;
+    let details = ensure_details(&repo, &store, &config, &alias).await?;
 
     Ok(HttpResponse::Ok().json(&details))
 }
 
 /// Serve files
-#[tracing::instrument(name = "Serving file", skip(repo, store))]
+#[tracing::instrument(name = "Serving file", skip(repo, store, config))]
 async fn serve<R: FullRepo, S: Store + 'static>(
     range: Option<web::Header<Range>>,
     alias: web::Path<Serde<Alias>>,
     repo: web::Data<R>,
     store: web::Data<S>,
+    config: web::Data<Configuration>,
 ) -> Result<HttpResponse, Error> {
     let alias = alias.into_inner();
 
@@ -906,7 +927,7 @@ async fn serve<R: FullRepo, S: Store + 'static>(
         return Ok(HttpResponse::NotFound().finish());
     };
 
-    let details = ensure_details(&repo, &store, &alias).await?;
+    let details = ensure_details(&repo, &store, &config, &alias).await?;
 
     if let Some(public_url) = store.public_url(&identifier) {
         return Ok(HttpResponse::SeeOther()
@@ -917,12 +938,13 @@ async fn serve<R: FullRepo, S: Store + 'static>(
     ranged_file_resp(&store, identifier, range, details, not_found).await
 }
 
-#[tracing::instrument(name = "Serving file headers", skip(repo, store))]
+#[tracing::instrument(name = "Serving file headers", skip(repo, store, config))]
 async fn serve_head<R: FullRepo, S: Store + 'static>(
     range: Option<web::Header<Range>>,
     alias: web::Path<Serde<Alias>>,
     repo: web::Data<R>,
     store: web::Data<S>,
+    config: web::Data<Configuration>,
 ) -> Result<HttpResponse, Error> {
     let alias = alias.into_inner();
 
@@ -931,7 +953,7 @@ async fn serve_head<R: FullRepo, S: Store + 'static>(
         return Ok(HttpResponse::NotFound().finish());
     };
 
-    let details = ensure_details(&repo, &store, &alias).await?;
+    let details = ensure_details(&repo, &store, &config, &alias).await?;
 
     if let Some(public_url) = store.public_url(&identifier) {
         return Ok(HttpResponse::SeeOther()
@@ -1074,9 +1096,12 @@ fn srv_head(
     builder
 }
 
-#[tracing::instrument(name = "Spawning variant cleanup", skip(repo))]
-async fn clean_variants<R: FullRepo>(repo: web::Data<R>) -> Result<HttpResponse, Error> {
-    if CONFIG.server.read_only {
+#[tracing::instrument(name = "Spawning variant cleanup", skip(repo, config))]
+async fn clean_variants<R: FullRepo>(
+    repo: web::Data<R>,
+    config: web::Data<Configuration>,
+) -> Result<HttpResponse, Error> {
+    if config.server.read_only {
         return Err(UploadError::ReadOnly.into());
     }
 
@@ -1089,12 +1114,13 @@ struct AliasQuery {
     alias: Serde<Alias>,
 }
 
-#[tracing::instrument(name = "Setting 404 Image", skip(repo))]
+#[tracing::instrument(name = "Setting 404 Image", skip(repo, config))]
 async fn set_not_found<R: FullRepo>(
     json: web::Json<AliasQuery>,
     repo: web::Data<R>,
+    config: web::Data<Configuration>,
 ) -> Result<HttpResponse, Error> {
-    if CONFIG.server.read_only {
+    if config.server.read_only {
         return Err(UploadError::ReadOnly.into());
     }
 
@@ -1113,12 +1139,13 @@ async fn set_not_found<R: FullRepo>(
     })))
 }
 
-#[tracing::instrument(name = "Purging file", skip(repo))]
+#[tracing::instrument(name = "Purging file", skip(repo, config))]
 async fn purge<R: FullRepo>(
     query: web::Query<AliasQuery>,
     repo: web::Data<R>,
+    config: web::Data<Configuration>,
 ) -> Result<HttpResponse, Error> {
-    if CONFIG.server.read_only {
+    if config.server.read_only {
         return Err(UploadError::ReadOnly.into());
     }
 
@@ -1186,11 +1213,11 @@ fn transform_error(error: actix_form_data::Error) -> actix_web::Error {
     error
 }
 
-fn build_client() -> Result<ClientWithMiddleware, Error> {
+fn build_client(config: &Configuration) -> Result<ClientWithMiddleware, Error> {
     let client = reqwest::Client::builder()
         .user_agent("pict-rs v0.5.0-main")
         .use_rustls_tls()
-        .pool_max_idle_per_host(CONFIG.client.pool_size)
+        .pool_max_idle_per_host(config.client.pool_size)
         .build()
         .map_err(UploadError::BuildClient)?;
 
@@ -1199,12 +1226,12 @@ fn build_client() -> Result<ClientWithMiddleware, Error> {
         .build())
 }
 
-fn next_worker_id() -> String {
+fn next_worker_id(config: &Configuration) -> String {
     static WORKER_ID: AtomicU64 = AtomicU64::new(0);
 
     let next_id = WORKER_ID.fetch_add(1, Ordering::Relaxed);
 
-    format!("{}-{}", CONFIG.server.worker_id, next_id)
+    format!("{}-{}", config.server.worker_id, next_id)
 }
 
 fn configure_endpoints<
@@ -1215,6 +1242,7 @@ fn configure_endpoints<
     config: &mut web::ServiceConfig,
     repo: R,
     store: S,
+    configuration: Configuration,
     client: ClientWithMiddleware,
     extra_config: F,
 ) {
@@ -1222,6 +1250,7 @@ fn configure_endpoints<
     config
         .app_data(web::Data::new(repo))
         .app_data(web::Data::new(store))
        .app_data(web::Data::new(client))
+        .app_data(web::Data::new(configuration.clone()))
         .route("/healthz", web::get().to(healthz::<R>))
         .service(
             web::scope("/image")
@@ -1276,7 +1305,7 @@ fn configure_endpoints<
         .service(
             web::scope("/internal")
                 .wrap(Internal(
-                    CONFIG.server.api_key.as_ref().map(|s| s.to_owned()),
+                    configuration.server.api_key.as_ref().map(|s| s.to_owned()),
                 ))
                 .service(web::resource("/import").route(web::post().to(import::<R, S>)))
                 .service(web::resource("/variants").route(web::delete().to(clean_variants::<R>)))
@@ -1288,7 +1317,7 @@ fn configure_endpoints<
     );
 }
 
-fn spawn_workers<R, S>(repo: R, store: S, process_map: ProcessMap)
+fn spawn_workers<R, S>(repo: R, store: S, config: &Configuration, process_map: ProcessMap)
 where
     R: FullRepo + 'static,
     S: Store + 'static,
@@ -1297,7 +1326,7 @@ where
         actix_rt::spawn(queue::process_cleanup(
             repo.clone(),
             store.clone(),
-            next_worker_id(),
+            next_worker_id(config),
         ))
     });
     tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
@@ -1305,7 +1334,8 @@ where
         actix_rt::spawn(queue::process_images(
             repo,
             store,
             process_map,
-            next_worker_id(),
+            config.clone(),
+            next_worker_id(config),
         ))
     });
 }
 
@@ -1314,26 +1344,29 @@ async fn launch_file_store<R: FullRepo + 'static, F: Fn(&mut web::ServiceConfig
     repo: R,
     store: FileStore,
     client: ClientWithMiddleware,
+    config: Configuration,
     extra_config: F,
 ) -> std::io::Result<()> {
     let process_map = ProcessMap::new();
 
+    let address = config.server.address;
+
     HttpServer::new(move || {
         let client = client.clone();
-
         let store = store.clone();
         let repo = repo.clone();
+        let config = config.clone();
         let extra_config = extra_config.clone();
 
-        spawn_workers(repo.clone(), store.clone(), process_map.clone());
+        spawn_workers(repo.clone(), store.clone(), &config, process_map.clone());
 
         App::new()
             .wrap(TracingLogger::default())
            .wrap(Deadline)
             .app_data(web::Data::new(process_map.clone()))
-            .configure(move |sc| configure_endpoints(sc, repo, store, client, extra_config))
+            .configure(move |sc| configure_endpoints(sc, repo, store, config, client, extra_config))
     })
-    .bind(CONFIG.server.address)?
+    .bind(address)?
     .run()
     .await
 }
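In launch_file_store, the owned Configuration is moved into the HttpServer factory, which actix-web runs once per worker thread; each worker clones its own copy, and the bind address is copied out beforehand because config is no longer reachable after the move. A reduced sketch of the pattern with a stand-in Config type:

    use actix_web::{web, App, HttpServer};

    #[derive(Clone)]
    struct Config {
        address: std::net::SocketAddr,
    }

    async fn serve(config: Config) -> std::io::Result<()> {
        // Copy the address out first; `config` is moved into the factory.
        let address = config.address;

        HttpServer::new(move || {
            // The factory closure runs once per worker; clone per worker.
            let config = config.clone();
            App::new().app_data(web::Data::new(config))
        })
        .bind(address)?
        .run()
        .await
    }
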
@@ -1345,26 +1378,29 @@ async fn launch_object_store<
     repo: R,
     store_config: ObjectStoreConfig,
     client: ClientWithMiddleware,
+    config: Configuration,
     extra_config: F,
 ) -> std::io::Result<()> {
     let process_map = ProcessMap::new();
 
+    let address = config.server.address;
+
     HttpServer::new(move || {
         let client = client.clone();
-
         let store = store_config.clone().build(client.clone());
         let repo = repo.clone();
+        let config = config.clone();
         let extra_config = extra_config.clone();
 
-        spawn_workers(repo.clone(), store.clone(), process_map.clone());
+        spawn_workers(repo.clone(), store.clone(), &config, process_map.clone());
 
         App::new()
             .wrap(TracingLogger::default())
             .wrap(Deadline)
             .app_data(web::Data::new(process_map.clone()))
-            .configure(move |sc| configure_endpoints(sc, repo, store, client, extra_config))
+            .configure(move |sc| configure_endpoints(sc, repo, store, config, client, extra_config))
     })
-    .bind(CONFIG.server.address)?
+    .bind(address)?
     .run()
     .await
 }
@@ -1463,25 +1499,14 @@ impl<P: AsRef<Path>, T: serde::Serialize> ConfigSource<T, P> {
     ///     Ok(())
     /// }
     /// ```
-    pub fn init<Q: AsRef<Path>>(self, save_to: Option<Q>) -> color_eyre::Result<()> {
-        let (config, operation) = config::configure_without_clap(self, save_to)?;
-
-        DO_CONFIG
-            .set((config, operation))
-            .unwrap_or_else(|_| panic!("CONFIG cannot be initialized more than once"));
-
-        Ok(())
+    pub fn init<Q: AsRef<Path>>(
+        self,
+        save_to: Option<Q>,
+    ) -> color_eyre::Result<PictRsConfiguration> {
+        config::configure_without_clap(self, save_to)
     }
 }
 
-/// Install the default pict-rs tracer
-///
-/// This is probably not useful for 3rd party applications that install their own tracing
-/// subscribers.
-pub fn install_tracing() -> color_eyre::Result<()> {
-    init_tracing(&CONFIG.tracing)
-}
-
 async fn export_handler(repo: web::Data<SledRepo>) -> Result<HttpResponse, Error> {
     repo.export().await?;
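ConfigSource::init now returns the PictRsConfiguration instead of seeding a process-wide OnceCell, so it can be called without the "initialized more than once" panic and embedders chain straight into run(). A hedged sketch of the embedded flow based on this diff's API; the memory source and config keys are illustrative, following the crate's documented example:

    #[actix_rt::main]
    async fn main() -> color_eyre::Result<()> {
        let pict_rs = pict_rs::ConfigSource::memory(serde_json::json!({
            "server": { "address": "127.0.0.1:8080" },
            "repo": { "type": "sled", "path": "./sled-repo" },
            "store": { "type": "filesystem", "path": "./files" }
        }))
        .init::<&str>(None)?; // None: don't write the merged config back out

        pict_rs.run().await
    }
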
@@ -1494,106 +1519,111 @@ fn sled_extra_config(sc: &mut web::ServiceConfig) {
     sc.service(web::resource("/export").route(web::post().to(export_handler)));
 }
 
-/// Run the pict-rs application
-///
-/// This must be called after `init_config`, or else the default configuration builder will run and
-/// fail.
-pub async fn run() -> color_eyre::Result<()> {
-    let repo = Repo::open(CONFIG.repo.clone())?;
-    repo.migrate_from_db(CONFIG.old_db.path.clone()).await?;
-    let client = build_client()?;
+impl PictRsConfiguration {
+    /// Build the pict-rs configuration from commandline arguments
+    ///
+    /// This is probably not useful for 3rd party applications that handle their own commandline
+    pub fn build_default() -> color_eyre::Result<PictRsConfiguration> {
+        config::configure()
+    }
 
-    match (*OPERATION).clone() {
-        Operation::Run => (),
-        Operation::MigrateStore {
-            skip_missing_files,
-            from,
-            to,
-        } => {
-            match from {
-                config::primitives::Store::Filesystem(config::Filesystem { path }) => {
-                    let from = FileStore::build(path.clone(), repo.clone()).await?;
-                    migrate_inner(repo, client, from, to, skip_missing_files).await?;
-                }
-                config::primitives::Store::ObjectStorage(config::primitives::ObjectStorage {
-                    endpoint,
-                    bucket_name,
-                    use_path_style,
-                    region,
-                    access_key,
-                    secret_key,
-                    session_token,
-                    signature_duration,
-                    client_timeout,
-                    public_endpoint,
-                }) => {
-                    let from = ObjectStore::build(
-                        endpoint,
-                        bucket_name,
-                        if use_path_style {
-                            UrlStyle::Path
-                        } else {
-                            UrlStyle::VirtualHost
+    /// Install the default pict-rs tracer
+    ///
+    /// This is probably not useful for 3rd party applications that install their own tracing
+    /// subscribers.
+    pub fn install_tracing(&self) -> color_eyre::Result<()> {
+        init_tracing(&self.config.tracing)
+    }
+
+    /// Run the pict-rs application
+    ///
+    /// This must be called after `init_config`, or else the default configuration builder will run and
+    /// fail.
+    pub async fn run(self) -> color_eyre::Result<()> {
+        let PictRsConfiguration { config, operation } = self;
+
+        let repo = Repo::open(config.repo.clone())?;
+        repo.migrate_from_db(config.old_db.path.clone()).await?;
+        let client = build_client(&config)?;
+
+        match operation {
+            Operation::Run => (),
+            Operation::MigrateStore {
+                skip_missing_files,
+                from,
+                to,
+            } => {
+                match from {
+                    config::primitives::Store::Filesystem(config::Filesystem { path }) => {
+                        let from = FileStore::build(path.clone(), repo.clone()).await?;
+                        migrate_inner(repo, client, from, to, skip_missing_files).await?;
+                    }
+                    config::primitives::Store::ObjectStorage(
+                        config::primitives::ObjectStorage {
+                            endpoint,
+                            bucket_name,
+                            use_path_style,
+                            region,
+                            access_key,
+                            secret_key,
+                            session_token,
+                            signature_duration,
+                            client_timeout,
+                            public_endpoint,
+                        },
-                        region,
-                        access_key,
-                        secret_key,
-                        session_token,
-                        signature_duration.unwrap_or(15),
-                        client_timeout.unwrap_or(30),
-                        public_endpoint,
-                        repo.clone(),
-                    )
-                    .await?
-                    .build(client.clone());
+                    ) => {
+                        let from = ObjectStore::build(
+                            endpoint,
+                            bucket_name,
+                            if use_path_style {
+                                UrlStyle::Path
+                            } else {
+                                UrlStyle::VirtualHost
+                            },
+                            region,
+                            access_key,
+                            secret_key,
+                            session_token,
+                            signature_duration.unwrap_or(15),
+                            client_timeout.unwrap_or(30),
+                            public_endpoint,
+                            repo.clone(),
+                        )
+                        .await?
+                        .build(client.clone());
 
-                    migrate_inner(repo, client, from, to, skip_missing_files).await?;
+                        migrate_inner(repo, client, from, to, skip_missing_files).await?;
+                    }
                 }
-            }
 
-            return Ok(());
-        }
-    }
-
-    if CONFIG.server.read_only {
-        tracing::warn!("Launching in READ ONLY mode");
-    }
-
-    match CONFIG.store.clone() {
-        config::Store::Filesystem(config::Filesystem { path }) => {
-            repo.migrate_identifiers().await?;
-
-            let store = FileStore::build(path, repo.clone()).await?;
-            match repo {
-                Repo::Sled(sled_repo) => {
-                    sled_repo
-                        .requeue_in_progress(CONFIG.server.worker_id.as_bytes().to_vec())
-                        .await?;
-
-                    launch_file_store(sled_repo, store, client, sled_extra_config).await?;
-                }
+                return Ok(());
             }
         }
-        config::Store::ObjectStorage(config::ObjectStorage {
-            endpoint,
-            bucket_name,
-            use_path_style,
-            region,
-            access_key,
-            secret_key,
-            session_token,
-            signature_duration,
-            client_timeout,
-            public_endpoint,
-        }) => {
-            let store = ObjectStore::build(
+
+        if config.server.read_only {
+            tracing::warn!("Launching in READ ONLY mode");
+        }
+
+        match config.store.clone() {
+            config::Store::Filesystem(config::Filesystem { path }) => {
+                repo.migrate_identifiers().await?;
+
+                let store = FileStore::build(path, repo.clone()).await?;
+                match repo {
+                    Repo::Sled(sled_repo) => {
+                        sled_repo
+                            .requeue_in_progress(config.server.worker_id.as_bytes().to_vec())
+                            .await?;
+
+                        launch_file_store(sled_repo, store, client, config, sled_extra_config)
+                            .await?;
+                    }
+                }
+            }
+            config::Store::ObjectStorage(config::ObjectStorage {
                 endpoint,
                 bucket_name,
-                if use_path_style {
-                    UrlStyle::Path
-                } else {
-                    UrlStyle::VirtualHost
-                },
+                use_path_style,
                 region,
                 access_key,
                 secret_key,
@@ -1601,23 +1631,41 @@
                 signature_duration,
                 client_timeout,
                 public_endpoint,
-                repo.clone(),
-            )
-            .await?;
+            }) => {
+                let store = ObjectStore::build(
+                    endpoint,
+                    bucket_name,
+                    if use_path_style {
+                        UrlStyle::Path
+                    } else {
+                        UrlStyle::VirtualHost
+                    },
+                    region,
+                    access_key,
+                    secret_key,
+                    session_token,
+                    signature_duration,
+                    client_timeout,
+                    public_endpoint,
+                    repo.clone(),
+                )
+                .await?;
 
-            match repo {
-                Repo::Sled(sled_repo) => {
-                    sled_repo
-                        .requeue_in_progress(CONFIG.server.worker_id.as_bytes().to_vec())
-                        .await?;
+                match repo {
+                    Repo::Sled(sled_repo) => {
+                        sled_repo
+                            .requeue_in_progress(config.server.worker_id.as_bytes().to_vec())
+                            .await?;
 
-                    launch_object_store(sled_repo, store, client, sled_extra_config).await?;
+                        launch_object_store(sled_repo, store, client, config, sled_extra_config)
+                            .await?;
+                    }
                 }
             }
         }
+
+        self::tmp_file::remove_tmp_dir().await?;
+
+        Ok(())
     }
-
-    self::tmp_file::remove_tmp_dir().await?;
-
-    Ok(())
 }
diff --git a/src/main.rs b/src/main.rs
index 1cfce60..58ec8c4 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,6 +1,6 @@
 #[actix_rt::main]
 async fn main() -> color_eyre::Result<()> {
-    pict_rs::install_tracing()?;
-
-    pict_rs::run().await
+    let pict_rs = pict_rs::PictRsConfiguration::build_default()?;
+    pict_rs.install_tracing()?;
+    pict_rs.run().await
 }
diff --git a/src/queue.rs b/src/queue.rs
index 8afee8d..a7453c2 100644
--- a/src/queue.rs
+++ b/src/queue.rs
@@ -1,5 +1,6 @@
 use crate::{
     concurrent_processor::ProcessMap,
+    config::Configuration,
     error::Error,
     formats::InputProcessableFormat,
     repo::{
@@ -163,12 +164,14 @@ pub(crate) async fn process_images<R, S>(
     repo: R,
     store: S,
     process_map: ProcessMap,
+    config: Configuration,
     worker_id: String,
 ) {
     process_image_jobs(
         &repo,
         &store,
         &process_map,
+        &config,
         worker_id,
         PROCESS_QUEUE,
         process::perform,
@@ -231,6 +234,7 @@ async fn process_image_jobs<R, S, F>(
     repo: &R,
     store: &S,
     process_map: &ProcessMap,
+    config: &Configuration,
     worker_id: String,
     queue: &'static str,
     callback: F,
@@ -238,12 +242,26 @@ async fn process_image_jobs<R, S, F>(
     R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
     R::Bytes: Clone,
     S: Store,
-    for<'a> F:
-        Fn(&'a R, &'a S, &'a ProcessMap, &'a [u8]) -> LocalBoxFuture<'a, Result<(), Error>> + Copy,
+    for<'a> F: Fn(
+            &'a R,
+            &'a S,
+            &'a ProcessMap,
+            &'a Configuration,
+            &'a [u8],
+        ) -> LocalBoxFuture<'a, Result<(), Error>>
+        + Copy,
 {
     loop {
-        let res =
-            image_job_loop(repo, store, process_map, worker_id.clone(), queue, callback).await;
+        let res = image_job_loop(
+            repo,
+            store,
+            process_map,
+            config,
+            worker_id.clone(),
+            queue,
+            callback,
+        )
+        .await;
 
         if let Err(e) = res {
             tracing::warn!("Error processing jobs: {}", format!("{e}"));
@@ -259,6 +277,7 @@ async fn image_job_loop<R, S, F>(
     repo: &R,
     store: &S,
     process_map: &ProcessMap,
+    config: &Configuration,
     worker_id: String,
     queue: &'static str,
     callback: F,
@@ -267,15 +286,21 @@ where
     R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
     R::Bytes: Clone,
     S: Store,
-    for<'a> F:
-        Fn(&'a R, &'a S, &'a ProcessMap, &'a [u8]) -> LocalBoxFuture<'a, Result<(), Error>> + Copy,
+    for<'a> F: Fn(
+            &'a R,
+            &'a S,
+            &'a ProcessMap,
+            &'a Configuration,
+            &'a [u8],
+        ) -> LocalBoxFuture<'a, Result<(), Error>>
+        + Copy,
 {
     loop {
         let bytes = repo.pop(queue, worker_id.as_bytes().to_vec()).await?;
 
         let span = tracing::info_span!("Running Job", worker_id = ?worker_id);
 
-        span.in_scope(|| (callback)(repo, store, process_map, bytes.as_ref()))
+        span.in_scope(|| (callback)(repo, store, process_map, config, bytes.as_ref()))
             .instrument(span)
             .await?;
     }
 }
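The widened callback bound in queue.rs is the heart of that file's change: every job callback now borrows the Configuration alongside the repo, store, and process map, and the for<'a> higher-ranked bound ties all five borrows to the lifetime of each returned future. A standalone sketch of the same shape with stand-in types:

    use futures_util::future::LocalBoxFuture;

    struct Config;

    // Mirrors the bound in queue.rs: the callback must accept the borrowed
    // config for every possible lifetime 'a, not just one chosen up front.
    async fn run_one_job<F>(config: &Config, job: &[u8], callback: F)
    where
        for<'a> F: Fn(&'a Config, &'a [u8]) -> LocalBoxFuture<'a, Result<(), String>> + Copy,
    {
        (callback)(config, job).await.expect("job failed");
    }
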
diff --git a/src/queue/process.rs b/src/queue/process.rs
index fa43d68..10e76ea 100644
--- a/src/queue/process.rs
+++ b/src/queue/process.rs
@@ -1,5 +1,6 @@
 use crate::{
     concurrent_processor::ProcessMap,
+    config::Configuration,
     error::{Error, UploadError},
     formats::InputProcessableFormat,
     ingest::Session,
@@ -7,7 +8,6 @@ use crate::{
     repo::{Alias, DeleteToken, FullRepo, UploadId, UploadResult},
     serde_str::Serde,
     store::{Identifier, Store},
-    CONFIG,
 };
 use futures_util::TryStreamExt;
 use std::path::PathBuf;
@@ -16,6 +16,7 @@ pub(super) fn perform<'a, R, S>(
     repo: &'a R,
     store: &'a S,
     process_map: &'a ProcessMap,
+    config: &'a Configuration,
     job: &'a [u8],
 ) -> LocalBoxFuture<'a, Result<(), Error>>
 where
@@ -36,7 +37,7 @@ where
                         identifier,
                         Serde::into_inner(upload_id),
                         declared_alias.map(Serde::into_inner),
-                        &CONFIG.media,
+                        &config.media,
                     )
                     .await?
                 }
@@ -54,7 +55,7 @@ where
                         Serde::into_inner(source),
                         process_path,
                         process_args,
-                        &CONFIG.media,
+                        config,
                     )
                     .await?
                 }
@@ -75,7 +76,7 @@ async fn process_ingest<R, S>(
     unprocessed_identifier: Vec<u8>,
     upload_id: UploadId,
     declared_alias: Option<Alias>,
-    media: &'static crate::config::Media,
+    media: &crate::config::Media,
 ) -> Result<(), Error>
 where
     R: FullRepo + 'static,
@@ -88,6 +89,7 @@ where
 
     let store2 = store.clone();
     let repo = repo.clone();
+    let media = media.clone();
 
     let error_boundary = actix_rt::spawn(async move {
         let stream = store2
             .to_stream(&ident, None, None)
@@ -95,7 +97,7 @@ where
             .map_err(Error::from);
 
         let session =
-            crate::ingest::ingest(&repo, &store2, stream, declared_alias, media).await?;
+            crate::ingest::ingest(&repo, &store2, stream, declared_alias, &media).await?;
 
         let token = session.delete_token().await?;
 
@@ -139,7 +141,7 @@ async fn generate<R: FullRepo, S: Store + 'static>(
     source: Alias,
     process_path: PathBuf,
     process_args: Vec<String>,
-    meida: &'static crate::config::Media,
+    config: &Configuration,
 ) -> Result<(), Error> {
     let Some(hash) = repo.hash(&source).await? else {
         // Nothing to do
@@ -155,7 +157,7 @@ async fn generate<R: FullRepo, S: Store + 'static>(
         return Ok(());
     }
 
-    let original_details = crate::ensure_details(repo, store, &source).await?;
+    let original_details = crate::ensure_details(repo, store, config, &source).await?;
 
     crate::generate::generate(
         repo,
@@ -167,7 +169,7 @@ async fn generate<R: FullRepo, S: Store + 'static>(
         process_args,
         original_details.video_format(),
         None,
-        meida,
+        &config.media,
         hash,
     )
     .await?;
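One consequence of dropping the &'static lifetime is visible in process_ingest above: actix_rt::spawn requires a 'static future, so the borrowed Media can no longer be captured directly and is cloned into the task instead. A reduced illustration (the Media struct here is a stand-in, not pict-rs's):

    #[derive(Clone)]
    struct Media {
        max_file_size: usize,
    }

    fn spawn_with_media(media: &Media) {
        // `actix_rt::spawn` demands a 'static future: move an owned clone
        // into the task rather than capturing the shorter-lived borrow.
        let media = media.clone();
        actix_rt::spawn(async move {
            println!("limit: {} MiB", media.max_file_size);
        });
    }
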