diff --git a/defaults.toml b/defaults.toml
index 1312337..c43d4f2 100644
--- a/defaults.toml
+++ b/defaults.toml
@@ -1,5 +1,6 @@
 [server]
 address = '0.0.0.0:8080'
+worker_id = 'pict-rs-1'
 [tracing.logging]
 format = 'normal'
 targets = 'warn,tracing_actix_web=info,actix_server=info,actix_web=info'
diff --git a/dev.toml b/dev.toml
index 9a5748b..04e9397 100644
--- a/dev.toml
+++ b/dev.toml
@@ -1,5 +1,6 @@
 [server]
 address = '0.0.0.0:8080'
+worker_id = 'pict-rs-1'
 [tracing.logging]
 format = 'normal'
 targets = 'warn,tracing_actix_web=info,actix_server=info,actix_web=info'
diff --git a/docker/object-storage/pict-rs.toml b/docker/object-storage/pict-rs.toml
index 980d749..f710493 100644
--- a/docker/object-storage/pict-rs.toml
+++ b/docker/object-storage/pict-rs.toml
@@ -1,5 +1,6 @@
 [server]
 address = '0.0.0.0:8080'
+worker_id = 'pict-rs-1'
 [tracing.logging]
 format = 'normal'
 targets = 'warn,tracing_actix_web=info,actix_server=info,actix_web=info'
diff --git a/src/main.rs b/src/main.rs
index 15b3725..f963e38 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -15,11 +15,12 @@ use std::{
     future::ready,
     path::PathBuf,
     pin::Pin,
+    sync::atomic::{AtomicU64, Ordering},
     task::{Context, Poll},
     time::SystemTime,
 };
 use tokio::{io::AsyncReadExt, sync::Semaphore};
-use tracing::{debug, error, info, instrument, Span};
+use tracing::{debug, error, info, instrument};
 use tracing_actix_web::TracingLogger;
 use tracing_awc::Tracing;
 use tracing_futures::Instrument;
@@ -382,38 +383,11 @@ async fn process<S: Store + 'static>(
 
         let details = Details::from_bytes(bytes.clone(), format.as_hint()).await?;
 
-        let save_span = tracing::info_span!(
-            parent: None,
-            "Saving variant information",
-            path = tracing::field::debug(&thumbnail_path),
-            name = tracing::field::display(&alias),
-        );
-        save_span.follows_from(Span::current());
-        let details2 = details.clone();
-        let bytes2 = bytes.clone();
-        let alias2 = alias.clone();
-        actix_rt::spawn(
-            async move {
-                let identifier = match store.save_bytes(bytes2).await {
-                    Ok(identifier) => identifier,
-                    Err(e) => {
-                        tracing::warn!("Failed to generate directory path: {}", e);
-                        return;
-                    }
-                };
-                if let Err(e) = manager.store_details(&identifier, &details2).await {
-                    tracing::warn!("Error saving variant details: {}", e);
-                    return;
-                }
-                if let Err(e) = manager
-                    .store_variant(&alias2, &thumbnail_path, &identifier)
-                    .await
-                {
-                    tracing::warn!("Error saving variant info: {}", e);
-                }
-            }
-            .instrument(save_span),
-        );
+        let identifier = store.save_bytes(bytes.clone()).await?;
+        manager.store_details(&identifier, &details).await?;
+        manager
+            .store_variant(&alias, &thumbnail_path, &identifier)
+            .await?;
 
         Ok((details, bytes)) as Result<(Details, web::Bytes), Error>
     };
@@ -632,18 +606,18 @@ fn build_reqwest_client() -> reqwest::Result<reqwest::Client> {
         .build()
 }
 
+fn next_worker_id() -> String {
+    static WORKER_ID: AtomicU64 = AtomicU64::new(0);
+
+    let next_id = WORKER_ID.fetch_add(1, Ordering::Relaxed);
+
+    format!("{}-{}", CONFIG.server.worker_id, next_id)
+}
+
 async fn launch<S: Store + Clone + 'static>(
     manager: UploadManager,
     store: S,
 ) -> color_eyre::Result<()> {
-    let repo = manager.repo().clone();
-
-    actix_rt::spawn(queue::process_jobs(
-        repo,
-        store.clone(),
-        CONFIG.server.worker_id.as_bytes().to_vec(),
-    ));
-
     // Create a new Multipart Form validator
     //
     // This form is expecting a single array field, 'images' with at most 10 files in it
@@ -717,11 +691,20 @@ async fn launch<S: Store + Clone + 'static>(
     );
 
     HttpServer::new(move || {
+        let manager = manager.clone();
+        let store = store.clone();
+
+        actix_rt::spawn(queue::process_jobs(
+            manager.repo().clone(),
+            store.clone(),
+            next_worker_id(),
+        ));
+
         App::new()
             .wrap(TracingLogger::default())
             .wrap(Deadline)
-            .app_data(web::Data::new(store.clone()))
-            .app_data(web::Data::new(manager.clone()))
+            .app_data(web::Data::new(store))
+            .app_data(web::Data::new(manager))
             .app_data(web::Data::new(build_client()))
             .app_data(web::Data::new(CONFIG.media.filters.clone()))
             .service(
diff --git a/src/queue.rs b/src/queue.rs
index 04af1b6..6579ae2 100644
--- a/src/queue.rs
+++ b/src/queue.rs
@@ -1,103 +1,100 @@
 use crate::{
     error::Error,
     repo::{AliasRepo, HashRepo, IdentifierRepo, QueueRepo, Repo},
-    store::Store,
+    store::{Identifier, Store},
 };
-use tracing::{debug, error, Span};
+use tracing::{debug, error};
 
 #[derive(Debug, serde::Deserialize, serde::Serialize)]
 enum Job {
-    Cleanup { hash: Vec<u8> },
+    CleanupHash { hash: Vec<u8> },
+    CleanupIdentifier { identifier: Vec<u8> },
 }
 
 pub(crate) async fn queue_cleanup<R: QueueRepo>(repo: &R, hash: R::Bytes) -> Result<(), Error> {
-    let job = serde_json::to_vec(&Job::Cleanup {
+    let job = serde_json::to_vec(&Job::CleanupHash {
         hash: hash.as_ref().to_vec(),
     })?;
     repo.push(job.into()).await?;
     Ok(())
 }
 
-pub(crate) async fn process_jobs<S: Store>(repo: Repo, store: S, worker_id: Vec<u8>) {
-    loop {
-        let res = match repo {
-            Repo::Sled(ref repo) => do_process_jobs(repo, &store, worker_id.clone()).await,
-        };
+pub(crate) async fn process_jobs<S: Store>(repo: Repo, store: S, worker_id: String) {
+    match repo {
+        Repo::Sled(ref repo) => {
+            if let Ok(Some(job)) = repo.in_progress(worker_id.as_bytes().to_vec()).await {
+                if let Err(e) = run_job(repo, &store, &job).await {
+                    tracing::warn!("Failed to run previously dropped job: {}", e);
+                    tracing::warn!("{:?}", e);
+                }
+            }
+            loop {
+                let res = job_loop(repo, &store, worker_id.clone()).await;
 
-        if let Err(e) = res {
-            tracing::warn!("Error processing jobs: {}", e);
-            tracing::warn!("{:?}", e);
-            continue;
+                if let Err(e) = res {
+                    tracing::warn!("Error processing jobs: {}", e);
+                    tracing::warn!("{:?}", e);
+                    continue;
+                }
+
+                break;
+            }
         }
-
-        break;
     }
 }
 
-async fn do_process_jobs<R, S>(repo: &R, store: &S, worker_id: Vec<u8>) -> Result<(), Error>
+async fn job_loop<R, S>(repo: &R, store: &S, worker_id: String) -> Result<(), Error>
 where
     R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
     R::Bytes: Clone,
     S: Store,
 {
     loop {
-        let bytes = repo.pop(worker_id.clone()).await?;
+        let bytes = repo.pop(worker_id.as_bytes().to_vec()).await?;
 
-        match serde_json::from_slice(bytes.as_ref()) {
-            Ok(job) => match job {
-                Job::Cleanup { hash } => cleanup(repo, store, hash).await?,
-            },
-            Err(e) => {
-                tracing::warn!("Invalid job: {}", e);
-            }
-        }
+        run_job(repo, store, bytes.as_ref()).await?;
    }
 }
 
-#[tracing::instrument(skip(repo, store))]
-async fn cleanup<R, S>(repo: &R, store: &S, hash: Vec<u8>) -> Result<(), Error>
+async fn run_job<R, S>(repo: &R, store: &S, job: &[u8]) -> Result<(), Error>
 where
-    R: HashRepo + IdentifierRepo + AliasRepo,
+    R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
     R::Bytes: Clone,
     S: Store,
 {
-    let hash: R::Bytes = hash.into();
-
-    let aliases = repo.aliases(hash.clone()).await?;
-
-    if !aliases.is_empty() {
-        return Ok(());
+    match serde_json::from_slice(job) {
+        Ok(job) => match job {
+            Job::CleanupHash { hash } => cleanup_hash::<R, S>(repo, hash).await?,
+            Job::CleanupIdentifier { identifier } => {
+                cleanup_identifier(repo, store, identifier).await?
+            }
+        },
+        Err(e) => {
+            tracing::warn!("Invalid job: {}", e);
+        }
    }
 
-    let variant_idents = repo
-        .variants::<S::Identifier>(hash.clone())
-        .await?
-        .into_iter()
-        .map(|(_, v)| v)
-        .collect::<Vec<_>>();
-    let main_ident = repo.identifier(hash.clone()).await?;
-    let motion_ident = repo.motion_identifier(hash.clone()).await?;
+    Ok(())
+}
 
-    HashRepo::cleanup(repo, hash).await?;
-
-    let cleanup_span = tracing::info_span!(parent: None, "Cleaning files");
-    cleanup_span.follows_from(Span::current());
+#[tracing::instrument(skip(repo, store))]
+async fn cleanup_identifier<R, S>(repo: &R, store: &S, identifier: Vec<u8>) -> Result<(), Error>
+where
+    R: QueueRepo + HashRepo + IdentifierRepo,
+    R::Bytes: Clone,
+    S: Store,
+{
+    let identifier = S::Identifier::from_bytes(identifier)?;
 
     let mut errors = Vec::new();
 
-    for identifier in variant_idents
-        .iter()
-        .chain(&[main_ident])
-        .chain(motion_ident.iter())
-    {
-        debug!("Deleting {:?}", identifier);
-        if let Err(e) = store.remove(identifier).await {
-            errors.push(e);
-        }
+    debug!("Deleting {:?}", identifier);
+    if let Err(e) = store.remove(&identifier).await {
+        errors.push(e);
+    }
 
-        if let Err(e) = IdentifierRepo::cleanup(repo, identifier).await {
-            errors.push(e);
-        }
+    if let Err(e) = IdentifierRepo::cleanup(repo, &identifier).await {
+        errors.push(e);
     }
 
     if !errors.is_empty() {
@@ -111,3 +108,39 @@ where
 
     Ok(())
 }
+
+#[tracing::instrument(skip(repo))]
+async fn cleanup_hash<R, S>(repo: &R, hash: Vec<u8>) -> Result<(), Error>
+where
+    R: QueueRepo + AliasRepo + HashRepo + IdentifierRepo,
+    R::Bytes: Clone,
+    S: Store,
+{
+    let hash: R::Bytes = hash.into();
+
+    let aliases = repo.aliases(hash.clone()).await?;
+
+    if !aliases.is_empty() {
+        return Ok(());
+    }
+
+    let mut idents = repo
+        .variants::<S::Identifier>(hash.clone())
+        .await?
+        .into_iter()
+        .map(|(_, v)| v)
+        .collect::<Vec<_>>();
+    idents.push(repo.identifier(hash.clone()).await?);
+    idents.extend(repo.motion_identifier(hash.clone()).await?);
+
+    for identifier in idents {
+        if let Ok(identifier) = identifier.to_bytes() {
+            let job = serde_json::to_vec(&Job::CleanupIdentifier { identifier })?;
+            repo.push(job.into()).await?;
+        }
+    }
+
+    HashRepo::cleanup(repo, hash).await?;
+
+    Ok(())
+}
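
Note on the worker-id scheme above: CONFIG.server.worker_id supplies a per-instance prefix (the new worker_id = 'pict-rs-1' key in the TOML files), and next_worker_id() appends a process-local atomic counter, so each HttpServer worker that runs the factory closure spawns its own queue::process_jobs task under a distinct, deterministic id ("pict-rs-1-0", "pict-rs-1-1", ...). Because the counter restarts at 0 on boot, a restarted process reclaims the same ids, which is what lets repo.in_progress(worker_id) hand a worker back the job it had checked out before dying. A minimal standalone sketch of the counter pattern (the WORKER_ID_PREFIX constant and main harness are illustrative stand-ins, not part of the patch):

use std::sync::atomic::{AtomicU64, Ordering};

// Stand-in for CONFIG.server.worker_id, which pict-rs reads from the
// [server] section of its config.
const WORKER_ID_PREFIX: &str = "pict-rs-1";

fn next_worker_id() -> String {
    // One counter per process; fetch_add hands out 0, 1, 2, ... even when
    // called concurrently from several worker threads.
    static WORKER_ID: AtomicU64 = AtomicU64::new(0);

    let next_id = WORKER_ID.fetch_add(1, Ordering::Relaxed);

    format!("{}-{}", WORKER_ID_PREFIX, next_id)
}

fn main() {
    // Each call yields a fresh, deterministic id.
    assert_eq!(next_worker_id(), "pict-rs-1-0");
    assert_eq!(next_worker_id(), "pict-rs-1-1");
}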
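
The src/queue.rs change is a two-stage fan-out: CleanupHash now only resolves a hash to its file identifiers (variants, main, and motion) and enqueues one CleanupIdentifier job per file, while the per-file job does the actual store.remove and repo cleanup. Each deletion is therefore persisted, retried, and error-reported independently instead of one in-memory loop losing all remaining work on failure. A toy sketch of that shape, with a plain VecDeque standing in for the sled-backed queue (run and identifiers_for are hypothetical names, not the pict-rs API):

use std::collections::VecDeque;

#[derive(Debug, serde::Deserialize, serde::Serialize)]
enum Job {
    CleanupHash { hash: Vec<u8> },
    CleanupIdentifier { identifier: Vec<u8> },
}

// Hypothetical lookup; pict-rs gathers these from the repo's variant,
// main, and motion identifiers for the hash.
fn identifiers_for(_hash: &[u8]) -> Vec<Vec<u8>> {
    vec![b"main".to_vec(), b"thumbnail".to_vec()]
}

// Toy driver; the real queue is sled-backed and popped per worker id.
fn run(queue: &mut VecDeque<Vec<u8>>) -> serde_json::Result<()> {
    while let Some(bytes) = queue.pop_front() {
        match serde_json::from_slice(&bytes)? {
            Job::CleanupHash { hash } => {
                // Stage one: fan the hash out into one job per file.
                for identifier in identifiers_for(&hash) {
                    queue.push_back(serde_json::to_vec(&Job::CleanupIdentifier {
                        identifier,
                    })?);
                }
            }
            Job::CleanupIdentifier { identifier } => {
                // Stage two: delete a single file; an error here affects
                // only this identifier, not the whole hash.
                println!("would delete {:?}", identifier);
            }
        }
    }
    Ok(())
}

fn main() -> serde_json::Result<()> {
    let mut queue = VecDeque::new();
    queue.push_back(serde_json::to_vec(&Job::CleanupHash {
        hash: b"abc".to_vec(),
    })?);
    run(&mut queue)
}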