2
0
Fork 0
mirror of https://git.asonix.dog/asonix/pict-rs synced 2024-12-22 19:31:35 +00:00

Build out repo-repo migration

This commit is contained in:
asonix 2023-08-16 16:09:40 -05:00
parent 1559d57f0a
commit ba27a1a223
8 changed files with 307 additions and 103 deletions

View file

@ -1,6 +1,6 @@
use crate::{ use crate::{
error::Error, error::Error,
repo::{ArcRepo, UploadId, UploadRepo}, repo::{ArcRepo, UploadId},
store::Store, store::Store,
}; };
use actix_web::web::Bytes; use actix_web::web::Bytes;
@ -53,10 +53,8 @@ where
where where
P: Stream<Item = Result<Bytes, Error>> + Unpin + 'static, P: Stream<Item = Result<Bytes, Error>> + Unpin + 'static,
{ {
UploadRepo::create( self.repo
self.repo.as_ref(), .create_upload(self.upload_id.expect("Upload id exists"))
self.upload_id.expect("Upload id exists"),
)
.await?; .await?;
let stream = stream.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)); let stream = stream.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));

View file

@ -3,7 +3,7 @@ use crate::{
either::Either, either::Either,
error::{Error, UploadError}, error::{Error, UploadError},
formats::{InternalFormat, Validations}, formats::{InternalFormat, Validations},
repo::{Alias, AliasRepo, ArcRepo, DeleteToken, Hash, HashRepo}, repo::{Alias, ArcRepo, DeleteToken, Hash},
store::Store, store::Store,
}; };
use actix_web::web::Bytes; use actix_web::web::Bytes;
@ -137,10 +137,7 @@ async fn save_upload<S>(
where where
S: Store, S: Store,
{ {
if HashRepo::create(repo.as_ref(), hash.clone(), identifier) if repo.create_hash(hash.clone(), identifier).await?.is_err() {
.await?
.is_err()
{
// duplicate upload // duplicate upload
store.remove(identifier).await?; store.remove(identifier).await?;
session.identifier.take(); session.identifier.take();
@ -175,7 +172,8 @@ where
#[tracing::instrument(skip(self, hash))] #[tracing::instrument(skip(self, hash))]
async fn add_existing_alias(&mut self, hash: Hash, alias: Alias) -> Result<(), Error> { async fn add_existing_alias(&mut self, hash: Hash, alias: Alias) -> Result<(), Error> {
AliasRepo::create(self.repo.as_ref(), &alias, &self.delete_token, hash) self.repo
.create_alias(&alias, &self.delete_token, hash)
.await? .await?
.map_err(|_| UploadError::DuplicateAlias)?; .map_err(|_| UploadError::DuplicateAlias)?;
@ -189,7 +187,9 @@ where
loop { loop {
let alias = Alias::generate(input_type.file_extension().to_string()); let alias = Alias::generate(input_type.file_extension().to_string());
if AliasRepo::create(self.repo.as_ref(), &alias, &self.delete_token, hash.clone()) if self
.repo
.create_alias(&alias, &self.delete_token, hash.clone())
.await? .await?
.is_ok() .is_ok()
{ {

View file

@ -69,10 +69,7 @@ use self::{
middleware::{Deadline, Internal}, middleware::{Deadline, Internal},
migrate_store::migrate_store, migrate_store::migrate_store,
queue::queue_generate, queue::queue_generate,
repo::{ repo::{sled::SledRepo, Alias, DeleteToken, Hash, Repo, UploadId, UploadResult},
sled::SledRepo, Alias, AliasAccessRepo, DeleteToken, Hash, Repo, UploadId, UploadResult,
VariantAccessRepo,
},
serde_str::Serde, serde_str::Serde,
store::{file_store::FileStore, object_store::ObjectStore, Identifier, Store}, store::{file_store::FileStore, object_store::ObjectStore, Identifier, Store},
stream::{StreamLimit, StreamTimeout}, stream::{StreamLimit, StreamTimeout},
@ -673,7 +670,7 @@ async fn process_details<S: Store>(
let thumbnail_string = thumbnail_path.to_string_lossy().to_string(); let thumbnail_string = thumbnail_path.to_string_lossy().to_string();
if !config.server.read_only { if !config.server.read_only {
VariantAccessRepo::accessed((**repo).as_ref(), hash.clone(), thumbnail_string.clone()) repo.accessed_variant(hash.clone(), thumbnail_string.clone())
.await?; .await?;
} }
@ -745,7 +742,7 @@ async fn process<S: Store + 'static>(
}; };
if !config.server.read_only { if !config.server.read_only {
AliasAccessRepo::accessed((**repo).as_ref(), alias.clone()).await?; repo.accessed_alias(alias.clone()).await?;
} }
alias alias
@ -768,7 +765,8 @@ async fn process<S: Store + 'static>(
}; };
if !config.server.read_only { if !config.server.read_only {
VariantAccessRepo::accessed((**repo).as_ref(), hash.clone(), path_string.clone()).await?; repo.accessed_variant(hash.clone(), path_string.clone())
.await?;
} }
let identifier_opt = repo let identifier_opt = repo
@ -894,7 +892,8 @@ async fn process_head<S: Store + 'static>(
}; };
if !config.server.read_only { if !config.server.read_only {
VariantAccessRepo::accessed((**repo).as_ref(), hash.clone(), path_string.clone()).await?; repo.accessed_variant(hash.clone(), path_string.clone())
.await?;
} }
let identifier_opt = repo let identifier_opt = repo
@ -1056,7 +1055,7 @@ async fn serve_query<S: Store + 'static>(
}; };
if !config.server.read_only { if !config.server.read_only {
AliasAccessRepo::accessed((**repo).as_ref(), alias.clone()).await?; repo.accessed_alias(alias.clone()).await?;
} }
alias alias

View file

@ -8,7 +8,7 @@ use std::{
use crate::{ use crate::{
details::Details, details::Details,
error::{Error, UploadError}, error::{Error, UploadError},
repo::{ArcRepo, Hash, IdentifierRepo}, repo::{ArcRepo, Hash},
store::{Identifier, Store}, store::{Identifier, Store},
}; };
@ -429,7 +429,7 @@ where
{ {
if let Some(details) = repo.details(from).await? { if let Some(details) = repo.details(from).await? {
repo.relate_details(to, &details).await?; repo.relate_details(to, &details).await?;
IdentifierRepo::cleanup(repo.as_ref(), from).await?; repo.cleanup_details(from).await?;
} }
Ok(()) Ok(())

View file

@ -2,10 +2,7 @@ use crate::{
config::Configuration, config::Configuration,
error::{Error, UploadError}, error::{Error, UploadError},
queue::{Base64Bytes, Cleanup, LocalBoxFuture}, queue::{Base64Bytes, Cleanup, LocalBoxFuture},
repo::{ repo::{Alias, ArcRepo, DeleteToken, Hash},
Alias, AliasAccessRepo, AliasRepo, ArcRepo, DeleteToken, Hash, HashRepo, IdentifierRepo,
VariantAccessRepo,
},
serde_str::Serde, serde_str::Serde,
store::{Identifier, Store}, store::{Identifier, Store},
}; };
@ -65,7 +62,7 @@ where
errors.push(e); errors.push(e);
} }
if let Err(e) = IdentifierRepo::cleanup(repo.as_ref(), &identifier).await { if let Err(e) = repo.cleanup_details(&identifier).await {
errors.push(e); errors.push(e);
} }
@ -106,7 +103,7 @@ async fn hash(repo: &ArcRepo, hash: Hash) -> Result<(), Error> {
let _ = super::cleanup_identifier(repo, identifier).await; let _ = super::cleanup_identifier(repo, identifier).await;
} }
HashRepo::cleanup(repo.as_ref(), hash).await?; repo.cleanup_hash(hash).await?;
Ok(()) Ok(())
} }
@ -121,9 +118,9 @@ async fn alias(repo: &ArcRepo, alias: Alias, token: DeleteToken) -> Result<(), E
let hash = repo.hash(&alias).await?; let hash = repo.hash(&alias).await?;
AliasRepo::cleanup(repo.as_ref(), &alias).await?; repo.cleanup_alias(&alias).await?;
repo.remove_relation(alias.clone()).await?; repo.remove_relation(alias.clone()).await?;
AliasAccessRepo::remove_access(repo.as_ref(), alias.clone()).await?; repo.remove_alias_access(alias.clone()).await?;
let Some(hash) = hash else { let Some(hash) = hash else {
// hash doesn't exist, nothing to do // hash doesn't exist, nothing to do
@ -178,7 +175,7 @@ async fn outdated_proxies(repo: &ArcRepo, config: &Configuration) -> Result<(),
} else { } else {
tracing::warn!("Skipping alias cleanup - no delete token"); tracing::warn!("Skipping alias cleanup - no delete token");
repo.remove_relation(alias.clone()).await?; repo.remove_relation(alias.clone()).await?;
AliasAccessRepo::remove_access(repo.as_ref(), alias).await?; repo.remove_alias_access(alias).await?;
} }
} }
@ -201,11 +198,11 @@ async fn hash_variant(
repo.remove_variant(hash.clone(), target_variant.clone()) repo.remove_variant(hash.clone(), target_variant.clone())
.await?; .await?;
VariantAccessRepo::remove_access(repo.as_ref(), hash, target_variant).await?; repo.remove_variant_access(hash, target_variant).await?;
} else { } else {
for (variant, identifier) in repo.variants(hash.clone()).await? { for (variant, identifier) in repo.variants(hash.clone()).await? {
repo.remove_variant(hash.clone(), variant.clone()).await?; repo.remove_variant(hash.clone(), variant.clone()).await?;
VariantAccessRepo::remove_access(repo.as_ref(), hash.clone(), variant).await?; repo.remove_variant_access(hash.clone(), variant).await?;
super::cleanup_identifier(repo, identifier).await?; super::cleanup_identifier(repo, identifier).await?;
} }
} }

