From fe1f7c869f40ddae16e292d29f964c1c605299ba Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 22 Jul 2023 19:41:50 -0500 Subject: [PATCH] Enable cleanup of variants Still TODO: doing a first pass to mark variants as accessed on launch --- src/generate.rs | 4 ++-- src/lib.rs | 52 +++++++++++++++++++++++++++++++++++----- src/queue.rs | 45 ++++++++++++++++++++++++++++------ src/queue/cleanup.rs | 57 ++++++++++++++++++++++++++++++++++++++------ src/repo.rs | 12 +++++----- src/repo/sled.rs | 11 +++++++-- 6 files changed, 151 insertions(+), 30 deletions(-) diff --git a/src/generate.rs b/src/generate.rs index e10b23d..3f12855 100644 --- a/src/generate.rs +++ b/src/generate.rs @@ -43,7 +43,7 @@ impl Drop for MetricsGuard { } #[allow(clippy::too_many_arguments)] -#[tracing::instrument(skip(repo, store, hash))] +#[tracing::instrument(skip(repo, store, hash, process_map, media))] pub(crate) async fn generate( repo: &R, store: &S, @@ -78,7 +78,7 @@ pub(crate) async fn generate( } #[allow(clippy::too_many_arguments)] -#[tracing::instrument(skip(repo, store, hash))] +#[tracing::instrument(skip(repo, store, hash, media))] async fn process( repo: &R, store: &S, diff --git a/src/lib.rs b/src/lib.rs index 78bd879..c1fe024 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1344,16 +1344,42 @@ fn configure_endpoints< ); } -fn spawn_workers(repo: R, store: S, config: &Configuration, process_map: ProcessMap) +fn spawn_cleanup(repo: R) +where + R: FullRepo + 'static, +{ + tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { + actix_rt::spawn(async move { + let mut interval = actix_rt::time::interval(Duration::from_secs(30)); + + loop { + interval.tick().await; + + if let Err(e) = queue::cleanup_outdated_variants(&repo).await { + tracing::warn!( + "Failed to spawn cleanup for outdated variants:{}", + format!("\n{e}\n{e:?}") + ); + } + } + }); + }) +} + +fn spawn_workers(repo: R, store: S, config: Configuration, process_map: ProcessMap) where R: FullRepo + 'static, S: Store + 'static, { + let worker_id_1 = next_worker_id(&config); + let worker_id_2 = next_worker_id(&config); + tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { actix_rt::spawn(queue::process_cleanup( repo.clone(), store.clone(), - next_worker_id(config), + config.clone(), + worker_id_1, )) }); tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { @@ -1361,8 +1387,8 @@ where repo, store, process_map, - config.clone(), - next_worker_id(config), + config, + worker_id_2, )) }); } @@ -1378,6 +1404,8 @@ async fn launch_file_store, }, AllVariants, + OutdatedVariants, } #[derive(Debug, serde::Deserialize, serde::Serialize)] @@ -110,14 +113,25 @@ pub(crate) async fn cleanup_identifier( Ok(()) } -async fn cleanup_variants(repo: &R, hash: R::Bytes) -> Result<(), Error> { +async fn cleanup_variants( + repo: &R, + hash: R::Bytes, + variant: Option, +) -> Result<(), Error> { let job = serde_json::to_vec(&Cleanup::Variant { hash: Base64Bytes(hash.as_ref().to_vec()), + variant, })?; repo.push(CLEANUP_QUEUE, job.into()).await?; Ok(()) } +pub(crate) async fn cleanup_outdated_variants(repo: &R) -> Result<(), Error> { + let job = serde_json::to_vec(&Cleanup::OutdatedVariants)?; + repo.push(CLEANUP_QUEUE, job.into()).await?; + Ok(()) +} + pub(crate) async fn cleanup_all_variants(repo: &R) -> Result<(), Error> { let job = serde_json::to_vec(&Cleanup::AllVariants)?; repo.push(CLEANUP_QUEUE, job.into()).await?; @@ -156,8 +170,21 @@ pub(crate) async fn queue_generate( Ok(()) } -pub(crate) async fn process_cleanup(repo: R, store: S, worker_id: String) { - process_jobs(&repo, &store, worker_id, CLEANUP_QUEUE, cleanup::perform).await +pub(crate) async fn process_cleanup( + repo: R, + store: S, + config: Configuration, + worker_id: String, +) { + process_jobs( + &repo, + &store, + &config, + worker_id, + CLEANUP_QUEUE, + cleanup::perform, + ) + .await } pub(crate) async fn process_images( @@ -184,6 +211,7 @@ type LocalBoxFuture<'a, T> = Pin + 'a>>; async fn process_jobs( repo: &R, store: &S, + config: &Configuration, worker_id: String, queue: &'static str, callback: F, @@ -191,10 +219,11 @@ async fn process_jobs( R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo, R::Bytes: Clone, S: Store, - for<'a> F: Fn(&'a R, &'a S, &'a [u8]) -> LocalBoxFuture<'a, Result<(), Error>> + Copy, + for<'a> F: Fn(&'a R, &'a S, &'a Configuration, &'a [u8]) -> LocalBoxFuture<'a, Result<(), Error>> + + Copy, { loop { - let res = job_loop(repo, store, worker_id.clone(), queue, callback).await; + let res = job_loop(repo, store, config, worker_id.clone(), queue, callback).await; if let Err(e) = res { tracing::warn!("Error processing jobs: {}", format!("{e}")); @@ -209,6 +238,7 @@ async fn process_jobs( async fn job_loop( repo: &R, store: &S, + config: &Configuration, worker_id: String, queue: &'static str, callback: F, @@ -217,14 +247,15 @@ where R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo, R::Bytes: Clone, S: Store, - for<'a> F: Fn(&'a R, &'a S, &'a [u8]) -> LocalBoxFuture<'a, Result<(), Error>> + Copy, + for<'a> F: Fn(&'a R, &'a S, &'a Configuration, &'a [u8]) -> LocalBoxFuture<'a, Result<(), Error>> + + Copy, { loop { let bytes = repo.pop(queue, worker_id.as_bytes().to_vec()).await?; let span = tracing::info_span!("Running Job", worker_id = ?worker_id); - span.in_scope(|| (callback)(repo, store, bytes.as_ref())) + span.in_scope(|| (callback)(repo, store, config, bytes.as_ref())) .instrument(span) .await?; } diff --git a/src/queue/cleanup.rs b/src/queue/cleanup.rs index 3d6c162..cd71a51 100644 --- a/src/queue/cleanup.rs +++ b/src/queue/cleanup.rs @@ -1,7 +1,8 @@ use crate::{ + config::Configuration, error::{Error, UploadError}, queue::{Base64Bytes, Cleanup, LocalBoxFuture}, - repo::{Alias, AliasRepo, DeleteToken, FullRepo, HashRepo, IdentifierRepo}, + repo::{Alias, AliasRepo, DeleteToken, FullRepo, HashRepo, IdentifierRepo, VariantAccessRepo}, serde_str::Serde, store::{Identifier, Store}, }; @@ -10,6 +11,7 @@ use futures_util::StreamExt; pub(super) fn perform<'a, R, S>( repo: &'a R, store: &'a S, + configuration: &'a Configuration, job: &'a [u8], ) -> LocalBoxFuture<'a, Result<(), Error>> where @@ -38,8 +40,10 @@ where } Cleanup::Variant { hash: Base64Bytes(hash), - } => variant::(repo, hash).await?, + variant, + } => hash_variant::(repo, hash, variant).await?, Cleanup::AllVariants => all_variants::(repo).await?, + Cleanup::OutdatedVariants => outdated_variants::(repo, configuration).await?, }, Err(e) => { tracing::warn!("Invalid job: {}", format!("{e}")); @@ -150,6 +154,7 @@ where Ok(()) } +#[tracing::instrument(skip_all)] async fn all_variants(repo: &R) -> Result<(), Error> where R: FullRepo, @@ -159,22 +164,60 @@ where while let Some(res) = hash_stream.next().await { let hash = res?; - super::cleanup_variants(repo, hash).await?; + super::cleanup_variants(repo, hash, None).await?; } Ok(()) } -async fn variant(repo: &R, hash: Vec) -> Result<(), Error> +#[tracing::instrument(skip_all)] +async fn outdated_variants(repo: &R, config: &Configuration) -> Result<(), Error> +where + R: FullRepo, + S: Store, +{ + let now = time::OffsetDateTime::now_utc(); + let since = now.saturating_sub(config.media.retention.variants.to_duration()); + + let mut variant_stream = Box::pin(repo.older_variants(since).await?); + + while let Some(res) = variant_stream.next().await { + let (hash, variant) = res?; + super::cleanup_variants(repo, hash, Some(variant)).await?; + } + + Ok(()) +} + +#[tracing::instrument(skip_all)] +async fn hash_variant( + repo: &R, + hash: Vec, + target_variant: Option, +) -> Result<(), Error> where R: FullRepo, S: Store, { let hash: R::Bytes = hash.into(); - for (variant, identifier) in repo.variants::(hash.clone()).await? { - repo.remove_variant(hash.clone(), variant).await?; - super::cleanup_identifier(repo, identifier).await?; + if let Some(target_variant) = target_variant { + if let Some(identifier) = repo + .variant_identifier::(hash.clone(), target_variant.clone()) + .await? + { + super::cleanup_identifier(repo, identifier).await?; + } + + repo.remove_variant(hash.clone(), target_variant.clone()) + .await?; + VariantAccessRepo::remove_access(repo, hash, target_variant).await?; + } else { + for (variant, identifier) in repo.variants::(hash.clone()).await? { + repo.remove_variant(hash.clone(), variant.clone()).await?; + VariantAccessRepo::remove_access(repo, hash.clone(), variant).await?; + super::cleanup_identifier(repo, identifier).await?; + } } Ok(()) diff --git a/src/repo.rs b/src/repo.rs index 3a0020e..ec085a9 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -156,7 +156,7 @@ pub(crate) trait AliasAccessRepo: BaseRepo { timestamp: time::OffsetDateTime, ) -> Result; - async fn remove(&self, alias: Alias) -> Result<(), RepoError>; + async fn remove_access(&self, alias: Alias) -> Result<(), RepoError>; } #[async_trait::async_trait(?Send)] @@ -177,8 +177,8 @@ where T::older_aliases(self, timestamp).await } - async fn remove(&self, alias: Alias) -> Result<(), RepoError> { - T::remove(self, alias).await + async fn remove_access(&self, alias: Alias) -> Result<(), RepoError> { + T::remove_access(self, alias).await } } @@ -196,7 +196,7 @@ pub(crate) trait VariantAccessRepo: BaseRepo { timestamp: time::OffsetDateTime, ) -> Result; - async fn remove(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError>; + async fn remove_access(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError>; } #[async_trait::async_trait(?Send)] @@ -225,8 +225,8 @@ where T::older_variants(self, timestamp).await } - async fn remove(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError> { - T::remove(self, hash, variant).await + async fn remove_access(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError> { + T::remove_access(self, hash, variant).await } } diff --git a/src/repo/sled.rs b/src/repo/sled.rs index de44482..6b0e8e9 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -269,6 +269,7 @@ impl futures_util::Stream for VariantAccessStream { impl AliasAccessRepo for SledRepo { type AliasAccessStream = AliasAccessStream; + #[tracing::instrument(level = "debug", skip(self))] async fn accessed(&self, alias: Alias) -> Result<(), RepoError> { let now_string = time::OffsetDateTime::now_utc() .format(&time::format_description::well_known::Rfc3339) @@ -289,6 +290,7 @@ impl AliasAccessRepo for SledRepo { .map_err(RepoError::from) } + #[tracing::instrument(level = "debug", skip(self))] async fn older_aliases( &self, timestamp: time::OffsetDateTime, @@ -312,7 +314,8 @@ impl AliasAccessRepo for SledRepo { }) } - async fn remove(&self, alias: Alias) -> Result<(), RepoError> { + #[tracing::instrument(level = "debug", skip(self))] + async fn remove_access(&self, alias: Alias) -> Result<(), RepoError> { let alias_access = self.alias_access.clone(); let inverse_alias_access = self.inverse_alias_access.clone(); @@ -332,6 +335,7 @@ impl AliasAccessRepo for SledRepo { impl VariantAccessRepo for SledRepo { type VariantAccessStream = VariantAccessStream; + #[tracing::instrument(level = "debug", skip_all, fields(hash = %hex::encode(&hash), variant = %variant))] async fn accessed(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError> { let key = variant_access_key(&hash, &variant); @@ -354,6 +358,7 @@ impl VariantAccessRepo for SledRepo { .map_err(RepoError::from) } + #[tracing::instrument(level = "debug", skip_all, fields(hash = %hex::encode(&hash), variant = %variant))] async fn contains_variant( &self, hash: Self::Bytes, @@ -366,6 +371,7 @@ impl VariantAccessRepo for SledRepo { Ok(timestamp.is_some()) } + #[tracing::instrument(level = "debug", skip(self))] async fn older_variants( &self, timestamp: time::OffsetDateTime, @@ -389,7 +395,8 @@ impl VariantAccessRepo for SledRepo { }) } - async fn remove(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError> { + #[tracing::instrument(level = "debug", skip_all, fields(hash = %hex::encode(&hash), variant = %variant))] + async fn remove_access(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError> { let key = variant_access_key(&hash, &variant); let variant_access = self.variant_access.clone();