diff --git a/src/main.rs b/src/main.rs index 3f01fef..73e726a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -603,6 +603,12 @@ where .streaming(stream) } +#[instrument(name = "Spawning variant cleanup", skip(repo))] +async fn clean_variants(repo: web::Data) -> Result { + queue::cleanup_all_variants(&**repo).await?; + Ok(HttpResponse::NoContent().finish()) +} + #[derive(Debug, serde::Deserialize)] struct AliasQuery { alias: Serde, @@ -840,6 +846,9 @@ async fn launch( .wrap(import_form.clone()) .route(web::post().to(upload::)), ) + .service( + web::resource("/variants").route(web::delete().to(clean_variants::)), + ) .service(web::resource("/purge").route(web::post().to(purge::))) .service(web::resource("/aliases").route(web::get().to(aliases::))), ) diff --git a/src/queue.rs b/src/queue.rs index a5c5fe6..0610f0e 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -28,6 +28,10 @@ enum Cleanup { alias: Serde, token: Serde, }, + Variant { + hash: Vec, + }, + AllVariants, } #[derive(Debug, serde::Deserialize, serde::Serialize)] @@ -79,6 +83,20 @@ pub(crate) async fn cleanup_identifier( Ok(()) } +async fn cleanup_variants(repo: &R, hash: R::Bytes) -> Result<(), Error> { + let job = serde_json::to_vec(&Cleanup::Variant { + hash: hash.as_ref().to_vec(), + })?; + repo.push(CLEANUP_QUEUE, job.into()).await?; + Ok(()) +} + +pub(crate) async fn cleanup_all_variants(repo: &R) -> Result<(), Error> { + let job = serde_json::to_vec(&Cleanup::AllVariants)?; + repo.push(CLEANUP_QUEUE, job.into()).await?; + Ok(()) +} + pub(crate) async fn queue_ingest( repo: &R, identifier: Vec, diff --git a/src/queue/cleanup.rs b/src/queue/cleanup.rs index 013d000..4f07851 100644 --- a/src/queue/cleanup.rs +++ b/src/queue/cleanup.rs @@ -5,6 +5,7 @@ use crate::{ serde_str::Serde, store::{Identifier, Store}, }; +use futures_util::StreamExt; use tracing::error; pub(super) fn perform<'a, R, S>( @@ -22,7 +23,7 @@ where Cleanup::Hash { hash: in_hash } => hash::(repo, in_hash).await?, Cleanup::Identifier { identifier: in_identifier, - } => identifier(repo, &store, in_identifier).await?, + } => identifier(repo, store, in_identifier).await?, Cleanup::Alias { alias: stored_alias, token, @@ -34,6 +35,8 @@ where ) .await? } + Cleanup::Variant { hash } => variant::(repo, hash).await?, + Cleanup::AllVariants => all_variants::(repo).await?, }, Err(e) => { tracing::warn!("Invalid job: {}", e); @@ -87,7 +90,7 @@ where if !aliases.is_empty() { for alias in aliases { let token = repo.delete_token(&alias).await?; - crate::queue::cleanup_alias(repo, alias, token).await?; + super::cleanup_alias(repo, alias, token).await?; } // Return after queueing cleanup alias, since we will be requeued when the last alias is cleaned return Ok(()); @@ -103,7 +106,7 @@ where idents.extend(repo.motion_identifier(hash.clone()).await?); for identifier in idents { - let _ = crate::queue::cleanup_identifier(repo, identifier).await; + let _ = super::cleanup_identifier(repo, identifier).await; } HashRepo::cleanup(repo, hash).await?; @@ -126,7 +129,37 @@ where repo.remove_alias(hash.clone(), &alias).await?; if repo.aliases(hash.clone()).await?.is_empty() { - crate::queue::cleanup_hash(repo, hash).await?; + super::cleanup_hash(repo, hash).await?; + } + + Ok(()) +} + +async fn all_variants(repo: &R) -> Result<(), Error> +where + R: FullRepo, + S: Store, +{ + let mut hash_stream = Box::pin(repo.hashes().await); + + while let Some(res) = hash_stream.next().await { + let hash = res?; + super::cleanup_variants(repo, hash).await?; + } + + Ok(()) +} + +async fn variant(repo: &R, hash: Vec) -> Result<(), Error> +where + R: FullRepo, + S: Store, +{ + let hash: R::Bytes = hash.into(); + + for (variant, identifier) in repo.variants::(hash.clone()).await? { + repo.remove_variant(hash.clone(), variant).await?; + super::cleanup_identifier(repo, identifier).await?; } Ok(()) diff --git a/src/repo.rs b/src/repo.rs index dd986b7..3f32e30 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -183,6 +183,7 @@ pub(crate) trait HashRepo: BaseRepo { &self, hash: Self::Bytes, ) -> Result, Error>; + async fn remove_variant(&self, hash: Self::Bytes, variant: String) -> Result<(), Error>; async fn relate_motion_identifier( &self, diff --git a/src/repo/sled.rs b/src/repo/sled.rs index 5709f7d..5eed976 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -689,6 +689,18 @@ impl HashRepo for SledRepo { Ok(vec) } + #[tracing::instrument] + async fn remove_variant(&self, hash: Self::Bytes, variant: String) -> Result<(), Error> { + let key = variant_key(&hash, &variant); + + b!( + self.hash_variant_identifiers, + hash_variant_identifiers.remove(key) + ); + + Ok(()) + } + #[tracing::instrument] async fn relate_motion_identifier( &self,