View file

@ -72,7 +72,7 @@ pub(crate) enum RepoError {
pub(crate) trait FullRepo: pub(crate) trait FullRepo:
UploadRepo UploadRepo
+ SettingsRepo + SettingsRepo
+ IdentifierRepo + DetailsRepo
+ AliasRepo + AliasRepo
+ QueueRepo + QueueRepo
+ HashRepo + HashRepo
@ -170,14 +170,28 @@ where
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]
pub(crate) trait AliasAccessRepo: BaseRepo { pub(crate) trait AliasAccessRepo: BaseRepo {
async fn accessed(&self, alias: Alias) -> Result<(), RepoError>; async fn accessed_alias(&self, alias: Alias) -> Result<(), RepoError> {
self.set_accessed_alias(alias, time::OffsetDateTime::now_utc())
.await
}
async fn set_accessed_alias(
&self,
alias: Alias,
accessed: time::OffsetDateTime,
) -> Result<(), RepoError>;
async fn alias_accessed_at(
&self,
alias: Alias,
) -> Result<Option<time::OffsetDateTime>, RepoError>;
async fn older_aliases( async fn older_aliases(
&self, &self,
timestamp: time::OffsetDateTime, timestamp: time::OffsetDateTime,
) -> Result<LocalBoxStream<'static, Result<Alias, RepoError>>, RepoError>; ) -> Result<LocalBoxStream<'static, Result<Alias, RepoError>>, RepoError>;
async fn remove_access(&self, alias: Alias) -> Result<(), RepoError>; async fn remove_alias_access(&self, alias: Alias) -> Result<(), RepoError>;
} }
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]
@ -185,8 +199,19 @@ impl<T> AliasAccessRepo for Arc<T>
where where
T: AliasAccessRepo, T: AliasAccessRepo,
{ {
async fn accessed(&self, alias: Alias) -> Result<(), RepoError> { async fn set_accessed_alias(
T::accessed(self, alias).await &self,
alias: Alias,
accessed: time::OffsetDateTime,
) -> Result<(), RepoError> {
T::set_accessed_alias(self, alias, accessed).await
}
async fn alias_accessed_at(
&self,
alias: Alias,
) -> Result<Option<time::OffsetDateTime>, RepoError> {
T::alias_accessed_at(self, alias).await
} }
async fn older_aliases( async fn older_aliases(
@ -196,23 +221,37 @@ where
T::older_aliases(self, timestamp).await T::older_aliases(self, timestamp).await
} }
async fn remove_access(&self, alias: Alias) -> Result<(), RepoError> { async fn remove_alias_access(&self, alias: Alias) -> Result<(), RepoError> {
T::remove_access(self, alias).await T::remove_alias_access(self, alias).await
} }
} }
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]
pub(crate) trait VariantAccessRepo: BaseRepo { pub(crate) trait VariantAccessRepo: BaseRepo {
async fn accessed(&self, hash: Hash, variant: String) -> Result<(), RepoError>; async fn accessed_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError> {
self.set_accessed_variant(hash, variant, time::OffsetDateTime::now_utc())
.await
}
async fn contains_variant(&self, hash: Hash, variant: String) -> Result<bool, RepoError>; async fn set_accessed_variant(
&self,
hash: Hash,
variant: String,
accessed: time::OffsetDateTime,
) -> Result<(), RepoError>;
async fn variant_accessed_at(
&self,
hash: Hash,
variant: String,
) -> Result<Option<time::OffsetDateTime>, RepoError>;
async fn older_variants( async fn older_variants(
&self, &self,
timestamp: time::OffsetDateTime, timestamp: time::OffsetDateTime,
) -> Result<LocalBoxStream<'static, Result<(Hash, String), RepoError>>, RepoError>; ) -> Result<LocalBoxStream<'static, Result<(Hash, String), RepoError>>, RepoError>;
async fn remove_access(&self, hash: Hash, variant: String) -> Result<(), RepoError>; async fn remove_variant_access(&self, hash: Hash, variant: String) -> Result<(), RepoError>;
} }
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]
@ -220,12 +259,21 @@ impl<T> VariantAccessRepo for Arc<T>
where where
T: VariantAccessRepo, T: VariantAccessRepo,
{ {
async fn accessed(&self, hash: Hash, variant: String) -> Result<(), RepoError> { async fn set_accessed_variant(
T::accessed(self, hash, variant).await &self,
hash: Hash,
variant: String,
accessed: time::OffsetDateTime,
) -> Result<(), RepoError> {
T::set_accessed_variant(self, hash, variant, accessed).await
} }
async fn contains_variant(&self, hash: Hash, variant: String) -> Result<bool, RepoError> { async fn variant_accessed_at(
T::contains_variant(self, hash, variant).await &self,
hash: Hash,
variant: String,
) -> Result<Option<time::OffsetDateTime>, RepoError> {
T::variant_accessed_at(self, hash, variant).await
} }
async fn older_variants( async fn older_variants(
@ -235,14 +283,14 @@ where
T::older_variants(self, timestamp).await T::older_variants(self, timestamp).await
} }
async fn remove_access(&self, hash: Hash, variant: String) -> Result<(), RepoError> { async fn remove_variant_access(&self, hash: Hash, variant: String) -> Result<(), RepoError> {
T::remove_access(self, hash, variant).await T::remove_variant_access(self, hash, variant).await
} }
} }
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]
pub(crate) trait UploadRepo: BaseRepo { pub(crate) trait UploadRepo: BaseRepo {
async fn create(&self, upload_id: UploadId) -> Result<(), RepoError>; async fn create_upload(&self, upload_id: UploadId) -> Result<(), RepoError>;
async fn wait(&self, upload_id: UploadId) -> Result<UploadResult, RepoError>; async fn wait(&self, upload_id: UploadId) -> Result<UploadResult, RepoError>;
@ -256,8 +304,8 @@ impl<T> UploadRepo for Arc<T>
where where
T: UploadRepo, T: UploadRepo,
{ {
async fn create(&self, upload_id: UploadId) -> Result<(), RepoError> { async fn create_upload(&self, upload_id: UploadId) -> Result<(), RepoError> {
T::create(self, upload_id).await T::create_upload(self, upload_id).await
} }
async fn wait(&self, upload_id: UploadId) -> Result<UploadResult, RepoError> { async fn wait(&self, upload_id: UploadId) -> Result<UploadResult, RepoError> {
@ -377,7 +425,7 @@ where
} }
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]
pub(crate) trait IdentifierRepo: BaseRepo { pub(crate) trait DetailsRepo: BaseRepo {
async fn relate_details( async fn relate_details(
&self, &self,
identifier: &dyn Identifier, identifier: &dyn Identifier,
@ -385,13 +433,13 @@ pub(crate) trait IdentifierRepo: BaseRepo {
) -> Result<(), StoreError>; ) -> Result<(), StoreError>;
async fn details(&self, identifier: &dyn Identifier) -> Result<Option<Details>, StoreError>; async fn details(&self, identifier: &dyn Identifier) -> Result<Option<Details>, StoreError>;
async fn cleanup(&self, identifier: &dyn Identifier) -> Result<(), StoreError>; async fn cleanup_details(&self, identifier: &dyn Identifier) -> Result<(), StoreError>;
} }
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]
impl<T> IdentifierRepo for Arc<T> impl<T> DetailsRepo for Arc<T>
where where
T: IdentifierRepo, T: DetailsRepo,
{ {
async fn relate_details( async fn relate_details(
&self, &self,
@ -405,8 +453,8 @@ where
T::details(self, identifier).await T::details(self, identifier).await
} }
async fn cleanup(&self, identifier: &dyn Identifier) -> Result<(), StoreError> { async fn cleanup_details(&self, identifier: &dyn Identifier) -> Result<(), StoreError> {
T::cleanup(self, identifier).await T::cleanup_details(self, identifier).await
} }
} }
@ -457,7 +505,7 @@ pub(crate) trait HashRepo: BaseRepo {
async fn hashes(&self) -> LocalBoxStream<'static, Result<Hash, RepoError>>; async fn hashes(&self) -> LocalBoxStream<'static, Result<Hash, RepoError>>;
async fn create( async fn create_hash(
&self, &self,
hash: Hash, hash: Hash,
identifier: &dyn Identifier, identifier: &dyn Identifier,
@ -492,7 +540,7 @@ pub(crate) trait HashRepo: BaseRepo {
) -> Result<(), StoreError>; ) -> Result<(), StoreError>;
async fn motion_identifier(&self, hash: Hash) -> Result<Option<Arc<[u8]>>, RepoError>; async fn motion_identifier(&self, hash: Hash) -> Result<Option<Arc<[u8]>>, RepoError>;
async fn cleanup(&self, hash: Hash) -> Result<(), RepoError>; async fn cleanup_hash(&self, hash: Hash) -> Result<(), RepoError>;
} }
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]
@ -508,12 +556,12 @@ where
T::hashes(self).await T::hashes(self).await
} }
async fn create( async fn create_hash(
&self, &self,
hash: Hash, hash: Hash,
identifier: &dyn Identifier, identifier: &dyn Identifier,
) -> Result<Result<(), HashAlreadyExists>, StoreError> { ) -> Result<Result<(), HashAlreadyExists>, StoreError> {
T::create(self, hash, identifier).await T::create_hash(self, hash, identifier).await
} }
async fn update_identifier( async fn update_identifier(
@ -565,14 +613,14 @@ where
T::motion_identifier(self, hash).await T::motion_identifier(self, hash).await
} }
async fn cleanup(&self, hash: Hash) -> Result<(), RepoError> { async fn cleanup_hash(&self, hash: Hash) -> Result<(), RepoError> {
T::cleanup(self, hash).await T::cleanup_hash(self, hash).await
} }
} }
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]
pub(crate) trait AliasRepo: BaseRepo { pub(crate) trait AliasRepo: BaseRepo {
async fn create( async fn create_alias(
&self, &self,
alias: &Alias, alias: &Alias,
delete_token: &DeleteToken, delete_token: &DeleteToken,
@ -585,7 +633,7 @@ pub(crate) trait AliasRepo: BaseRepo {
async fn for_hash(&self, hash: Hash) -> Result<Vec<Alias>, RepoError>; async fn for_hash(&self, hash: Hash) -> Result<Vec<Alias>, RepoError>;
async fn cleanup(&self, alias: &Alias) -> Result<(), RepoError>; async fn cleanup_alias(&self, alias: &Alias) -> Result<(), RepoError>;
} }
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]
@ -593,13 +641,13 @@ impl<T> AliasRepo for Arc<T>
where where
T: AliasRepo, T: AliasRepo,
{ {
async fn create( async fn create_alias(
&self, &self,
alias: &Alias, alias: &Alias,
delete_token: &DeleteToken, delete_token: &DeleteToken,
hash: Hash, hash: Hash,
) -> Result<Result<(), AliasAlreadyExists>, RepoError> { ) -> Result<Result<(), AliasAlreadyExists>, RepoError> {
T::create(self, alias, delete_token, hash).await T::create_alias(self, alias, delete_token, hash).await
} }
async fn delete_token(&self, alias: &Alias) -> Result<Option<DeleteToken>, RepoError> { async fn delete_token(&self, alias: &Alias) -> Result<Option<DeleteToken>, RepoError> {
@ -614,8 +662,8 @@ where
T::for_hash(self, hash).await T::for_hash(self, hash).await
} }
async fn cleanup(&self, alias: &Alias) -> Result<(), RepoError> { async fn cleanup_alias(&self, alias: &Alias) -> Result<(), RepoError> {
T::cleanup(self, alias).await T::cleanup_alias(self, alias).await
} }
} }

View file

@ -4,7 +4,7 @@ use crate::{
config::Configuration, config::Configuration,
details::Details, details::Details,
error::Error, error::Error,
repo::{AliasRepo, ArcRepo, DeleteToken, Hash, HashRepo, VariantAccessRepo}, repo::{ArcRepo, DeleteToken, Hash},
repo_04::{ repo_04::{
AliasRepo as _, HashRepo as _, IdentifierRepo as _, SettingsRepo as _, AliasRepo as _, HashRepo as _, IdentifierRepo as _, SettingsRepo as _,
SledRepo as OldSledRepo, SledRepo as OldSledRepo,
@ -16,6 +16,53 @@ use crate::{
const MIGRATE_CONCURRENCY: usize = 32; const MIGRATE_CONCURRENCY: usize = 32;
const GENERATOR_KEY: &str = "last-path"; const GENERATOR_KEY: &str = "last-path";
#[tracing::instrument(skip_all)]
pub(crate) async fn migrate_repo(old_repo: ArcRepo, new_repo: ArcRepo) -> Result<(), Error> {
tracing::warn!("Running checks");
if let Err(e) = old_repo.health_check().await {
tracing::warn!("Old repo is not configured correctly");
return Err(e.into());
}
if let Err(e) = new_repo.health_check().await {
tracing::warn!("New repo is not configured correctly");
return Err(e.into());
}
let total_size = old_repo.size().await?;
let pct = (total_size / 100).max(1);
tracing::warn!("Checks complete, migrating repo");
tracing::warn!("{total_size} hashes will be migrated");
let mut hash_stream = old_repo.hashes().await.into_streamer();
let mut index = 0;
while let Some(res) = hash_stream.next().await {
if let Ok(hash) = res {
let _ = migrate_hash(old_repo.clone(), new_repo.clone(), hash).await;
} else {
tracing::warn!("Failed to read hash, skipping");
}
index += 1;
if index % pct == 0 {
let percent = index / pct;
tracing::warn!("Migration {percent}% complete - {index}/{total_size}");
}
}
if let Some(generator_state) = old_repo.get(GENERATOR_KEY).await? {
new_repo
.set(GENERATOR_KEY, generator_state.to_vec().into())
.await?;
}
tracing::warn!("Migration complete");
Ok(())
}
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
pub(crate) async fn migrate_04<S: Store + 'static>( pub(crate) async fn migrate_04<S: Store + 'static>(
old_repo: OldSledRepo, old_repo: OldSledRepo,
@ -94,13 +141,32 @@ pub(crate) async fn migrate_04<S: Store + 'static>(
Ok(()) Ok(())
} }
async fn migrate_hash(old_repo: ArcRepo, new_repo: ArcRepo, hash: Hash) {
let mut hash_failures = 0;
while let Err(e) = do_migrate_hash(&old_repo, &new_repo, hash.clone()).await {
hash_failures += 1;
if hash_failures > 10 {
tracing::error!(
"Failed to migrate hash {}, skipping\n{hash:?}",
format!("{e:?}")
);
break;
} else {
tracing::warn!("Failed to migrate hash {hash:?}, retrying +{hash_failures}",);
}
}
}
async fn migrate_hash_04<S: Store>( async fn migrate_hash_04<S: Store>(
old_repo: OldSledRepo, old_repo: OldSledRepo,
new_repo: ArcRepo, new_repo: ArcRepo,
store: S, store: S,
config: Configuration, config: Configuration,
old_hash: sled::IVec, old_hash: sled::IVec,
) -> Result<(), Error> { ) {
let mut hash_failures = 0; let mut hash_failures = 0;
while let Err(e) = while let Err(e) =
@ -114,7 +180,8 @@ async fn migrate_hash_04<S: Store>(
hex::encode(&old_hash[..]), hex::encode(&old_hash[..]),
format!("{e:?}") format!("{e:?}")
); );
return Err(e);
break;
} else { } else {
tracing::warn!( tracing::warn!(
"Failed to migrate hash {}, retrying +{hash_failures}", "Failed to migrate hash {}, retrying +{hash_failures}",
@ -122,6 +189,65 @@ async fn migrate_hash_04<S: Store>(
); );
} }
} }
}
#[tracing::instrument(skip_all)]
async fn do_migrate_hash(old_repo: &ArcRepo, new_repo: &ArcRepo, hash: Hash) -> Result<(), Error> {
let Some(identifier) = old_repo.identifier(hash.clone()).await? else {
tracing::warn!("Skipping hash {hash:?}, no identifier");
return Ok(());
};
let _ = new_repo.create_hash(hash.clone(), &identifier).await?;
if let Some(details) = old_repo.details(&identifier).await? {
new_repo.relate_details(&identifier, &details).await?;
}
if let Some(identifier) = old_repo.motion_identifier(hash.clone()).await? {
new_repo
.relate_motion_identifier(hash.clone(), &identifier)
.await?;
if let Some(details) = old_repo.details(&identifier).await? {
new_repo.relate_details(&identifier, &details).await?;
}
}
for alias in old_repo.for_hash(hash.clone()).await? {
let delete_token = old_repo
.delete_token(&alias)
.await?
.unwrap_or_else(DeleteToken::generate);
let _ = new_repo
.create_alias(&alias, &delete_token, hash.clone())
.await?;
if let Some(timestamp) = old_repo.alias_accessed_at(alias.clone()).await? {
new_repo.set_accessed_alias(alias, timestamp).await?;
}
}
for (variant, identifier) in old_repo.variants(hash.clone()).await? {
let _ = new_repo
.relate_variant_identifier(hash.clone(), variant.clone(), &identifier)
.await?;
if let Some(timestamp) = new_repo
.variant_accessed_at(hash.clone(), variant.clone())
.await?
{
new_repo
.set_accessed_variant(hash.clone(), variant, timestamp)
.await?;
} else {
new_repo.accessed_variant(hash.clone(), variant).await?;
}
if let Some(details) = old_repo.details(&identifier).await? {
new_repo.relate_details(&identifier, &details).await?;
}
}
Ok(()) Ok(())
} }
@ -153,7 +279,7 @@ async fn do_migrate_hash_04<S: Store>(
let hash = Hash::new(hash, size, hash_details.internal_format()); let hash = Hash::new(hash, size, hash_details.internal_format());
let _ = HashRepo::create(new_repo.as_ref(), hash.clone(), &identifier).await?; let _ = new_repo.create_hash(hash.clone(), &identifier).await?;
for alias in aliases { for alias in aliases {
let delete_token = old_repo let delete_token = old_repo
@ -161,7 +287,9 @@ async fn do_migrate_hash_04<S: Store>(
.await? .await?
.unwrap_or_else(DeleteToken::generate); .unwrap_or_else(DeleteToken::generate);
let _ = AliasRepo::create(new_repo.as_ref(), &alias, &delete_token, hash.clone()).await?; let _ = new_repo
.create_alias(&alias, &delete_token, hash.clone())
.await?;
} }
if let Some(identifier) = motion_identifier { if let Some(identifier) = motion_identifier {
@ -179,7 +307,7 @@ async fn do_migrate_hash_04<S: Store>(
set_details(old_repo, new_repo, store, config, &identifier).await?; set_details(old_repo, new_repo, store, config, &identifier).await?;
VariantAccessRepo::accessed(new_repo.as_ref(), hash.clone(), variant).await?; new_repo.accessed_variant(hash.clone(), variant).await?;
} }
Ok(()) Ok(())

View file

@ -2,9 +2,9 @@ use crate::{
details::MaybeHumanDate, details::MaybeHumanDate,
repo::{ repo::{
hash::Hash, Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, hash::Hash, Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken,
Details, FullRepo, HashAlreadyExists, HashRepo, Identifier, IdentifierRepo, JobId, Details, DetailsRepo, FullRepo, HashAlreadyExists, HashRepo, Identifier, JobId, ProxyRepo,
ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo, UploadResult,
UploadResult, VariantAccessRepo, VariantAlreadyExists, VariantAccessRepo, VariantAlreadyExists,
}, },
serde_str::Serde, serde_str::Serde,
store::StoreError, store::StoreError,
@ -218,11 +218,12 @@ impl ProxyRepo for SledRepo {
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]
impl AliasAccessRepo for SledRepo { impl AliasAccessRepo for SledRepo {
#[tracing::instrument(level = "debug", skip(self))] #[tracing::instrument(level = "debug", skip(self))]
async fn accessed(&self, alias: Alias) -> Result<(), RepoError> { async fn set_accessed_alias(
let mut value_bytes = time::OffsetDateTime::now_utc() &self,
.unix_timestamp_nanos() alias: Alias,
.to_be_bytes() accessed: time::OffsetDateTime,
.to_vec(); ) -> Result<(), RepoError> {
let mut value_bytes = accessed.unix_timestamp_nanos().to_be_bytes().to_vec();
value_bytes.extend_from_slice(&alias.to_bytes()); value_bytes.extend_from_slice(&alias.to_bytes());
let value_bytes = IVec::from(value_bytes); let value_bytes = IVec::from(value_bytes);
@ -251,6 +252,25 @@ impl AliasAccessRepo for SledRepo {
Ok(()) Ok(())
} }
async fn alias_accessed_at(
&self,
alias: Alias,
) -> Result<Option<time::OffsetDateTime>, RepoError> {
let alias = alias.to_bytes();
let Some(timestamp) = b!(self.variant_access, variant_access.get(alias)) else {
return Ok(None);
};
let timestamp = timestamp[0..16].try_into().expect("valid timestamp bytes");
let timestamp =
time::OffsetDateTime::from_unix_timestamp_nanos(i128::from_be_bytes(timestamp))
.expect("valid timestamp");
Ok(Some(timestamp))
}
#[tracing::instrument(level = "debug", skip(self))] #[tracing::instrument(level = "debug", skip(self))]
async fn older_aliases( async fn older_aliases(
&self, &self,
@ -272,7 +292,7 @@ impl AliasAccessRepo for SledRepo {
} }
#[tracing::instrument(level = "debug", skip(self))] #[tracing::instrument(level = "debug", skip(self))]
async fn remove_access(&self, alias: Alias) -> Result<(), RepoError> { async fn remove_alias_access(&self, alias: Alias) -> Result<(), RepoError> {
let alias_access = self.alias_access.clone(); let alias_access = self.alias_access.clone();
let inverse_alias_access = self.inverse_alias_access.clone(); let inverse_alias_access = self.inverse_alias_access.clone();
@ -300,14 +320,16 @@ impl AliasAccessRepo for SledRepo {
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]
impl VariantAccessRepo for SledRepo { impl VariantAccessRepo for SledRepo {
#[tracing::instrument(level = "debug", skip(self))] #[tracing::instrument(level = "debug", skip(self))]
async fn accessed(&self, hash: Hash, variant: String) -> Result<(), RepoError> { async fn set_accessed_variant(
&self,
hash: Hash,
variant: String,
accessed: time::OffsetDateTime,
) -> Result<(), RepoError> {
let hash = hash.to_bytes(); let hash = hash.to_bytes();
let key = IVec::from(variant_access_key(&hash, &variant)); let key = IVec::from(variant_access_key(&hash, &variant));
let mut value_bytes = time::OffsetDateTime::now_utc() let mut value_bytes = accessed.unix_timestamp_nanos().to_be_bytes().to_vec();
.unix_timestamp_nanos()
.to_be_bytes()
.to_vec();
value_bytes.extend_from_slice(&key); value_bytes.extend_from_slice(&key);
let value_bytes = IVec::from(value_bytes); let value_bytes = IVec::from(value_bytes);
@ -337,13 +359,25 @@ impl VariantAccessRepo for SledRepo {
} }
#[tracing::instrument(level = "debug", skip(self))] #[tracing::instrument(level = "debug", skip(self))]
async fn contains_variant(&self, hash: Hash, variant: String) -> Result<bool, RepoError> { async fn variant_accessed_at(
&self,
hash: Hash,
variant: String,
) -> Result<Option<time::OffsetDateTime>, RepoError> {
let hash = hash.to_bytes(); let hash = hash.to_bytes();
let key = variant_access_key(&hash, &variant); let key = variant_access_key(&hash, &variant);
let timestamp = b!(self.variant_access, variant_access.get(key)); let Some(timestamp) = b!(self.variant_access, variant_access.get(key)) else {
return Ok(None);
};
Ok(timestamp.is_some()) let timestamp = timestamp[0..16].try_into().expect("valid timestamp bytes");
let timestamp =
time::OffsetDateTime::from_unix_timestamp_nanos(i128::from_be_bytes(timestamp))
.expect("valid timestamp");
Ok(Some(timestamp))
} }
#[tracing::instrument(level = "debug", skip(self))] #[tracing::instrument(level = "debug", skip(self))]
@ -365,7 +399,7 @@ impl VariantAccessRepo for SledRepo {
} }
#[tracing::instrument(level = "debug", skip(self))] #[tracing::instrument(level = "debug", skip(self))]
async fn remove_access(&self, hash: Hash, variant: String) -> Result<(), RepoError> { async fn remove_variant_access(&self, hash: Hash, variant: String) -> Result<(), RepoError> {
let hash = hash.to_bytes(); let hash = hash.to_bytes();
let key = IVec::from(variant_access_key(&hash, &variant)); let key = IVec::from(variant_access_key(&hash, &variant));
@ -480,7 +514,7 @@ impl Drop for PopMetricsGuard {
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]
impl UploadRepo for SledRepo { impl UploadRepo for SledRepo {
#[tracing::instrument(level = "trace", skip(self))] #[tracing::instrument(level = "trace", skip(self))]
async fn create(&self, upload_id: UploadId) -> Result<(), RepoError> { async fn create_upload(&self, upload_id: UploadId) -> Result<(), RepoError> {
b!(self.uploads, uploads.insert(upload_id.as_bytes(), b"1")); b!(self.uploads, uploads.insert(upload_id.as_bytes(), b"1"));
Ok(()) Ok(())
} }
@ -884,7 +918,7 @@ fn variant_from_key(hash: &[u8], key: &[u8]) -> Option<String> {
} }
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]
impl IdentifierRepo for SledRepo { impl DetailsRepo for SledRepo {
#[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))] #[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))]
async fn relate_details( async fn relate_details(
&self, &self,
@ -918,7 +952,7 @@ impl IdentifierRepo for SledRepo {
} }
#[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))] #[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))]
async fn cleanup(&self, identifier: &dyn Identifier) -> Result<(), StoreError> { async fn cleanup_details(&self, identifier: &dyn Identifier) -> Result<(), StoreError> {
let key = identifier.to_bytes()?; let key = identifier.to_bytes()?;
b!(self.identifier_details, identifier_details.remove(key)); b!(self.identifier_details, identifier_details.remove(key));
@ -984,7 +1018,7 @@ impl HashRepo for SledRepo {
} }
#[tracing::instrument(level = "trace", skip(self))] #[tracing::instrument(level = "trace", skip(self))]
async fn create( async fn create_hash(
&self, &self,
hash: Hash, hash: Hash,
identifier: &dyn Identifier, identifier: &dyn Identifier,
@ -1160,7 +1194,7 @@ impl HashRepo for SledRepo {
} }
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self))]
async fn cleanup(&self, hash: Hash) -> Result<(), RepoError> { async fn cleanup_hash(&self, hash: Hash) -> Result<(), RepoError> {
let hash = hash.to_ivec(); let hash = hash.to_ivec();
let hashes = self.hashes.clone(); let hashes = self.hashes.clone();
@ -1226,7 +1260,7 @@ fn hash_alias_key(hash: &IVec, alias: &IVec) -> Vec<u8> {
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]
impl AliasRepo for SledRepo { impl AliasRepo for SledRepo {
#[tracing::instrument(level = "trace", skip(self))] #[tracing::instrument(level = "trace", skip(self))]
async fn create( async fn create_alias(
&self, &self,
alias: &Alias, alias: &Alias,
delete_token: &DeleteToken, delete_token: &DeleteToken,
@ -1310,7 +1344,7 @@ impl AliasRepo for SledRepo {
} }
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self))]
async fn cleanup(&self, alias: &Alias) -> Result<(), RepoError> { async fn cleanup_alias(&self, alias: &Alias) -> Result<(), RepoError> {
let alias: IVec = alias.to_bytes().into(); let alias: IVec = alias.to_bytes().into();
let aliases = self.aliases.clone(); let aliases = self.aliases.clone();