From bd3975f455ca81b2e21a06cddedf62cbc14a7f50 Mon Sep 17 00:00:00 2001 From: asonix Date: Tue, 25 Jul 2023 20:08:18 -0500 Subject: [PATCH] Simplify some repo functions, remove 0.3 migration --- src/ingest.rs | 66 +++---- src/lib.rs | 12 +- src/migrate_store.rs | 2 +- src/queue/cleanup.rs | 6 +- src/queue/process.rs | 10 +- src/repo.rs | 299 ++++---------------------------- src/repo/old.rs | 189 -------------------- src/repo/old/migrate.rs | 98 ----------- src/repo/old/migrate/s034.rs | 78 --------- src/repo/sled.rs | 293 ++++++++++++++++--------------- src/store/file_store/file_id.rs | 10 -- 11 files changed, 220 insertions(+), 843 deletions(-) delete mode 100644 src/repo/old.rs delete mode 100644 src/repo/old/migrate.rs delete mode 100644 src/repo/old/migrate/s034.rs diff --git a/src/ingest.rs b/src/ingest.rs index 7c77ef4..3c77ffb 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -3,7 +3,7 @@ use crate::{ either::Either, error::{Error, UploadError}, formats::{InternalFormat, Validations}, - repo::{Alias, AliasRepo, AlreadyExists, DeleteToken, FullRepo, HashRepo}, + repo::{Alias, AliasRepo, DeleteToken, FullRepo, HashRepo}, store::Store, }; use actix_web::web::Bytes; @@ -21,6 +21,7 @@ where S: Store, { repo: R, + delete_token: DeleteToken, hash: Option>, alias: Option, identifier: Option, @@ -105,6 +106,7 @@ where let mut session = Session { repo: repo.clone(), + delete_token: DeleteToken::generate(), hash: None, alias: None, identifier: Some(identifier.clone()), @@ -117,8 +119,8 @@ where if let Some(alias) = declared_alias { session.add_existing_alias(&hash, alias).await? } else { - session.create_alias(&hash, input_type).await?; - } + session.create_alias(&hash, input_type).await? + }; Ok(session) } @@ -135,7 +137,10 @@ where S: Store, R: FullRepo, { - if HashRepo::create(repo, hash.to_vec().into()).await?.is_err() { + if HashRepo::create(repo, hash.to_vec().into(), identifier) + .await? + .is_err() + { // duplicate upload store.remove(identifier).await?; session.identifier.take(); @@ -145,9 +150,6 @@ where // Set hash after upload uniquness check so we don't clean existing files on failure session.hash = Some(Vec::from(hash)); - repo.relate_identifier(hash.to_vec().into(), identifier) - .await?; - Ok(()) } @@ -156,60 +158,48 @@ where R: FullRepo + 'static, S: Store, { - pub(crate) fn disarm(&mut self) { + pub(crate) fn disarm(mut self) -> DeleteToken { let _ = self.hash.take(); let _ = self.alias.take(); let _ = self.identifier.take(); + + self.delete_token.clone() } pub(crate) fn alias(&self) -> Option<&Alias> { self.alias.as_ref() } - #[tracing::instrument(level = "trace", skip_all)] - pub(crate) async fn delete_token(&self) -> Result { - let alias = self.alias.clone().ok_or(UploadError::MissingAlias)?; - - tracing::trace!("Generating delete token"); - let delete_token = DeleteToken::generate(); - - tracing::trace!("Saving delete token"); - let res = self.repo.relate_delete_token(&alias, &delete_token).await?; - - if let Err(AlreadyExists(delete_token)) = res { - tracing::trace!("Returning existing delete token, {:?}", delete_token); - return Ok(delete_token); - } - - tracing::trace!("Returning new delete token, {:?}", delete_token); - Ok(delete_token) + pub(crate) fn delete_token(&self) -> &DeleteToken { + &self.delete_token } #[tracing::instrument(skip(self, hash))] async fn add_existing_alias(&mut self, hash: &[u8], alias: Alias) -> Result<(), Error> { - AliasRepo::create(&self.repo, &alias) + let hash: R::Bytes = hash.to_vec().into(); + + AliasRepo::create(&self.repo, &alias, &self.delete_token, hash.clone()) .await? .map_err(|_| UploadError::DuplicateAlias)?; self.alias = Some(alias.clone()); - self.repo.relate_hash(&alias, hash.to_vec().into()).await?; - self.repo.relate_alias(hash.to_vec().into(), &alias).await?; - Ok(()) } #[tracing::instrument(level = "debug", skip(self, hash))] async fn create_alias(&mut self, hash: &[u8], input_type: InternalFormat) -> Result<(), Error> { + let hash: R::Bytes = hash.to_vec().into(); + loop { let alias = Alias::generate(input_type.file_extension().to_string()); - if AliasRepo::create(&self.repo, &alias).await?.is_ok() { + if AliasRepo::create(&self.repo, &alias, &self.delete_token, hash.clone()) + .await? + .is_ok() + { self.alias = Some(alias.clone()); - self.repo.relate_hash(&alias, hash.to_vec().into()).await?; - self.repo.relate_alias(hash.to_vec().into(), &alias).await?; - return Ok(()); } @@ -249,20 +239,14 @@ where if let Some(alias) = self.alias.take() { let repo = self.repo.clone(); + let token = self.delete_token.clone(); let cleanup_span = tracing::info_span!(parent: &cleanup_parent_span, "Session cleanup alias", alias = ?alias); tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { actix_rt::spawn( async move { - if let Ok(Some(token)) = repo.delete_token(&alias).await { - let _ = crate::queue::cleanup_alias(&repo, alias, token).await; - } else { - let token = DeleteToken::generate(); - if let Ok(Ok(())) = repo.relate_delete_token(&alias, &token).await { - let _ = crate::queue::cleanup_alias(&repo, alias, token).await; - } - } + let _ = crate::queue::cleanup_alias(&repo, alias, token).await; } .instrument(cleanup_span), ) diff --git a/src/lib.rs b/src/lib.rs index 20eefa8..1be5a20 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -303,7 +303,7 @@ async fn handle_upload( for image in &images { if let Some(alias) = image.result.alias() { tracing::debug!("Uploaded {} as {:?}", image.filename, alias); - let delete_token = image.result.delete_token().await?; + let delete_token = image.result.delete_token(); let details = ensure_details(&repo, &store, &config, alias).await?; @@ -315,7 +315,7 @@ async fn handle_upload( } } - for mut image in images { + for image in images { image.result.disarm(); } @@ -489,14 +489,13 @@ async fn ingest_inline( store: &S, config: &Configuration, ) -> Result<(Alias, DeleteToken, Details), Error> { - let mut session = ingest::ingest(repo, store, stream, None, &config.media).await?; + let session = ingest::ingest(repo, store, stream, None, &config.media).await?; let alias = session.alias().expect("alias should exist").to_owned(); - let delete_token = session.delete_token().await?; let details = ensure_details(repo, store, config, &alias).await?; - session.disarm(); + let delete_token = session.disarm(); Ok((alias, delete_token, details)) } @@ -1886,7 +1885,6 @@ impl PictRsConfiguration { let PictRsConfiguration { config, operation } = self; let repo = Repo::open(config.repo.clone())?; - repo.migrate_from_db(config.old_db.path.clone()).await?; let client = build_client(&config)?; match operation { @@ -1949,8 +1947,6 @@ impl PictRsConfiguration { match config.store.clone() { config::Store::Filesystem(config::Filesystem { path }) => { - repo.migrate_identifiers().await?; - let store = FileStore::build(path, repo.clone()).await?; match repo { Repo::Sled(sled_repo) => { diff --git a/src/migrate_store.rs b/src/migrate_store.rs index df9e8fe..922b8a8 100644 --- a/src/migrate_store.rs +++ b/src/migrate_store.rs @@ -283,7 +283,7 @@ where match migrate_file(repo, from, to, &original_identifier, *skip_missing_files).await { Ok(new_identifier) => { migrate_details(repo, &original_identifier, &new_identifier).await?; - repo.relate_identifier(hash.clone().into(), &new_identifier) + repo.update_identifier(hash.clone().into(), &new_identifier) .await?; repo.mark_migrated(&original_identifier, &new_identifier) .await?; diff --git a/src/queue/cleanup.rs b/src/queue/cleanup.rs index 4c33344..3bb051c 100644 --- a/src/queue/cleanup.rs +++ b/src/queue/cleanup.rs @@ -96,7 +96,7 @@ where { let hash: R::Bytes = hash.into(); - let aliases = repo.aliases(hash.clone()).await?; + let aliases = repo.for_hash(hash.clone()).await?; if !aliases.is_empty() { for alias in aliases { @@ -151,9 +151,7 @@ where return Ok(()); }; - repo.remove_alias(hash.clone(), &alias).await?; - - if repo.aliases(hash.clone()).await?.is_empty() { + if repo.for_hash(hash.clone()).await?.is_empty() { super::cleanup_hash(repo, hash).await?; } diff --git a/src/queue/process.rs b/src/queue/process.rs index 10e76ea..38ac21c 100644 --- a/src/queue/process.rs +++ b/src/queue/process.rs @@ -5,7 +5,7 @@ use crate::{ formats::InputProcessableFormat, ingest::Session, queue::{Base64Bytes, LocalBoxFuture, Process}, - repo::{Alias, DeleteToken, FullRepo, UploadId, UploadResult}, + repo::{Alias, FullRepo, UploadId, UploadResult}, serde_str::Serde, store::{Identifier, Store}, }; @@ -99,9 +99,7 @@ where let session = crate::ingest::ingest(&repo, &store2, stream, declared_alias, &media).await?; - let token = session.delete_token().await?; - - Ok((session, token)) as Result<(Session, DeleteToken), Error> + Ok(session) as Result, Error> }) .await; @@ -111,10 +109,10 @@ where }; let result = match fut.await { - Ok((mut session, token)) => { + Ok(session) => { let alias = session.alias().take().expect("Alias should exist").clone(); + let token = session.disarm(); let result = UploadResult::Success { alias, token }; - session.disarm(); result } Err(e) => { diff --git a/src/repo.rs b/src/repo.rs index 1291a9b..a53fbcc 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -1,16 +1,14 @@ use crate::{ config, details::Details, - store::{file_store::FileId, Identifier, StoreError}, + store::{Identifier, StoreError}, }; use base64::{prelude::BASE64_STANDARD, Engine}; use futures_util::Stream; -use std::{fmt::Debug, path::PathBuf}; -use tracing::Instrument; +use std::fmt::Debug; use url::Url; use uuid::Uuid; -mod old; pub(crate) mod sled; #[derive(Clone, Debug)] @@ -30,7 +28,7 @@ pub(crate) struct Alias { extension: Option, } -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub(crate) struct DeleteToken { id: MaybeUuid, } @@ -38,8 +36,6 @@ pub(crate) struct DeleteToken { pub(crate) struct HashAlreadyExists; pub(crate) struct AliasAlreadyExists; -pub(crate) struct AlreadyExists(pub(super) DeleteToken); - #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub(crate) struct UploadId { id: Uuid, @@ -60,9 +56,6 @@ pub(crate) enum RepoError { #[error("Panic in blocking operation")] Canceled, - - #[error("Required field is missing {0}")] - Missing(&'static str), } #[async_trait::async_trait(?Send)] @@ -102,7 +95,7 @@ pub(crate) trait FullRepo: return Ok(vec![]); }; - self.aliases(hash).await + self.for_hash(hash).await } #[tracing::instrument(skip(self))] @@ -427,17 +420,18 @@ pub(crate) trait HashRepo: BaseRepo { async fn hashes(&self) -> Self::Stream; - async fn create(&self, hash: Self::Bytes) -> Result, RepoError>; + async fn create( + &self, + hash: Self::Bytes, + identifier: &I, + ) -> Result, StoreError>; - async fn relate_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), RepoError>; - async fn remove_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), RepoError>; - async fn aliases(&self, hash: Self::Bytes) -> Result, RepoError>; - - async fn relate_identifier( + async fn update_identifier( &self, hash: Self::Bytes, identifier: &I, ) -> Result<(), StoreError>; + async fn identifier( &self, hash: Self::Bytes, @@ -488,28 +482,20 @@ where T::hashes(self).await } - async fn create(&self, hash: Self::Bytes) -> Result, RepoError> { - T::create(self, hash).await + async fn create( + &self, + hash: Self::Bytes, + identifier: &I, + ) -> Result, StoreError> { + T::create(self, hash, identifier).await } - async fn relate_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), RepoError> { - T::relate_alias(self, hash, alias).await - } - - async fn remove_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), RepoError> { - T::remove_alias(self, hash, alias).await - } - - async fn aliases(&self, hash: Self::Bytes) -> Result, RepoError> { - T::aliases(self, hash).await - } - - async fn relate_identifier( + async fn update_identifier( &self, hash: Self::Bytes, identifier: &I, ) -> Result<(), StoreError> { - T::relate_identifier(self, hash, identifier).await + T::update_identifier(self, hash, identifier).await } async fn identifier( @@ -569,18 +555,19 @@ where #[async_trait::async_trait(?Send)] pub(crate) trait AliasRepo: BaseRepo { - async fn create(&self, alias: &Alias) -> Result, RepoError>; - - async fn relate_delete_token( + async fn create( &self, alias: &Alias, delete_token: &DeleteToken, - ) -> Result, RepoError>; + hash: Self::Bytes, + ) -> Result, RepoError>; + async fn delete_token(&self, alias: &Alias) -> Result, RepoError>; - async fn relate_hash(&self, alias: &Alias, hash: Self::Bytes) -> Result<(), RepoError>; async fn hash(&self, alias: &Alias) -> Result, RepoError>; + async fn for_hash(&self, hash: Self::Bytes) -> Result, RepoError>; + async fn cleanup(&self, alias: &Alias) -> Result<(), RepoError>; } @@ -589,30 +576,27 @@ impl AliasRepo for actix_web::web::Data where T: AliasRepo, { - async fn create(&self, alias: &Alias) -> Result, RepoError> { - T::create(self, alias).await - } - - async fn relate_delete_token( + async fn create( &self, alias: &Alias, delete_token: &DeleteToken, - ) -> Result, RepoError> { - T::relate_delete_token(self, alias, delete_token).await + hash: Self::Bytes, + ) -> Result, RepoError> { + T::create(self, alias, delete_token, hash).await } async fn delete_token(&self, alias: &Alias) -> Result, RepoError> { T::delete_token(self, alias).await } - async fn relate_hash(&self, alias: &Alias, hash: Self::Bytes) -> Result<(), RepoError> { - T::relate_hash(self, alias, hash).await - } - async fn hash(&self, alias: &Alias) -> Result, RepoError> { T::hash(self, alias).await } + async fn for_hash(&self, hash: Self::Bytes) -> Result, RepoError> { + T::for_hash(self, hash).await + } + async fn cleanup(&self, alias: &Alias) -> Result<(), RepoError> { T::cleanup(self, alias).await } @@ -633,223 +617,6 @@ impl Repo { } } } - - pub(crate) async fn migrate_from_db(&self, path: PathBuf) -> color_eyre::Result<()> { - if self.has_migrated().await? { - return Ok(()); - } - - if let Some(old) = self::old::Old::open(path)? { - let span = tracing::warn_span!("Migrating Database from 0.3 layout to 0.4 layout"); - - match self { - Self::Sled(repo) => { - async { - for hash in old.hashes() { - if let Err(e) = migrate_hash(repo, &old, hash).await { - tracing::error!("Failed to migrate hash: {}", format!("{e:?}")); - } - } - - if let Ok(Some(value)) = old.setting(STORE_MIGRATION_PROGRESS.as_bytes()) { - tracing::warn!("Setting STORE_MIGRATION_PROGRESS"); - let _ = repo.set(STORE_MIGRATION_PROGRESS, value).await; - } - - if let Ok(Some(value)) = old.setting(GENERATOR_KEY.as_bytes()) { - tracing::warn!("Setting GENERATOR_KEY"); - let _ = repo.set(GENERATOR_KEY, value).await; - } - } - .instrument(span) - .await; - } - } - } - - self.mark_migrated().await?; - - Ok(()) - } - - pub(crate) async fn migrate_identifiers(&self) -> color_eyre::Result<()> { - if self.has_migrated_identifiers().await? { - return Ok(()); - } - - let span = tracing::warn_span!("Migrating File Identifiers from 0.3 format to 0.4 format"); - - match self { - Self::Sled(repo) => { - async { - use futures_util::StreamExt; - let mut hashes = repo.hashes().await; - - while let Some(res) = hashes.next().await { - let hash = res?; - if let Err(e) = migrate_identifiers_for_hash(repo, hash).await { - tracing::error!( - "Failed to migrate identifiers for hash: {}", - format!("{e:?}") - ); - } - } - - Ok(()) as color_eyre::Result<()> - } - .instrument(span) - .await?; - } - } - - self.mark_migrated_identifiers().await?; - - Ok(()) - } - - async fn has_migrated(&self) -> color_eyre::Result { - match self { - Self::Sled(repo) => Ok(repo.get(REPO_MIGRATION_O1).await?.is_some()), - } - } - - async fn has_migrated_identifiers(&self) -> color_eyre::Result { - match self { - Self::Sled(repo) => Ok(repo.get(REPO_MIGRATION_02).await?.is_some()), - } - } - - async fn mark_migrated(&self) -> color_eyre::Result<()> { - match self { - Self::Sled(repo) => { - repo.set(REPO_MIGRATION_O1, b"1".to_vec().into()).await?; - } - } - - Ok(()) - } - - async fn mark_migrated_identifiers(&self) -> color_eyre::Result<()> { - match self { - Self::Sled(repo) => { - repo.set(REPO_MIGRATION_02, b"1".to_vec().into()).await?; - } - } - - Ok(()) - } -} - -const REPO_MIGRATION_O1: &str = "repo-migration-01"; -const REPO_MIGRATION_02: &str = "repo-migration-02"; -const STORE_MIGRATION_PROGRESS: &str = "store-migration-progress"; -const GENERATOR_KEY: &str = "last-path"; - -#[tracing::instrument(skip(repo, hash), fields(hash = hex::encode(&hash)))] -async fn migrate_identifiers_for_hash(repo: &T, hash: ::sled::IVec) -> color_eyre::Result<()> -where - T: FullRepo, -{ - let hash: T::Bytes = hash.to_vec().into(); - - if let Some(motion_identifier) = repo.motion_identifier::(hash.clone()).await? { - if let Some(new_motion_identifier) = motion_identifier.normalize_for_migration() { - migrate_identifier_details(repo, &motion_identifier, &new_motion_identifier).await?; - repo.relate_motion_identifier(hash.clone(), &new_motion_identifier) - .await?; - } - } - - for (variant_path, variant_identifier) in repo.variants::(hash.clone()).await? { - if let Some(new_variant_identifier) = variant_identifier.normalize_for_migration() { - migrate_identifier_details(repo, &variant_identifier, &new_variant_identifier).await?; - repo.relate_variant_identifier(hash.clone(), variant_path, &new_variant_identifier) - .await?; - } - } - - let Some(main_identifier) = repo.identifier::(hash.clone()).await? else { - tracing::warn!("Missing identifier for hash {}, queueing cleanup", hex::encode(&hash)); - crate::queue::cleanup_hash(repo, hash.clone()).await?; - return Err(RepoError::Missing("hash -> identifier").into()); - }; - - if let Some(new_main_identifier) = main_identifier.normalize_for_migration() { - migrate_identifier_details(repo, &main_identifier, &new_main_identifier).await?; - repo.relate_identifier(hash, &new_main_identifier).await?; - } - - Ok(()) -} - -#[tracing::instrument(level = "debug", skip(repo))] -async fn migrate_identifier_details( - repo: &T, - old: &FileId, - new: &FileId, -) -> color_eyre::Result<()> -where - T: FullRepo, -{ - if old == new { - tracing::warn!("Old FileId and new FileId are identical"); - return Ok(()); - } - - if let Some(details) = repo.details(old).await? { - repo.relate_details(new, &details).await?; - IdentifierRepo::cleanup(repo, old).await?; - } - - Ok(()) -} - -#[tracing::instrument(skip(repo, old, hash), fields(hash = hex::encode(&hash)))] -async fn migrate_hash(repo: &T, old: &old::Old, hash: ::sled::IVec) -> color_eyre::Result<()> -where - T: IdentifierRepo + HashRepo + AliasRepo + SettingsRepo + Debug, -{ - let new_hash: T::Bytes = hash.to_vec().into(); - let main_ident = old.main_identifier(&hash)?.to_vec(); - - if HashRepo::create(repo, new_hash.clone()).await?.is_err() { - tracing::warn!("Duplicate hash detected"); - return Ok(()); - } - - repo.relate_identifier(new_hash.clone(), &main_ident) - .await?; - - for alias in old.aliases(&hash) { - if let Ok(Ok(())) = AliasRepo::create(repo, &alias).await { - let _ = repo.relate_alias(new_hash.clone(), &alias).await; - let _ = repo.relate_hash(&alias, new_hash.clone()).await; - - if let Ok(Some(delete_token)) = old.delete_token(&alias) { - let _ = repo.relate_delete_token(&alias, &delete_token).await; - } - } - } - - if let Ok(Some(identifier)) = old.motion_identifier(&hash) { - let _ = repo - .relate_motion_identifier(new_hash.clone(), &identifier.to_vec()) - .await; - } - - for (variant_path, identifier) in old.variants(&hash)? { - let variant = variant_path.to_string_lossy().to_string(); - - let _ = repo - .relate_variant_identifier(new_hash.clone(), variant, &identifier.to_vec()) - .await; - } - - for (identifier, details) in old.details(&hash)? { - let _ = repo.relate_details(&identifier.to_vec(), &details).await; - } - - Ok(()) } impl MaybeUuid { diff --git a/src/repo/old.rs b/src/repo/old.rs deleted file mode 100644 index a87c6fa..0000000 --- a/src/repo/old.rs +++ /dev/null @@ -1,189 +0,0 @@ -// TREE STRUCTURE -// - Alias Tree -// - alias -> hash -// - alias / id -> u64(id) -// - alias / delete -> delete token -// - Main Tree -// - hash -> filename -// - hash 0 u64(id) -> alias -// - Filename Tree -// - filename -> hash -// - Details Tree -// - filename / S::Identifier -> details -// - Identifier Tree -// - filename -> S::Identifier -// - filename / variant path -> S::Identifier -// - filename / motion -> S::Identifier -// - Settings Tree -// - store-migration-progress -> Path Tree Key - -use super::{Alias, DeleteToken, Details}; -use std::path::PathBuf; - -mod migrate; - -#[derive(Debug)] -struct OldDbError(&'static str); - -impl std::fmt::Display for OldDbError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.0) - } -} - -impl std::error::Error for OldDbError {} - -pub(super) struct Old { - alias_tree: ::sled::Tree, - filename_tree: ::sled::Tree, - main_tree: ::sled::Tree, - details_tree: ::sled::Tree, - settings_tree: ::sled::Tree, - identifier_tree: ::sled::Tree, - _db: ::sled::Db, -} - -impl Old { - #[tracing::instrument] - pub(super) fn open(path: PathBuf) -> color_eyre::Result> { - if let Some(db) = migrate::LatestDb::exists(path).migrate()? { - Ok(Some(Self { - alias_tree: db.open_tree("alias")?, - filename_tree: db.open_tree("filename")?, - main_tree: db.open_tree("main")?, - details_tree: db.open_tree("details")?, - settings_tree: db.open_tree("settings")?, - identifier_tree: db.open_tree("path")?, - _db: db, - })) - } else { - Ok(None) - } - } - - pub(super) fn setting(&self, key: &[u8]) -> color_eyre::Result> { - Ok(self.settings_tree.get(key)?) - } - - pub(super) fn hashes(&self) -> impl std::iter::Iterator { - let length = self.filename_tree.len(); - tracing::info!("FILENAME_TREE_LEN: {}", length); - self.filename_tree - .iter() - .values() - .filter_map(|res| res.ok()) - } - - pub(super) fn details( - &self, - hash: &sled::IVec, - ) -> color_eyre::Result> { - let filename = self - .main_tree - .get(hash)? - .ok_or(OldDbError("Missing filename"))?; - - let filename = String::from_utf8_lossy(&filename); - - Ok(self - .identifier_tree - .scan_prefix(filename.as_bytes()) - .values() - .filter_map(Result::ok) - .filter_map(|identifier| { - let mut key = filename.as_bytes().to_vec(); - key.push(b'/'); - key.extend_from_slice(&identifier); - - let details = self.details_tree.get(key).ok()??; - let details = serde_json::from_slice(&details).ok()?; - - Some((identifier, details)) - }) - .collect()) - } - - pub(super) fn main_identifier(&self, hash: &sled::IVec) -> color_eyre::Result { - let filename = self - .main_tree - .get(hash)? - .ok_or(OldDbError("Missing filename"))?; - - Ok(self - .identifier_tree - .get(filename)? - .ok_or(OldDbError("Missing identifier"))?) - } - - pub(super) fn variants( - &self, - hash: &sled::IVec, - ) -> color_eyre::Result> { - let filename = self - .main_tree - .get(hash)? - .ok_or(OldDbError("Missing filename"))?; - - let filename_string = String::from_utf8_lossy(&filename); - - let variant_prefix = format!("{filename_string}/"); - - Ok(self - .identifier_tree - .scan_prefix(&variant_prefix) - .filter_map(|res| res.ok()) - .filter_map(|(key, value)| { - let variant_path_bytes = &key[variant_prefix.as_bytes().len()..]; - if variant_path_bytes == b"motion" { - return None; - } - - let path = String::from_utf8(variant_path_bytes.to_vec()).ok()?; - let mut path = PathBuf::from(path); - let extension = path.extension()?.to_str()?.to_string(); - path.pop(); - path.push(extension); - - Some((path, value)) - }) - .collect()) - } - - pub(super) fn motion_identifier( - &self, - hash: &sled::IVec, - ) -> color_eyre::Result> { - let filename = self - .main_tree - .get(hash)? - .ok_or(OldDbError("Missing filename"))?; - - let filename_string = String::from_utf8_lossy(&filename); - - let motion_key = format!("{filename_string}/motion"); - - Ok(self.filename_tree.get(motion_key)?) - } - - pub(super) fn aliases(&self, hash: &sled::IVec) -> Vec { - let mut key = hash.to_vec(); - key.push(0); - - self.main_tree - .scan_prefix(key) - .values() - .filter_map(|res| res.ok()) - .filter_map(|alias| Alias::from_slice(&alias)) - .collect() - } - - pub(super) fn delete_token(&self, alias: &Alias) -> color_eyre::Result> { - let key = format!("{alias}/delete"); - - if let Some(ivec) = self.alias_tree.get(key)? { - return Ok(DeleteToken::from_slice(&ivec)); - } - - Ok(None) - } -} diff --git a/src/repo/old/migrate.rs b/src/repo/old/migrate.rs deleted file mode 100644 index 82f00f4..0000000 --- a/src/repo/old/migrate.rs +++ /dev/null @@ -1,98 +0,0 @@ -use crate::Error; -use std::path::PathBuf; - -mod s034; - -type SledIter = Box, Vec), Error>>>; - -trait SledDb { - type SledTree: SledTree; - - fn open_tree(&self, name: &str) -> Result; - - fn self_tree(&self) -> &Self::SledTree; -} - -impl SledDb for &T -where - T: SledDb, -{ - type SledTree = T::SledTree; - - fn open_tree(&self, name: &str) -> Result { - (*self).open_tree(name) - } - - fn self_tree(&self) -> &Self::SledTree { - (*self).self_tree() - } -} - -trait SledTree { - fn get(&self, key: K) -> Result>, Error> - where - K: AsRef<[u8]>; - - fn insert(&self, key: K, value: V) -> Result<(), Error> - where - K: AsRef<[u8]>, - V: AsRef<[u8]>; - - fn iter(&self) -> SledIter; - - fn range(&self, range: R) -> SledIter - where - K: AsRef<[u8]>, - R: std::ops::RangeBounds; - - fn flush(&self) -> Result<(), Error>; -} - -pub(crate) struct LatestDb { - root_dir: PathBuf, - version: DbVersion, -} - -impl LatestDb { - pub(crate) fn exists(root_dir: PathBuf) -> Self { - let version = DbVersion::exists(root_dir.clone()); - - LatestDb { root_dir, version } - } - - pub(crate) fn migrate(self) -> Result, Error> { - let LatestDb { root_dir, version } = self; - - loop { - let root_dir2 = root_dir.clone(); - let res = std::panic::catch_unwind(move || version.migrate(root_dir2)); - - if let Ok(res) = res { - return res; - } - } - } -} - -#[derive(Clone, Copy)] -enum DbVersion { - Sled034, - Fresh, -} - -impl DbVersion { - fn exists(root: PathBuf) -> Self { - if s034::exists(root) { - return DbVersion::Sled034; - } - - DbVersion::Fresh - } - - fn migrate(self, root: PathBuf) -> Result, Error> { - match self { - DbVersion::Sled034 => Some(s034::open(root)).transpose(), - DbVersion::Fresh => Ok(None), - } - } -} diff --git a/src/repo/old/migrate/s034.rs b/src/repo/old/migrate/s034.rs deleted file mode 100644 index 60a1176..0000000 --- a/src/repo/old/migrate/s034.rs +++ /dev/null @@ -1,78 +0,0 @@ -use crate::{ - error::Error, - repo::old::migrate::{SledDb, SledIter, SledTree}, -}; -use sled as sled034; -use std::path::PathBuf; - -const SLED_034: &str = "db-0.34"; - -pub(crate) fn exists(mut base: PathBuf) -> bool { - base.push("sled"); - base.push(SLED_034); - - std::fs::metadata(base).is_ok() -} - -pub(crate) fn open(mut base: PathBuf) -> Result { - base.push("sled"); - base.push(SLED_034); - - let db = sled034::Config::default() - .cache_capacity(1024 * 1024 * 64) - .path(base) - .open()?; - - Ok(db) -} - -impl SledDb for sled034::Db { - type SledTree = sled034::Tree; - - fn open_tree(&self, name: &str) -> Result { - Ok(sled034::Db::open_tree(self, name)?) - } - - fn self_tree(&self) -> &Self::SledTree { - self - } -} - -impl SledTree for sled034::Tree { - fn get(&self, key: K) -> Result>, Error> - where - K: AsRef<[u8]>, - { - Ok(sled034::Tree::get(self, key)?.map(|v| Vec::from(v.as_ref()))) - } - - fn insert(&self, key: K, value: V) -> Result<(), Error> - where - K: AsRef<[u8]>, - V: AsRef<[u8]>, - { - Ok(sled034::Tree::insert(self, key, value.as_ref().to_vec()).map(|_| ())?) - } - - fn iter(&self) -> SledIter { - Box::new(sled034::Tree::iter(self).map(|res| { - res.map(|(k, v)| (k.as_ref().to_vec(), v.as_ref().to_vec())) - .map_err(Error::from) - })) - } - - fn range(&self, range: R) -> SledIter - where - K: AsRef<[u8]>, - R: std::ops::RangeBounds, - { - Box::new(sled034::Tree::range(self, range).map(|res| { - res.map(|(k, v)| (k.as_ref().to_vec(), v.as_ref().to_vec())) - .map_err(Error::from) - })) - } - - fn flush(&self) -> Result<(), Error> { - sled034::Tree::flush(self).map(|_| ()).map_err(Error::from) - } -} diff --git a/src/repo/sled.rs b/src/repo/sled.rs index 3d044e0..fdc03a5 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -1,16 +1,16 @@ use crate::{ details::MaybeHumanDate, repo::{ - Alias, AliasAlreadyExists, AliasRepo, AlreadyExists, BaseRepo, DeleteToken, Details, - FullRepo, HashAlreadyExists, HashRepo, Identifier, IdentifierRepo, MigrationRepo, - QueueRepo, SettingsRepo, UploadId, UploadRepo, UploadResult, + Alias, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, Details, FullRepo, + HashAlreadyExists, HashRepo, Identifier, IdentifierRepo, MigrationRepo, QueueRepo, + SettingsRepo, UploadId, UploadRepo, UploadResult, }, serde_str::Serde, store::StoreError, stream::from_iterator, }; use futures_util::{Future, Stream}; -use sled::{CompareAndSwapError, Db, IVec, Tree}; +use sled::{transaction::TransactionError, Db, IVec, Transactional, Tree}; use std::{ collections::HashMap, path::PathBuf, @@ -936,12 +936,6 @@ impl MigrationRepo for SledRepo { type StreamItem = Result; type LocalBoxStream<'a, T> = Pin + 'a>>; -fn hash_alias_key(hash: &IVec, alias: &Alias) -> Vec { - let mut v = hash.to_vec(); - v.append(&mut alias.to_bytes()); - v -} - #[async_trait::async_trait(?Send)] impl HashRepo for SledRepo { type Stream = LocalBoxStream<'static, StreamItem>; @@ -965,57 +959,50 @@ impl HashRepo for SledRepo { } #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))] - async fn create(&self, hash: Self::Bytes) -> Result, RepoError> { - let res = b!(self.hashes, { - let hash2 = hash.clone(); - hashes.compare_and_swap(hash, None as Option, Some(hash2)) - }); + async fn create( + &self, + hash: Self::Bytes, + identifier: &I, + ) -> Result, StoreError> { + let identifier: sled::IVec = identifier.to_bytes()?.into(); - Ok(res.map_err(|_| HashAlreadyExists)) + let hashes = self.hashes.clone(); + let hash_identifiers = self.hash_identifiers.clone(); + + let res = actix_web::web::block(move || { + (&hashes, &hash_identifiers).transaction(|(hashes, hash_identifiers)| { + if hashes.get(&hash)?.is_some() { + return Ok(Err(HashAlreadyExists)); + } + + hashes.insert(&hash, &hash)?; + hash_identifiers.insert(&hash, &identifier)?; + + Ok(Ok(())) + }) + }) + .await + .map_err(|_| RepoError::Canceled)?; + + match res { + Ok(res) => Ok(res), + Err(TransactionError::Abort(e) | TransactionError::Storage(e)) => { + Err(StoreError::from(RepoError::from(SledError::from(e)))) + } + } } - #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))] - async fn relate_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), RepoError> { - let key = hash_alias_key(&hash, alias); - let value = alias.to_bytes(); - - b!(self.hash_aliases, hash_aliases.insert(key, value)); - - Ok(()) - } - - #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))] - async fn remove_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), RepoError> { - let key = hash_alias_key(&hash, alias); - - b!(self.hash_aliases, hash_aliases.remove(key)); - - Ok(()) - } - - #[tracing::instrument(skip_all)] - async fn aliases(&self, hash: Self::Bytes) -> Result, RepoError> { - let v = b!(self.hash_aliases, { - Ok(hash_aliases - .scan_prefix(hash) - .values() - .filter_map(Result::ok) - .filter_map(|ivec| Alias::from_slice(&ivec)) - .collect::>()) as Result<_, sled::Error> - }); - - Ok(v) - } - - #[tracing::instrument(level = "trace", skip(self, hash, identifier), fields(hash = hex::encode(&hash), identifier = identifier.string_repr()))] - async fn relate_identifier( + async fn update_identifier( &self, hash: Self::Bytes, identifier: &I, ) -> Result<(), StoreError> { - let bytes = identifier.to_bytes()?; + let identifier = identifier.to_bytes()?; - b!(self.hash_identifiers, hash_identifiers.insert(hash, bytes)); + b!( + self.hash_identifiers, + hash_identifiers.insert(hash, identifier) + ); Ok(()) } @@ -1141,102 +1128,102 @@ impl HashRepo for SledRepo { #[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))] async fn cleanup(&self, hash: Self::Bytes) -> Result<(), RepoError> { - let hash2 = hash.clone(); - b!(self.hashes, hashes.remove(hash2)); + let hashes = self.hashes.clone(); + let hash_identifiers = self.hash_identifiers.clone(); + let hash_motion_identifiers = self.hash_motion_identifiers.clone(); + let hash_variant_identifiers = self.hash_variant_identifiers.clone(); let hash2 = hash.clone(); - b!(self.hash_identifiers, hash_identifiers.remove(hash2)); - - let hash2 = hash.clone(); - b!( - self.hash_motion_identifiers, - hash_motion_identifiers.remove(hash2) - ); - - let aliases = self.aliases(hash.clone()).await?; - let hash2 = hash.clone(); - b!(self.hash_aliases, { - for alias in aliases { - let key = hash_alias_key(&hash2, &alias); - - let _ = hash_aliases.remove(key); - } - Ok(()) as Result<(), SledError> - }); - let variant_keys = b!(self.hash_variant_identifiers, { let v = hash_variant_identifiers - .scan_prefix(hash) + .scan_prefix(hash2) .keys() .filter_map(Result::ok) .collect::>(); Ok(v) as Result, SledError> }); - b!(self.hash_variant_identifiers, { - for key in variant_keys { - let _ = hash_variant_identifiers.remove(key); - } - Ok(()) as Result<(), SledError> - }); - Ok(()) + let res = actix_web::web::block(move || { + ( + &hashes, + &hash_identifiers, + &hash_motion_identifiers, + &hash_variant_identifiers, + ) + .transaction( + |( + hashes, + hash_identifiers, + hash_motion_identifiers, + hash_variant_identifiers, + )| { + hashes.remove(&hash)?; + hash_identifiers.remove(&hash)?; + hash_motion_identifiers.remove(&hash)?; + + for key in &variant_keys { + hash_variant_identifiers.remove(key)?; + } + + Ok(()) + }, + ) + }) + .await + .map_err(|_| RepoError::Canceled)?; + + match res { + Ok(()) => Ok(()), + Err(TransactionError::Abort(e) | TransactionError::Storage(e)) => { + Err(SledError::from(e).into()) + } + } } } #[async_trait::async_trait(?Send)] impl AliasRepo for SledRepo { #[tracing::instrument(level = "trace", skip(self))] - async fn create(&self, alias: &Alias) -> Result, RepoError> { - let bytes = alias.to_bytes(); - let bytes2 = bytes.clone(); - - let res = b!( - self.aliases, - aliases.compare_and_swap(bytes, None as Option, Some(bytes2)) - ); - - Ok(res.map_err(|_| AliasAlreadyExists)) - } - - #[tracing::instrument(level = "trace", skip(self))] - async fn relate_delete_token( + async fn create( &self, alias: &Alias, delete_token: &DeleteToken, - ) -> Result, RepoError> { - let key = alias.to_bytes(); - let token = delete_token.to_bytes(); + hash: Self::Bytes, + ) -> Result, RepoError> { + let alias: sled::IVec = alias.to_bytes().into(); + let delete_token: sled::IVec = delete_token.to_bytes().into(); - let res = b!(self.alias_delete_tokens, { - let mut prev: Option = None; + let aliases = self.aliases.clone(); + let alias_hashes = self.alias_hashes.clone(); + let hash_aliases = self.hash_aliases.clone(); + let alias_delete_tokens = self.alias_delete_tokens.clone(); - loop { - let key = key.clone(); - let token = token.clone(); - - let res = alias_delete_tokens.compare_and_swap(key, prev, Some(token))?; - - match res { - Ok(()) => return Ok(Ok(())) as Result<_, SledError>, - Err(CompareAndSwapError { - current: Some(token), - .. - }) => { - if let Some(token) = DeleteToken::from_slice(&token) { - return Ok(Err(AlreadyExists(token))); - } else { - prev = Some(token); - }; + let res = actix_web::web::block(move || { + (&aliases, &alias_hashes, &hash_aliases, &alias_delete_tokens).transaction( + |(aliases, alias_hashes, hash_aliases, alias_delete_tokens)| { + if aliases.get(&alias)?.is_some() { + return Ok(Err(AliasAlreadyExists)); } - Err(CompareAndSwapError { current: None, .. }) => { - prev = None; - } - } + + aliases.insert(&alias, &alias)?; + alias_hashes.insert(&alias, &hash)?; + hash_aliases.insert(&hash, &alias)?; + alias_delete_tokens.insert(&alias, &delete_token)?; + + Ok(Ok(())) + }, + ) + }) + .await + .map_err(|_| RepoError::Canceled)?; + + match res { + Ok(res) => Ok(res), + Err(TransactionError::Abort(e) | TransactionError::Storage(e)) => { + Err(SledError::from(e).into()) } - }); - - Ok(res) + } } #[tracing::instrument(level = "trace", skip(self))] @@ -1254,15 +1241,6 @@ impl AliasRepo for SledRepo { Ok(Some(token)) } - #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))] - async fn relate_hash(&self, alias: &Alias, hash: Self::Bytes) -> Result<(), RepoError> { - let key = alias.to_bytes(); - - b!(self.alias_hashes, alias_hashes.insert(key, hash)); - - Ok(()) - } - #[tracing::instrument(level = "trace", skip(self))] async fn hash(&self, alias: &Alias) -> Result, RepoError> { let key = alias.to_bytes(); @@ -1272,19 +1250,50 @@ impl AliasRepo for SledRepo { Ok(opt) } + #[tracing::instrument(skip_all)] + async fn for_hash(&self, hash: Self::Bytes) -> Result, RepoError> { + let v = b!(self.hash_aliases, { + Ok(hash_aliases + .scan_prefix(hash) + .values() + .filter_map(Result::ok) + .filter_map(|ivec| Alias::from_slice(&ivec)) + .collect::>()) as Result<_, sled::Error> + }); + + Ok(v) + } + #[tracing::instrument(skip(self))] async fn cleanup(&self, alias: &Alias) -> Result<(), RepoError> { - let key = alias.to_bytes(); + let alias: sled::IVec = alias.to_bytes().into(); - let key2 = key.clone(); - b!(self.aliases, aliases.remove(key2)); + let aliases = self.aliases.clone(); + let alias_hashes = self.alias_hashes.clone(); + let hash_aliases = self.hash_aliases.clone(); + let alias_delete_tokens = self.alias_delete_tokens.clone(); - let key2 = key.clone(); - b!(self.alias_delete_tokens, alias_delete_tokens.remove(key2)); + let res = actix_web::web::block(move || { + (&aliases, &alias_hashes, &hash_aliases, &alias_delete_tokens).transaction( + |(aliases, alias_hashes, hash_aliases, alias_delete_tokens)| { + aliases.remove(&alias)?; + if let Some(hash) = alias_hashes.remove(&alias)? { + hash_aliases.remove(hash)?; + } + alias_delete_tokens.remove(&alias)?; + Ok(()) + }, + ) + }) + .await + .map_err(|_| RepoError::Canceled)?; - b!(self.alias_hashes, alias_hashes.remove(key)); - - Ok(()) + match res { + Ok(()) => Ok(()), + Err(TransactionError::Abort(e)) | Err(TransactionError::Storage(e)) => { + Err(SledError::from(e).into()) + } + } } } diff --git a/src/store/file_store/file_id.rs b/src/store/file_store/file_id.rs index 63d484b..08b223f 100644 --- a/src/store/file_store/file_id.rs +++ b/src/store/file_store/file_id.rs @@ -35,16 +35,6 @@ impl Identifier for FileId { } } -impl FileId { - pub(crate) fn normalize_for_migration(&self) -> Option { - if self.0.starts_with("files") { - Some(Self(self.0.components().skip(1).collect::())) - } else { - None - } - } -} - impl FileStore { pub(super) fn file_id_from_path(&self, path: PathBuf) -> Result { let stripped = path