diff --git a/src/queue.rs b/src/queue.rs
index 6579ae2..9987b8b 100644
--- a/src/queue.rs
+++ b/src/queue.rs
@@ -1,9 +1,12 @@
 use crate::{
     error::Error,
     repo::{AliasRepo, HashRepo, IdentifierRepo, QueueRepo, Repo},
-    store::{Identifier, Store},
+    store::Store,
 };
-use tracing::{debug, error};
+
+mod cleanup;
+
+const CLEANUP_QUEUE: &str = "cleanup";
 
 #[derive(Debug, serde::Deserialize, serde::Serialize)]
 enum Job {
@@ -15,7 +18,7 @@ pub(crate) async fn queue_cleanup<R: QueueRepo>(repo: &R, hash: R::Bytes) -> Res
     let job = serde_json::to_vec(&Job::CleanupHash {
         hash: hash.as_ref().to_vec(),
     })?;
-    repo.push(job.into()).await?;
+    repo.push(CLEANUP_QUEUE, job.into()).await?;
 
     Ok(())
 }
@@ -50,7 +53,9 @@ where
     S: Store,
 {
     loop {
-        let bytes = repo.pop(worker_id.as_bytes().to_vec()).await?;
+        let bytes = repo
+            .pop(CLEANUP_QUEUE, worker_id.as_bytes().to_vec())
+            .await?;
 
         run_job(repo, store, bytes.as_ref()).await?;
     }
@@ -64,9 +69,9 @@ where
 {
     match serde_json::from_slice(job) {
         Ok(job) => match job {
-            Job::CleanupHash { hash } => cleanup_hash::<R, S>(repo, hash).await?,
+            Job::CleanupHash { hash } => cleanup::hash::<R, S>(repo, hash).await?,
             Job::CleanupIdentifier { identifier } => {
-                cleanup_identifier(repo, store, identifier).await?
+                cleanup::identifier(repo, store, identifier).await?
             }
         },
         Err(e) => {
@@ -76,71 +81,3 @@ where
 
     Ok(())
 }
-
-#[tracing::instrument(skip(repo, store))]
-async fn cleanup_identifier<R, S>(repo: &R, store: &S, identifier: Vec<u8>) -> Result<(), Error>
-where
-    R: QueueRepo + HashRepo + IdentifierRepo,
-    R::Bytes: Clone,
-    S: Store,
-{
-    let identifier = S::Identifier::from_bytes(identifier)?;
-
-    let mut errors = Vec::new();
-
-    debug!("Deleting {:?}", identifier);
-    if let Err(e) = store.remove(&identifier).await {
-        errors.push(e);
-    }
-
-    if let Err(e) = IdentifierRepo::cleanup(repo, &identifier).await {
-        errors.push(e);
-    }
-
-    if !errors.is_empty() {
-        let span = tracing::error_span!("Error deleting files");
-        span.in_scope(|| {
-            for error in errors {
-                error!("{}", error);
-            }
-        });
-    }
-
-    Ok(())
-}
-
-#[tracing::instrument(skip(repo))]
-async fn cleanup_hash<R, S>(repo: &R, hash: Vec<u8>) -> Result<(), Error>
-where
-    R: QueueRepo + AliasRepo + HashRepo + IdentifierRepo,
-    R::Bytes: Clone,
-    S: Store,
-{
-    let hash: R::Bytes = hash.into();
-
-    let aliases = repo.aliases(hash.clone()).await?;
-
-    if !aliases.is_empty() {
-        return Ok(());
-    }
-
-    let mut idents = repo
-        .variants::<S::Identifier>(hash.clone())
-        .await?
-        .into_iter()
-        .map(|(_, v)| v)
-        .collect::<Vec<_>>();
-    idents.push(repo.identifier(hash.clone()).await?);
-    idents.extend(repo.motion_identifier(hash.clone()).await?);
-
-    for identifier in idents {
-        if let Ok(identifier) = identifier.to_bytes() {
-            let job = serde_json::to_vec(&Job::CleanupIdentifier { identifier })?;
-            repo.push(job.into()).await?;
-        }
-    }
-
-    HashRepo::cleanup(repo, hash).await?;
-
-    Ok(())
-}
diff --git a/src/queue/cleanup.rs b/src/queue/cleanup.rs
new file mode 100644
index 0000000..9bed6c7
--- /dev/null
+++ b/src/queue/cleanup.rs
@@ -0,0 +1,74 @@
+use crate::{
+    error::Error,
+    queue::{Job, CLEANUP_QUEUE},
+    repo::{AliasRepo, HashRepo, IdentifierRepo, QueueRepo},
+    store::{Identifier, Store},
+};
+use tracing::error;
+
+#[tracing::instrument(skip(repo, store))]
+pub(super) async fn identifier<R, S>(repo: &R, store: &S, identifier: Vec<u8>) -> Result<(), Error>
+where
+    R: QueueRepo + HashRepo + IdentifierRepo,
+    R::Bytes: Clone,
+    S: Store,
+{
+    let identifier = S::Identifier::from_bytes(identifier)?;
+
+    let mut errors = Vec::new();
+
+    if let Err(e) = store.remove(&identifier).await {
+        errors.push(e);
+    }
+
+    if let Err(e) = IdentifierRepo::cleanup(repo, &identifier).await {
+        errors.push(e);
+    }
+
+    if !errors.is_empty() {
+        let span = tracing::error_span!("Error deleting files");
+        span.in_scope(|| {
+            for error in errors {
+                error!("{}", error);
+            }
+        });
+    }
+
+    Ok(())
+}
+
+#[tracing::instrument(skip(repo))]
+pub(super) async fn hash<R, S>(repo: &R, hash: Vec<u8>) -> Result<(), Error>
+where
+    R: QueueRepo + AliasRepo + HashRepo + IdentifierRepo,
+    R::Bytes: Clone,
+    S: Store,
+{
+    let hash: R::Bytes = hash.into();
+
+    let aliases = repo.aliases(hash.clone()).await?;
+
+    if !aliases.is_empty() {
+        return Ok(());
+    }
+
+    let mut idents = repo
+        .variants::<S::Identifier>(hash.clone())
+        .await?
+        .into_iter()
+        .map(|(_, v)| v)
+        .collect::<Vec<_>>();
+    idents.push(repo.identifier(hash.clone()).await?);
+    idents.extend(repo.motion_identifier(hash.clone()).await?);
+
+    for identifier in idents {
+        if let Ok(identifier) = identifier.to_bytes() {
+            let job = serde_json::to_vec(&Job::CleanupIdentifier { identifier })?;
+            repo.push(CLEANUP_QUEUE, job.into()).await?;
+        }
+    }
+
+    HashRepo::cleanup(repo, hash).await?;
+
+    Ok(())
+}
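[Note, not part of the patch] The queue protocol above is just serde_json bytes
filed under a &'static str queue name. A minimal, self-contained sketch of the
same round trip, with the repo-backed queue stood in for by a VecDeque and
assuming serde (with derive) and serde_json as dependencies:

    use std::collections::VecDeque;

    // Mirrors the patch's Job enum: both variants carry raw bytes, so any
    // store's identifiers can be queued once converted via to_bytes().
    #[derive(Debug, serde::Deserialize, serde::Serialize)]
    enum Job {
        CleanupHash { hash: Vec<u8> },
        CleanupIdentifier { identifier: Vec<u8> },
    }

    fn main() -> Result<(), serde_json::Error> {
        let mut cleanup_queue: VecDeque<Vec<u8>> = VecDeque::new();

        // push: serialize the job to JSON bytes, as queue_cleanup does
        cleanup_queue.push_back(serde_json::to_vec(&Job::CleanupHash {
            hash: b"some-hash".to_vec(),
        })?);

        // pop + dispatch: deserialize and match on the variant, as run_job does
        while let Some(bytes) = cleanup_queue.pop_front() {
            match serde_json::from_slice::<Job>(&bytes)? {
                Job::CleanupHash { hash } => println!("cleanup hash {:?}", hash),
                Job::CleanupIdentifier { identifier } => {
                    println!("cleanup identifier {:?}", identifier)
                }
            }
        }

        Ok(())
    }

Because jobs are self-describing, a worker handed bytes it cannot decode fails
in from_slice rather than mid-job, which is the case the Err(e) arm in run_job
is there to absorb.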
diff --git a/src/repo.rs b/src/repo.rs
index 27f3cf0..145c3e7 100644
--- a/src/repo.rs
+++ b/src/repo.rs
@@ -38,16 +38,16 @@ pub(crate) trait BaseRepo {
 pub(crate) trait QueueRepo: BaseRepo {
     async fn in_progress(&self, worker_id: Vec<u8>) -> Result<Option<Self::Bytes>, Error>;
 
-    async fn push(&self, job: Self::Bytes) -> Result<(), Error>;
+    async fn push(&self, queue: &'static str, job: Self::Bytes) -> Result<(), Error>;
 
-    async fn pop(&self, worker_id: Vec<u8>) -> Result<Self::Bytes, Error>;
+    async fn pop(&self, queue: &'static str, worker_id: Vec<u8>) -> Result<Self::Bytes, Error>;
 }
 
 #[async_trait::async_trait(?Send)]
 pub(crate) trait SettingsRepo: BaseRepo {
-    async fn set(&self, key: &'static [u8], value: Self::Bytes) -> Result<(), Error>;
-    async fn get(&self, key: &'static [u8]) -> Result<Option<Self::Bytes>, Error>;
-    async fn remove(&self, key: &'static [u8]) -> Result<(), Error>;
+    async fn set(&self, key: &'static str, value: Self::Bytes) -> Result<(), Error>;
+    async fn get(&self, key: &'static str) -> Result<Option<Self::Bytes>, Error>;
+    async fn remove(&self, key: &'static str) -> Result<(), Error>;
 }
 
 #[async_trait::async_trait(?Send)]
@@ -186,9 +186,9 @@ impl Repo {
     }
 }
 
-const REPO_MIGRATION_O1: &[u8] = b"repo-migration-01";
-const STORE_MIGRATION_PROGRESS: &[u8] = b"store-migration-progress";
-const GENERATOR_KEY: &[u8] = b"last-path";
+const REPO_MIGRATION_O1: &str = "repo-migration-01";
+const STORE_MIGRATION_PROGRESS: &str = "store-migration-progress";
+const GENERATOR_KEY: &str = "last-path";
 
 async fn migrate_hash<T>(repo: &T, old: &old::Old, hash: ::sled::IVec) -> color_eyre::Result<()>
 where
@@ -233,12 +233,12 @@ where
         let _ = repo.relate_details(&identifier.to_vec(), &details).await;
     }
 
-    if let Ok(Some(value)) = old.setting(STORE_MIGRATION_PROGRESS) {
+    if let Ok(Some(value)) = old.setting(STORE_MIGRATION_PROGRESS.as_bytes()) {
        repo.set(STORE_MIGRATION_PROGRESS, value.to_vec().into())
            .await?;
    }
 
-    if let Ok(Some(value)) = old.setting(GENERATOR_KEY) {
+    if let Ok(Some(value)) = old.setting(GENERATOR_KEY.as_bytes()) {
         repo.set(GENERATOR_KEY, value.to_vec().into()).await?;
     }
 
diff --git a/src/repo/sled.rs b/src/repo/sled.rs
index b34d008..c699158 100644
--- a/src/repo/sled.rs
+++ b/src/repo/sled.rs
@@ -8,7 +8,11 @@ use crate::{
 };
 use futures_util::Stream;
 use sled::{Db, IVec, Tree};
-use std::{pin::Pin, sync::Arc};
+use std::{
+    collections::HashMap,
+    pin::Pin,
+    sync::{Arc, RwLock},
+};
 use tokio::sync::Notify;
 
 use super::BaseRepo;
@@ -52,7 +56,7 @@ pub(crate) struct SledRepo {
     alias_delete_tokens: Tree,
     queue: Tree,
     in_progress_queue: Tree,
-    queue_notifier: Arc<Notify>,
+    queue_notifier: Arc<RwLock<HashMap<&'static str, Arc<Notify>>>>,
     db: Db,
 }
@@ -71,7 +75,7 @@ impl SledRepo {
             alias_delete_tokens: db.open_tree("pict-rs-alias-delete-tokens-tree")?,
             queue: db.open_tree("pict-rs-queue-tree")?,
             in_progress_queue: db.open_tree("pict-rs-in-progress-queue-tree")?,
-            queue_notifier: Arc::new(Notify::new()),
+            queue_notifier: Arc::new(RwLock::new(HashMap::new())),
             db,
         })
     }
@@ -89,16 +93,33 @@ impl QueueRepo for SledRepo {
         Ok(opt)
     }
 
-    async fn push(&self, job: Self::Bytes) -> Result<(), Error> {
+    async fn push(&self, queue: &'static str, job: Self::Bytes) -> Result<(), Error> {
         let id = self.db.generate_id()?;
-        b!(self.queue, queue.insert(id.to_be_bytes(), job));
-        self.queue_notifier.notify_one();
+        let mut key = queue.as_bytes().to_vec();
+        key.extend(id.to_be_bytes());
+
+        b!(self.queue, queue.insert(key, job));
+
+        if let Some(notifier) = self.queue_notifier.read().unwrap().get(&queue) {
+            notifier.notify_one();
+            return Ok(());
+        }
+
+        self.queue_notifier
+            .write()
+            .unwrap()
+            .entry(queue)
+            .or_insert_with(|| Arc::new(Notify::new()))
+            .notify_one();
+
         Ok(())
     }
 
-    async fn pop(&self, worker_id: Vec<u8>) -> Result<Self::Bytes, Error> {
-        let notify = Arc::clone(&self.queue_notifier);
-
+    async fn pop(
+        &self,
+        queue_name: &'static str,
+        worker_id: Vec<u8>,
+    ) -> Result<Self::Bytes, Error> {
         loop {
             let in_progress_queue = self.in_progress_queue.clone();
 
@@ -106,7 +127,10 @@
             let job = b!(self.queue, {
                 in_progress_queue.remove(&worker_id)?;
 
-                while let Some((key, job)) = queue.iter().find_map(Result::ok) {
+                while let Some((key, job)) = queue
+                    .scan_prefix(queue_name.as_bytes())
+                    .find_map(Result::ok)
+                {
                     in_progress_queue.insert(&worker_id, &job)?;
 
                     if queue.remove(key)?.is_some() {
@@ -123,6 +147,23 @@
                 return Ok(job);
             }
 
+            let opt = self
+                .queue_notifier
+                .read()
+                .unwrap()
+                .get(&queue_name)
+                .map(Arc::clone);
+
+            let notify = if let Some(notify) = opt {
+                notify
+            } else {
+                let mut guard = self.queue_notifier.write().unwrap();
+                let entry = guard
+                    .entry(queue_name)
+                    .or_insert_with(|| Arc::new(Notify::new()));
+                Arc::clone(entry)
+            };
+
             notify.notified().await
         }
     }
@@ -131,21 +172,21 @@
 #[async_trait::async_trait(?Send)]
 impl SettingsRepo for SledRepo {
     #[tracing::instrument(skip(value))]
-    async fn set(&self, key: &'static [u8], value: Self::Bytes) -> Result<(), Error> {
+    async fn set(&self, key: &'static str, value: Self::Bytes) -> Result<(), Error> {
         b!(self.settings, settings.insert(key, value));
 
         Ok(())
     }
 
     #[tracing::instrument]
-    async fn get(&self, key: &'static [u8]) -> Result<Option<Self::Bytes>, Error> {
+    async fn get(&self, key: &'static str) -> Result<Option<Self::Bytes>, Error> {
         let opt = b!(self.settings, settings.get(key));
 
         Ok(opt)
     }
 
     #[tracing::instrument]
-    async fn remove(&self, key: &'static [u8]) -> Result<(), Error> {
+    async fn remove(&self, key: &'static str) -> Result<(), Error> {
         b!(self.settings, settings.remove(key));
 
         Ok(())
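[Note, not part of the patch] The sled repo resolves one Notify per queue
through the RwLock<HashMap<..>> added above, taking the cheap read lock on the
hot path and the write lock only to create a missing entry. A standalone
sketch of that get-or-create pattern, assuming tokio with the rt and macros
features:

    use std::collections::HashMap;
    use std::sync::{Arc, RwLock};

    use tokio::sync::Notify;

    type NotifierMap = RwLock<HashMap<&'static str, Arc<Notify>>>;

    // Clone the queue's shared Notify out of the map, creating it on first use.
    fn notifier_for(map: &NotifierMap, queue: &'static str) -> Arc<Notify> {
        if let Some(notify) = map.read().unwrap().get(queue) {
            return Arc::clone(notify);
        }

        let mut guard = map.write().unwrap();
        let entry = guard
            .entry(queue)
            .or_insert_with(|| Arc::new(Notify::new()));
        Arc::clone(entry)
    }

    #[tokio::main]
    async fn main() {
        let map: NotifierMap = RwLock::new(HashMap::new());

        // Producer and consumer reach the same Notify through the map.
        notifier_for(&map, "cleanup").notify_one();
        notifier_for(&map, "cleanup").notified().await;
    }

Notify stores a permit when notify_one finds no waiter, so a push that lands
between a worker's empty scan and its notified().await still wakes it; the
loop in pop then re-scans the queue tree.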
diff --git a/src/store/file_store.rs b/src/store/file_store.rs
index 4145d98..dede6f3 100644
--- a/src/store/file_store.rs
+++ b/src/store/file_store.rs
@@ -20,7 +20,7 @@ pub(crate) use file_id::FileId;
 // - Settings Tree
 //   - last-path -> last generated path
 
-const GENERATOR_KEY: &[u8] = b"last-path";
+const GENERATOR_KEY: &str = "last-path";
 
 #[derive(Debug, thiserror::Error)]
 pub(crate) enum FileError {
diff --git a/src/store/object_store.rs b/src/store/object_store.rs
index 78f490e..41bf188 100644
--- a/src/store/object_store.rs
+++ b/src/store/object_store.rs
@@ -19,7 +19,7 @@ pub(crate) use object_id::ObjectId;
 // - Settings Tree
 //   - last-path -> last generated path
 
-const GENERATOR_KEY: &[u8] = b"last-path";
+const GENERATOR_KEY: &str = "last-path";
 
 #[derive(Debug, thiserror::Error)]
 pub(crate) enum ObjectError {
diff --git a/src/upload_manager.rs b/src/upload_manager.rs
index 0952b81..1d1f48c 100644
--- a/src/upload_manager.rs
+++ b/src/upload_manager.rs
@@ -20,7 +20,7 @@ mod session;
 
 pub(super) use session::UploadManagerSession;
 
-const STORE_MIGRATION_PROGRESS: &[u8] = b"store-migration-progress";
+const STORE_MIGRATION_PROGRESS: &str = "store-migration-progress";
 
 #[derive(Clone)]
 pub(crate) struct UploadManager {
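[Note, not part of the patch] In the sled hunks above, push builds each key as
the queue name followed by db.generate_id() in big-endian, and pop reads with
scan_prefix(queue_name). Big-endian matters: it makes the numeric ids sort
bytewise, so each worker sees only its own queue's jobs, oldest first. A
runnable sketch of the same key scheme against a throwaway sled db:

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let db = sled::Config::new().temporary(true).open()?;
        let queue = db.open_tree("queue")?;

        for job in [&b"job-a"[..], &b"job-b"[..]] {
            // queue-name prefix, then a monotonic id in big-endian
            let mut key = b"cleanup".to_vec();
            key.extend(db.generate_id()?.to_be_bytes());
            queue.insert(key, job)?;
        }

        // Only "cleanup" keys come back, in insertion order.
        for entry in queue.scan_prefix(b"cleanup") {
            let (_key, job) = entry?;
            println!("{}", String::from_utf8_lossy(&job));
        }

        Ok(())
    }

One caveat the scheme inherits from prefix scans: a queue whose name is a
prefix of another ("clean" vs "cleanup") would see the other's jobs, so queue
names must be prefix-free; with the single CLEANUP_QUEUE constant that holds
trivially.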