From 13f4ec6b18ee501505b0e0e9e76485cda826e618 Mon Sep 17 00:00:00 2001 From: asonix Date: Mon, 11 Dec 2023 13:57:21 -0600 Subject: [PATCH] Add internal endpoint to prune records for missing media --- src/lib.rs | 33 +++++++++++++++++++++ src/queue.rs | 7 +++++ src/queue/cleanup.rs | 69 ++++++++++++++++++++++++++++++++++++++++---- 3 files changed, 104 insertions(+), 5 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 410df01..b092d3c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1063,6 +1063,38 @@ async fn prepare_upgrade( })) } +#[tracing::instrument(name = "Prune missing identifiers", skip(repo))] +async fn prune_missing( + repo: web::Data, + query: Option>, +) -> Result { + let total = repo.size().await?; + + let progress = if let Some(progress) = repo.get("prune-missing-queued").await? { + progress + .as_ref() + .try_into() + .map(u64::from_be_bytes) + .unwrap_or(0) + } else { + 0 + }; + + let complete = repo.get("prune-missing-complete").await?.is_some(); + + let started = repo.get("prune-missing-started").await?.is_some(); + + if !started || query.is_some_and(|q| q.force) { + queue::prune_missing(&repo).await?; + } + + Ok(HttpResponse::Ok().json(UpgradeResponse { + complete, + progress, + total, + })) +} + #[tracing::instrument(name = "Spawning variant cleanup", skip(repo))] async fn clean_variants(repo: web::Data) -> Result { queue::cleanup_all_variants(&repo).await?; @@ -1265,6 +1297,7 @@ fn configure_endpoints< .service( web::resource("/prepare_upgrade").route(web::post().to(prepare_upgrade::)), ) + .service(web::resource("/prune_missing}").route(web::post().to(prune_missing::))) .configure(extra_config), ); } diff --git a/src/queue.rs b/src/queue.rs index 5f32ce6..4359802 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -60,6 +60,7 @@ enum Cleanup { hash: Base64Bytes, }, AllVariants, + Prune, } #[derive(Debug, serde::Deserialize, serde::Serialize)] @@ -125,6 +126,12 @@ pub(crate) async fn cleanup_all_variants(repo: &R) -> Result<(), E Ok(()) } +pub(crate) async fn prune_missing(repo: &R) -> Result<(), Error> { + let job = serde_json::to_vec(&Cleanup::Prune)?; + repo.push(CLEANUP_QUEUE, job.into()).await?; + Ok(()) +} + pub(crate) async fn queue_prepare_upgrade(repo: &R) -> Result<(), Error> { let job = serde_json::to_vec(&Process::PrepareUpgrade)?; repo.push(PROCESS_QUEUE, job.into()).await?; diff --git a/src/queue/cleanup.rs b/src/queue/cleanup.rs index 3bd8bf0..cf4dc40 100644 --- a/src/queue/cleanup.rs +++ b/src/queue/cleanup.rs @@ -5,7 +5,7 @@ use crate::{ serde_str::Serde, store::{Identifier, Store}, }; -use futures_util::StreamExt; +use futures_util::TryStreamExt; use reqwest_middleware::ClientWithMiddleware; pub(super) fn perform<'a, R, S>( @@ -15,8 +15,8 @@ pub(super) fn perform<'a, R, S>( job: &'a [u8], ) -> LocalBoxFuture<'a, Result<(), Error>> where - R: FullRepo, - S: Store, + R: FullRepo + 'static, + S: Store + 'static, { Box::pin(async move { match serde_json::from_slice(job) { @@ -42,6 +42,7 @@ where hash: Base64Bytes(hash), } => variant::(repo, hash).await?, Cleanup::AllVariants => all_variants::(repo).await?, + Cleanup::Prune => prune::(repo, store).await?, }, Err(e) => { tracing::warn!("Invalid job: {}", format!("{e}")); @@ -159,8 +160,7 @@ where { let mut hash_stream = Box::pin(repo.hashes().await); - while let Some(res) = hash_stream.next().await { - let hash = res?; + while let Some(hash) = hash_stream.try_next().await? { super::cleanup_variants(repo, hash).await?; } @@ -181,3 +181,62 @@ where Ok(()) } + +async fn prune(repo: &R, store: &S) -> Result<(), Error> +where + R: FullRepo + 'static, + S: Store + 'static, +{ + repo.set("prune-missing-started", b"1".to_vec().into()) + .await?; + + let mut hash_stream = Box::pin(repo.hashes().await); + + let mut count: u64 = 0; + + while let Some(hash) = hash_stream.try_next().await? { + let repo = repo.clone(); + let store = store.clone(); + + let res = actix_rt::spawn(async move { + let mut count = count; + + if let Some(ident) = repo.identifier(hash.clone()).await? { + match store.len(&ident).await { + Err(e) if e.is_not_found() => { + super::cleanup_hash(&repo, hash).await?; + + count += 1; + + repo.set( + "prune-missing-queued", + Vec::from(count.to_be_bytes()).into(), + ) + .await?; + } + _ => (), + } + } + + Ok(count) as Result + }) + .await; + + match res { + Ok(Ok(updated)) => count = updated, + Ok(Err(e)) => { + tracing::warn!("Prune missing identifier failed - {e:?}"); + } + Err(_) => { + tracing::warn!("Prune missing identifier panicked."); + } + } + + count += 1; + } + + repo.set("prune-missing-complete", b"1".to_vec().into()) + .await?; + + Ok(()) +}