From ba27a1a223baa55739341b33b20b4d9e4c4f3021 Mon Sep 17 00:00:00 2001 From: asonix Date: Wed, 16 Aug 2023 16:09:40 -0500 Subject: [PATCH] Build out repo-repo migration --- src/backgrounded.rs | 10 ++-- src/ingest.rs | 14 ++--- src/lib.rs | 17 +++--- src/migrate_store.rs | 4 +- src/queue/cleanup.rs | 19 +++--- src/repo.rs | 122 +++++++++++++++++++++++++------------ src/repo/migrate.rs | 140 +++++++++++++++++++++++++++++++++++++++++-- src/repo/sled.rs | 84 ++++++++++++++++++-------- 8 files changed, 307 insertions(+), 103 deletions(-) diff --git a/src/backgrounded.rs b/src/backgrounded.rs index 46577f0..9d2158b 100644 --- a/src/backgrounded.rs +++ b/src/backgrounded.rs @@ -1,6 +1,6 @@ use crate::{ error::Error, - repo::{ArcRepo, UploadId, UploadRepo}, + repo::{ArcRepo, UploadId}, store::Store, }; use actix_web::web::Bytes; @@ -53,11 +53,9 @@ where where P: Stream> + Unpin + 'static, { - UploadRepo::create( - self.repo.as_ref(), - self.upload_id.expect("Upload id exists"), - ) - .await?; + self.repo + .create_upload(self.upload_id.expect("Upload id exists")) + .await?; let stream = stream.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)); diff --git a/src/ingest.rs b/src/ingest.rs index 7fea8d1..7228de2 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -3,7 +3,7 @@ use crate::{ either::Either, error::{Error, UploadError}, formats::{InternalFormat, Validations}, - repo::{Alias, AliasRepo, ArcRepo, DeleteToken, Hash, HashRepo}, + repo::{Alias, ArcRepo, DeleteToken, Hash}, store::Store, }; use actix_web::web::Bytes; @@ -137,10 +137,7 @@ async fn save_upload( where S: Store, { - if HashRepo::create(repo.as_ref(), hash.clone(), identifier) - .await? - .is_err() - { + if repo.create_hash(hash.clone(), identifier).await?.is_err() { // duplicate upload store.remove(identifier).await?; session.identifier.take(); @@ -175,7 +172,8 @@ where #[tracing::instrument(skip(self, hash))] async fn add_existing_alias(&mut self, hash: Hash, alias: Alias) -> Result<(), Error> { - AliasRepo::create(self.repo.as_ref(), &alias, &self.delete_token, hash) + self.repo + .create_alias(&alias, &self.delete_token, hash) .await? .map_err(|_| UploadError::DuplicateAlias)?; @@ -189,7 +187,9 @@ where loop { let alias = Alias::generate(input_type.file_extension().to_string()); - if AliasRepo::create(self.repo.as_ref(), &alias, &self.delete_token, hash.clone()) + if self + .repo + .create_alias(&alias, &self.delete_token, hash.clone()) .await? .is_ok() { diff --git a/src/lib.rs b/src/lib.rs index 980158c..49035e4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -69,10 +69,7 @@ use self::{ middleware::{Deadline, Internal}, migrate_store::migrate_store, queue::queue_generate, - repo::{ - sled::SledRepo, Alias, AliasAccessRepo, DeleteToken, Hash, Repo, UploadId, UploadResult, - VariantAccessRepo, - }, + repo::{sled::SledRepo, Alias, DeleteToken, Hash, Repo, UploadId, UploadResult}, serde_str::Serde, store::{file_store::FileStore, object_store::ObjectStore, Identifier, Store}, stream::{StreamLimit, StreamTimeout}, @@ -673,7 +670,7 @@ async fn process_details( let thumbnail_string = thumbnail_path.to_string_lossy().to_string(); if !config.server.read_only { - VariantAccessRepo::accessed((**repo).as_ref(), hash.clone(), thumbnail_string.clone()) + repo.accessed_variant(hash.clone(), thumbnail_string.clone()) .await?; } @@ -745,7 +742,7 @@ async fn process( }; if !config.server.read_only { - AliasAccessRepo::accessed((**repo).as_ref(), alias.clone()).await?; + repo.accessed_alias(alias.clone()).await?; } alias @@ -768,7 +765,8 @@ async fn process( }; if !config.server.read_only { - VariantAccessRepo::accessed((**repo).as_ref(), hash.clone(), path_string.clone()).await?; + repo.accessed_variant(hash.clone(), path_string.clone()) + .await?; } let identifier_opt = repo @@ -894,7 +892,8 @@ async fn process_head( }; if !config.server.read_only { - VariantAccessRepo::accessed((**repo).as_ref(), hash.clone(), path_string.clone()).await?; + repo.accessed_variant(hash.clone(), path_string.clone()) + .await?; } let identifier_opt = repo @@ -1056,7 +1055,7 @@ async fn serve_query( }; if !config.server.read_only { - AliasAccessRepo::accessed((**repo).as_ref(), alias.clone()).await?; + repo.accessed_alias(alias.clone()).await?; } alias diff --git a/src/migrate_store.rs b/src/migrate_store.rs index 1508068..ab6978d 100644 --- a/src/migrate_store.rs +++ b/src/migrate_store.rs @@ -8,7 +8,7 @@ use std::{ use crate::{ details::Details, error::{Error, UploadError}, - repo::{ArcRepo, Hash, IdentifierRepo}, + repo::{ArcRepo, Hash}, store::{Identifier, Store}, }; @@ -429,7 +429,7 @@ where { if let Some(details) = repo.details(from).await? { repo.relate_details(to, &details).await?; - IdentifierRepo::cleanup(repo.as_ref(), from).await?; + repo.cleanup_details(from).await?; } Ok(()) diff --git a/src/queue/cleanup.rs b/src/queue/cleanup.rs index dbe4676..ea9723f 100644 --- a/src/queue/cleanup.rs +++ b/src/queue/cleanup.rs @@ -2,10 +2,7 @@ use crate::{ config::Configuration, error::{Error, UploadError}, queue::{Base64Bytes, Cleanup, LocalBoxFuture}, - repo::{ - Alias, AliasAccessRepo, AliasRepo, ArcRepo, DeleteToken, Hash, HashRepo, IdentifierRepo, - VariantAccessRepo, - }, + repo::{Alias, ArcRepo, DeleteToken, Hash}, serde_str::Serde, store::{Identifier, Store}, }; @@ -65,7 +62,7 @@ where errors.push(e); } - if let Err(e) = IdentifierRepo::cleanup(repo.as_ref(), &identifier).await { + if let Err(e) = repo.cleanup_details(&identifier).await { errors.push(e); } @@ -106,7 +103,7 @@ async fn hash(repo: &ArcRepo, hash: Hash) -> Result<(), Error> { let _ = super::cleanup_identifier(repo, identifier).await; } - HashRepo::cleanup(repo.as_ref(), hash).await?; + repo.cleanup_hash(hash).await?; Ok(()) } @@ -121,9 +118,9 @@ async fn alias(repo: &ArcRepo, alias: Alias, token: DeleteToken) -> Result<(), E let hash = repo.hash(&alias).await?; - AliasRepo::cleanup(repo.as_ref(), &alias).await?; + repo.cleanup_alias(&alias).await?; repo.remove_relation(alias.clone()).await?; - AliasAccessRepo::remove_access(repo.as_ref(), alias.clone()).await?; + repo.remove_alias_access(alias.clone()).await?; let Some(hash) = hash else { // hash doesn't exist, nothing to do @@ -178,7 +175,7 @@ async fn outdated_proxies(repo: &ArcRepo, config: &Configuration) -> Result<(), } else { tracing::warn!("Skipping alias cleanup - no delete token"); repo.remove_relation(alias.clone()).await?; - AliasAccessRepo::remove_access(repo.as_ref(), alias).await?; + repo.remove_alias_access(alias).await?; } } @@ -201,11 +198,11 @@ async fn hash_variant( repo.remove_variant(hash.clone(), target_variant.clone()) .await?; - VariantAccessRepo::remove_access(repo.as_ref(), hash, target_variant).await?; + repo.remove_variant_access(hash, target_variant).await?; } else { for (variant, identifier) in repo.variants(hash.clone()).await? { repo.remove_variant(hash.clone(), variant.clone()).await?; - VariantAccessRepo::remove_access(repo.as_ref(), hash.clone(), variant).await?; + repo.remove_variant_access(hash.clone(), variant).await?; super::cleanup_identifier(repo, identifier).await?; } } diff --git a/src/repo.rs b/src/repo.rs index 45c8eb6..d5fa821 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -72,7 +72,7 @@ pub(crate) enum RepoError { pub(crate) trait FullRepo: UploadRepo + SettingsRepo - + IdentifierRepo + + DetailsRepo + AliasRepo + QueueRepo + HashRepo @@ -170,14 +170,28 @@ where #[async_trait::async_trait(?Send)] pub(crate) trait AliasAccessRepo: BaseRepo { - async fn accessed(&self, alias: Alias) -> Result<(), RepoError>; + async fn accessed_alias(&self, alias: Alias) -> Result<(), RepoError> { + self.set_accessed_alias(alias, time::OffsetDateTime::now_utc()) + .await + } + + async fn set_accessed_alias( + &self, + alias: Alias, + accessed: time::OffsetDateTime, + ) -> Result<(), RepoError>; + + async fn alias_accessed_at( + &self, + alias: Alias, + ) -> Result, RepoError>; async fn older_aliases( &self, timestamp: time::OffsetDateTime, ) -> Result>, RepoError>; - async fn remove_access(&self, alias: Alias) -> Result<(), RepoError>; + async fn remove_alias_access(&self, alias: Alias) -> Result<(), RepoError>; } #[async_trait::async_trait(?Send)] @@ -185,8 +199,19 @@ impl AliasAccessRepo for Arc where T: AliasAccessRepo, { - async fn accessed(&self, alias: Alias) -> Result<(), RepoError> { - T::accessed(self, alias).await + async fn set_accessed_alias( + &self, + alias: Alias, + accessed: time::OffsetDateTime, + ) -> Result<(), RepoError> { + T::set_accessed_alias(self, alias, accessed).await + } + + async fn alias_accessed_at( + &self, + alias: Alias, + ) -> Result, RepoError> { + T::alias_accessed_at(self, alias).await } async fn older_aliases( @@ -196,23 +221,37 @@ where T::older_aliases(self, timestamp).await } - async fn remove_access(&self, alias: Alias) -> Result<(), RepoError> { - T::remove_access(self, alias).await + async fn remove_alias_access(&self, alias: Alias) -> Result<(), RepoError> { + T::remove_alias_access(self, alias).await } } #[async_trait::async_trait(?Send)] pub(crate) trait VariantAccessRepo: BaseRepo { - async fn accessed(&self, hash: Hash, variant: String) -> Result<(), RepoError>; + async fn accessed_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError> { + self.set_accessed_variant(hash, variant, time::OffsetDateTime::now_utc()) + .await + } - async fn contains_variant(&self, hash: Hash, variant: String) -> Result; + async fn set_accessed_variant( + &self, + hash: Hash, + variant: String, + accessed: time::OffsetDateTime, + ) -> Result<(), RepoError>; + + async fn variant_accessed_at( + &self, + hash: Hash, + variant: String, + ) -> Result, RepoError>; async fn older_variants( &self, timestamp: time::OffsetDateTime, ) -> Result>, RepoError>; - async fn remove_access(&self, hash: Hash, variant: String) -> Result<(), RepoError>; + async fn remove_variant_access(&self, hash: Hash, variant: String) -> Result<(), RepoError>; } #[async_trait::async_trait(?Send)] @@ -220,12 +259,21 @@ impl VariantAccessRepo for Arc where T: VariantAccessRepo, { - async fn accessed(&self, hash: Hash, variant: String) -> Result<(), RepoError> { - T::accessed(self, hash, variant).await + async fn set_accessed_variant( + &self, + hash: Hash, + variant: String, + accessed: time::OffsetDateTime, + ) -> Result<(), RepoError> { + T::set_accessed_variant(self, hash, variant, accessed).await } - async fn contains_variant(&self, hash: Hash, variant: String) -> Result { - T::contains_variant(self, hash, variant).await + async fn variant_accessed_at( + &self, + hash: Hash, + variant: String, + ) -> Result, RepoError> { + T::variant_accessed_at(self, hash, variant).await } async fn older_variants( @@ -235,14 +283,14 @@ where T::older_variants(self, timestamp).await } - async fn remove_access(&self, hash: Hash, variant: String) -> Result<(), RepoError> { - T::remove_access(self, hash, variant).await + async fn remove_variant_access(&self, hash: Hash, variant: String) -> Result<(), RepoError> { + T::remove_variant_access(self, hash, variant).await } } #[async_trait::async_trait(?Send)] pub(crate) trait UploadRepo: BaseRepo { - async fn create(&self, upload_id: UploadId) -> Result<(), RepoError>; + async fn create_upload(&self, upload_id: UploadId) -> Result<(), RepoError>; async fn wait(&self, upload_id: UploadId) -> Result; @@ -256,8 +304,8 @@ impl UploadRepo for Arc where T: UploadRepo, { - async fn create(&self, upload_id: UploadId) -> Result<(), RepoError> { - T::create(self, upload_id).await + async fn create_upload(&self, upload_id: UploadId) -> Result<(), RepoError> { + T::create_upload(self, upload_id).await } async fn wait(&self, upload_id: UploadId) -> Result { @@ -377,7 +425,7 @@ where } #[async_trait::async_trait(?Send)] -pub(crate) trait IdentifierRepo: BaseRepo { +pub(crate) trait DetailsRepo: BaseRepo { async fn relate_details( &self, identifier: &dyn Identifier, @@ -385,13 +433,13 @@ pub(crate) trait IdentifierRepo: BaseRepo { ) -> Result<(), StoreError>; async fn details(&self, identifier: &dyn Identifier) -> Result, StoreError>; - async fn cleanup(&self, identifier: &dyn Identifier) -> Result<(), StoreError>; + async fn cleanup_details(&self, identifier: &dyn Identifier) -> Result<(), StoreError>; } #[async_trait::async_trait(?Send)] -impl IdentifierRepo for Arc +impl DetailsRepo for Arc where - T: IdentifierRepo, + T: DetailsRepo, { async fn relate_details( &self, @@ -405,8 +453,8 @@ where T::details(self, identifier).await } - async fn cleanup(&self, identifier: &dyn Identifier) -> Result<(), StoreError> { - T::cleanup(self, identifier).await + async fn cleanup_details(&self, identifier: &dyn Identifier) -> Result<(), StoreError> { + T::cleanup_details(self, identifier).await } } @@ -457,7 +505,7 @@ pub(crate) trait HashRepo: BaseRepo { async fn hashes(&self) -> LocalBoxStream<'static, Result>; - async fn create( + async fn create_hash( &self, hash: Hash, identifier: &dyn Identifier, @@ -492,7 +540,7 @@ pub(crate) trait HashRepo: BaseRepo { ) -> Result<(), StoreError>; async fn motion_identifier(&self, hash: Hash) -> Result>, RepoError>; - async fn cleanup(&self, hash: Hash) -> Result<(), RepoError>; + async fn cleanup_hash(&self, hash: Hash) -> Result<(), RepoError>; } #[async_trait::async_trait(?Send)] @@ -508,12 +556,12 @@ where T::hashes(self).await } - async fn create( + async fn create_hash( &self, hash: Hash, identifier: &dyn Identifier, ) -> Result, StoreError> { - T::create(self, hash, identifier).await + T::create_hash(self, hash, identifier).await } async fn update_identifier( @@ -565,14 +613,14 @@ where T::motion_identifier(self, hash).await } - async fn cleanup(&self, hash: Hash) -> Result<(), RepoError> { - T::cleanup(self, hash).await + async fn cleanup_hash(&self, hash: Hash) -> Result<(), RepoError> { + T::cleanup_hash(self, hash).await } } #[async_trait::async_trait(?Send)] pub(crate) trait AliasRepo: BaseRepo { - async fn create( + async fn create_alias( &self, alias: &Alias, delete_token: &DeleteToken, @@ -585,7 +633,7 @@ pub(crate) trait AliasRepo: BaseRepo { async fn for_hash(&self, hash: Hash) -> Result, RepoError>; - async fn cleanup(&self, alias: &Alias) -> Result<(), RepoError>; + async fn cleanup_alias(&self, alias: &Alias) -> Result<(), RepoError>; } #[async_trait::async_trait(?Send)] @@ -593,13 +641,13 @@ impl AliasRepo for Arc where T: AliasRepo, { - async fn create( + async fn create_alias( &self, alias: &Alias, delete_token: &DeleteToken, hash: Hash, ) -> Result, RepoError> { - T::create(self, alias, delete_token, hash).await + T::create_alias(self, alias, delete_token, hash).await } async fn delete_token(&self, alias: &Alias) -> Result, RepoError> { @@ -614,8 +662,8 @@ where T::for_hash(self, hash).await } - async fn cleanup(&self, alias: &Alias) -> Result<(), RepoError> { - T::cleanup(self, alias).await + async fn cleanup_alias(&self, alias: &Alias) -> Result<(), RepoError> { + T::cleanup_alias(self, alias).await } } diff --git a/src/repo/migrate.rs b/src/repo/migrate.rs index 63812ff..27f4187 100644 --- a/src/repo/migrate.rs +++ b/src/repo/migrate.rs @@ -4,7 +4,7 @@ use crate::{ config::Configuration, details::Details, error::Error, - repo::{AliasRepo, ArcRepo, DeleteToken, Hash, HashRepo, VariantAccessRepo}, + repo::{ArcRepo, DeleteToken, Hash}, repo_04::{ AliasRepo as _, HashRepo as _, IdentifierRepo as _, SettingsRepo as _, SledRepo as OldSledRepo, @@ -16,6 +16,53 @@ use crate::{ const MIGRATE_CONCURRENCY: usize = 32; const GENERATOR_KEY: &str = "last-path"; +#[tracing::instrument(skip_all)] +pub(crate) async fn migrate_repo(old_repo: ArcRepo, new_repo: ArcRepo) -> Result<(), Error> { + tracing::warn!("Running checks"); + if let Err(e) = old_repo.health_check().await { + tracing::warn!("Old repo is not configured correctly"); + return Err(e.into()); + } + if let Err(e) = new_repo.health_check().await { + tracing::warn!("New repo is not configured correctly"); + return Err(e.into()); + } + + let total_size = old_repo.size().await?; + let pct = (total_size / 100).max(1); + tracing::warn!("Checks complete, migrating repo"); + tracing::warn!("{total_size} hashes will be migrated"); + + let mut hash_stream = old_repo.hashes().await.into_streamer(); + + let mut index = 0; + while let Some(res) = hash_stream.next().await { + if let Ok(hash) = res { + let _ = migrate_hash(old_repo.clone(), new_repo.clone(), hash).await; + } else { + tracing::warn!("Failed to read hash, skipping"); + } + + index += 1; + + if index % pct == 0 { + let percent = index / pct; + + tracing::warn!("Migration {percent}% complete - {index}/{total_size}"); + } + } + + if let Some(generator_state) = old_repo.get(GENERATOR_KEY).await? { + new_repo + .set(GENERATOR_KEY, generator_state.to_vec().into()) + .await?; + } + + tracing::warn!("Migration complete"); + + Ok(()) +} + #[tracing::instrument(skip_all)] pub(crate) async fn migrate_04( old_repo: OldSledRepo, @@ -94,13 +141,32 @@ pub(crate) async fn migrate_04( Ok(()) } +async fn migrate_hash(old_repo: ArcRepo, new_repo: ArcRepo, hash: Hash) { + let mut hash_failures = 0; + + while let Err(e) = do_migrate_hash(&old_repo, &new_repo, hash.clone()).await { + hash_failures += 1; + + if hash_failures > 10 { + tracing::error!( + "Failed to migrate hash {}, skipping\n{hash:?}", + format!("{e:?}") + ); + + break; + } else { + tracing::warn!("Failed to migrate hash {hash:?}, retrying +{hash_failures}",); + } + } +} + async fn migrate_hash_04( old_repo: OldSledRepo, new_repo: ArcRepo, store: S, config: Configuration, old_hash: sled::IVec, -) -> Result<(), Error> { +) { let mut hash_failures = 0; while let Err(e) = @@ -114,7 +180,8 @@ async fn migrate_hash_04( hex::encode(&old_hash[..]), format!("{e:?}") ); - return Err(e); + + break; } else { tracing::warn!( "Failed to migrate hash {}, retrying +{hash_failures}", @@ -122,6 +189,65 @@ async fn migrate_hash_04( ); } } +} + +#[tracing::instrument(skip_all)] +async fn do_migrate_hash(old_repo: &ArcRepo, new_repo: &ArcRepo, hash: Hash) -> Result<(), Error> { + let Some(identifier) = old_repo.identifier(hash.clone()).await? else { + tracing::warn!("Skipping hash {hash:?}, no identifier"); + return Ok(()); + }; + + let _ = new_repo.create_hash(hash.clone(), &identifier).await?; + + if let Some(details) = old_repo.details(&identifier).await? { + new_repo.relate_details(&identifier, &details).await?; + } + + if let Some(identifier) = old_repo.motion_identifier(hash.clone()).await? { + new_repo + .relate_motion_identifier(hash.clone(), &identifier) + .await?; + + if let Some(details) = old_repo.details(&identifier).await? { + new_repo.relate_details(&identifier, &details).await?; + } + } + + for alias in old_repo.for_hash(hash.clone()).await? { + let delete_token = old_repo + .delete_token(&alias) + .await? + .unwrap_or_else(DeleteToken::generate); + let _ = new_repo + .create_alias(&alias, &delete_token, hash.clone()) + .await?; + + if let Some(timestamp) = old_repo.alias_accessed_at(alias.clone()).await? { + new_repo.set_accessed_alias(alias, timestamp).await?; + } + } + + for (variant, identifier) in old_repo.variants(hash.clone()).await? { + let _ = new_repo + .relate_variant_identifier(hash.clone(), variant.clone(), &identifier) + .await?; + + if let Some(timestamp) = new_repo + .variant_accessed_at(hash.clone(), variant.clone()) + .await? + { + new_repo + .set_accessed_variant(hash.clone(), variant, timestamp) + .await?; + } else { + new_repo.accessed_variant(hash.clone(), variant).await?; + } + + if let Some(details) = old_repo.details(&identifier).await? { + new_repo.relate_details(&identifier, &details).await?; + } + } Ok(()) } @@ -153,7 +279,7 @@ async fn do_migrate_hash_04( let hash = Hash::new(hash, size, hash_details.internal_format()); - let _ = HashRepo::create(new_repo.as_ref(), hash.clone(), &identifier).await?; + let _ = new_repo.create_hash(hash.clone(), &identifier).await?; for alias in aliases { let delete_token = old_repo @@ -161,7 +287,9 @@ async fn do_migrate_hash_04( .await? .unwrap_or_else(DeleteToken::generate); - let _ = AliasRepo::create(new_repo.as_ref(), &alias, &delete_token, hash.clone()).await?; + let _ = new_repo + .create_alias(&alias, &delete_token, hash.clone()) + .await?; } if let Some(identifier) = motion_identifier { @@ -179,7 +307,7 @@ async fn do_migrate_hash_04( set_details(old_repo, new_repo, store, config, &identifier).await?; - VariantAccessRepo::accessed(new_repo.as_ref(), hash.clone(), variant).await?; + new_repo.accessed_variant(hash.clone(), variant).await?; } Ok(()) diff --git a/src/repo/sled.rs b/src/repo/sled.rs index 9c2a16b..827563b 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -2,9 +2,9 @@ use crate::{ details::MaybeHumanDate, repo::{ hash::Hash, Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, - Details, FullRepo, HashAlreadyExists, HashRepo, Identifier, IdentifierRepo, JobId, - ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo, - UploadResult, VariantAccessRepo, VariantAlreadyExists, + Details, DetailsRepo, FullRepo, HashAlreadyExists, HashRepo, Identifier, JobId, ProxyRepo, + QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo, UploadResult, + VariantAccessRepo, VariantAlreadyExists, }, serde_str::Serde, store::StoreError, @@ -218,11 +218,12 @@ impl ProxyRepo for SledRepo { #[async_trait::async_trait(?Send)] impl AliasAccessRepo for SledRepo { #[tracing::instrument(level = "debug", skip(self))] - async fn accessed(&self, alias: Alias) -> Result<(), RepoError> { - let mut value_bytes = time::OffsetDateTime::now_utc() - .unix_timestamp_nanos() - .to_be_bytes() - .to_vec(); + async fn set_accessed_alias( + &self, + alias: Alias, + accessed: time::OffsetDateTime, + ) -> Result<(), RepoError> { + let mut value_bytes = accessed.unix_timestamp_nanos().to_be_bytes().to_vec(); value_bytes.extend_from_slice(&alias.to_bytes()); let value_bytes = IVec::from(value_bytes); @@ -251,6 +252,25 @@ impl AliasAccessRepo for SledRepo { Ok(()) } + async fn alias_accessed_at( + &self, + alias: Alias, + ) -> Result, RepoError> { + let alias = alias.to_bytes(); + + let Some(timestamp) = b!(self.variant_access, variant_access.get(alias)) else { + return Ok(None); + }; + + let timestamp = timestamp[0..16].try_into().expect("valid timestamp bytes"); + + let timestamp = + time::OffsetDateTime::from_unix_timestamp_nanos(i128::from_be_bytes(timestamp)) + .expect("valid timestamp"); + + Ok(Some(timestamp)) + } + #[tracing::instrument(level = "debug", skip(self))] async fn older_aliases( &self, @@ -272,7 +292,7 @@ impl AliasAccessRepo for SledRepo { } #[tracing::instrument(level = "debug", skip(self))] - async fn remove_access(&self, alias: Alias) -> Result<(), RepoError> { + async fn remove_alias_access(&self, alias: Alias) -> Result<(), RepoError> { let alias_access = self.alias_access.clone(); let inverse_alias_access = self.inverse_alias_access.clone(); @@ -300,14 +320,16 @@ impl AliasAccessRepo for SledRepo { #[async_trait::async_trait(?Send)] impl VariantAccessRepo for SledRepo { #[tracing::instrument(level = "debug", skip(self))] - async fn accessed(&self, hash: Hash, variant: String) -> Result<(), RepoError> { + async fn set_accessed_variant( + &self, + hash: Hash, + variant: String, + accessed: time::OffsetDateTime, + ) -> Result<(), RepoError> { let hash = hash.to_bytes(); let key = IVec::from(variant_access_key(&hash, &variant)); - let mut value_bytes = time::OffsetDateTime::now_utc() - .unix_timestamp_nanos() - .to_be_bytes() - .to_vec(); + let mut value_bytes = accessed.unix_timestamp_nanos().to_be_bytes().to_vec(); value_bytes.extend_from_slice(&key); let value_bytes = IVec::from(value_bytes); @@ -337,13 +359,25 @@ impl VariantAccessRepo for SledRepo { } #[tracing::instrument(level = "debug", skip(self))] - async fn contains_variant(&self, hash: Hash, variant: String) -> Result { + async fn variant_accessed_at( + &self, + hash: Hash, + variant: String, + ) -> Result, RepoError> { let hash = hash.to_bytes(); let key = variant_access_key(&hash, &variant); - let timestamp = b!(self.variant_access, variant_access.get(key)); + let Some(timestamp) = b!(self.variant_access, variant_access.get(key)) else { + return Ok(None); + }; - Ok(timestamp.is_some()) + let timestamp = timestamp[0..16].try_into().expect("valid timestamp bytes"); + + let timestamp = + time::OffsetDateTime::from_unix_timestamp_nanos(i128::from_be_bytes(timestamp)) + .expect("valid timestamp"); + + Ok(Some(timestamp)) } #[tracing::instrument(level = "debug", skip(self))] @@ -365,7 +399,7 @@ impl VariantAccessRepo for SledRepo { } #[tracing::instrument(level = "debug", skip(self))] - async fn remove_access(&self, hash: Hash, variant: String) -> Result<(), RepoError> { + async fn remove_variant_access(&self, hash: Hash, variant: String) -> Result<(), RepoError> { let hash = hash.to_bytes(); let key = IVec::from(variant_access_key(&hash, &variant)); @@ -480,7 +514,7 @@ impl Drop for PopMetricsGuard { #[async_trait::async_trait(?Send)] impl UploadRepo for SledRepo { #[tracing::instrument(level = "trace", skip(self))] - async fn create(&self, upload_id: UploadId) -> Result<(), RepoError> { + async fn create_upload(&self, upload_id: UploadId) -> Result<(), RepoError> { b!(self.uploads, uploads.insert(upload_id.as_bytes(), b"1")); Ok(()) } @@ -884,7 +918,7 @@ fn variant_from_key(hash: &[u8], key: &[u8]) -> Option { } #[async_trait::async_trait(?Send)] -impl IdentifierRepo for SledRepo { +impl DetailsRepo for SledRepo { #[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))] async fn relate_details( &self, @@ -918,7 +952,7 @@ impl IdentifierRepo for SledRepo { } #[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))] - async fn cleanup(&self, identifier: &dyn Identifier) -> Result<(), StoreError> { + async fn cleanup_details(&self, identifier: &dyn Identifier) -> Result<(), StoreError> { let key = identifier.to_bytes()?; b!(self.identifier_details, identifier_details.remove(key)); @@ -984,7 +1018,7 @@ impl HashRepo for SledRepo { } #[tracing::instrument(level = "trace", skip(self))] - async fn create( + async fn create_hash( &self, hash: Hash, identifier: &dyn Identifier, @@ -1160,7 +1194,7 @@ impl HashRepo for SledRepo { } #[tracing::instrument(skip(self))] - async fn cleanup(&self, hash: Hash) -> Result<(), RepoError> { + async fn cleanup_hash(&self, hash: Hash) -> Result<(), RepoError> { let hash = hash.to_ivec(); let hashes = self.hashes.clone(); @@ -1226,7 +1260,7 @@ fn hash_alias_key(hash: &IVec, alias: &IVec) -> Vec { #[async_trait::async_trait(?Send)] impl AliasRepo for SledRepo { #[tracing::instrument(level = "trace", skip(self))] - async fn create( + async fn create_alias( &self, alias: &Alias, delete_token: &DeleteToken, @@ -1310,7 +1344,7 @@ impl AliasRepo for SledRepo { } #[tracing::instrument(skip(self))] - async fn cleanup(&self, alias: &Alias) -> Result<(), RepoError> { + async fn cleanup_alias(&self, alias: &Alias) -> Result<(), RepoError> { let alias: IVec = alias.to_bytes().into(); let aliases = self.aliases.clone();