From 25c717771c9a6cff2a756ed2a9c4f9b9776a6509 Mon Sep 17 00:00:00 2001
From: asonix
Date: Sat, 15 Jul 2023 14:37:17 -0500
Subject: [PATCH] Attempt to implement concurrent hash migrations

---
 src/lib.rs       | 388 +++++++++++++++++++++++++++--------------------
 src/repo.rs      |  42 +++++
 src/repo/sled.rs |  41 ++++-
 3 files changed, 302 insertions(+), 169 deletions(-)

diff --git a/src/lib.rs b/src/lib.rs
index 992718c..83c38ad 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -42,6 +42,7 @@ use std::{
     future::ready,
     path::Path,
     path::PathBuf,
+    rc::Rc,
     sync::atomic::{AtomicU64, Ordering},
     time::{Duration, Instant, SystemTime},
 };
@@ -62,7 +63,7 @@ use self::{
     middleware::{Deadline, Internal},
     queue::queue_generate,
     repo::{
-        Alias, DeleteToken, FullRepo, HashRepo, IdentifierRepo, QueueRepo, Repo, SettingsRepo,
+        Alias, DeleteToken, FullRepo, HashRepo, IdentifierRepo, MigrationRepo, QueueRepo, Repo,
         UploadId, UploadResult,
     },
     serde_str::Serde,
@@ -1263,14 +1264,14 @@ async fn launch_object_store<
 }
 
 async fn migrate_inner<S1>(
-    repo: &Repo,
+    repo: Repo,
     client: Client,
     from: S1,
     to: config::primitives::Store,
     skip_missing_files: bool,
 ) -> color_eyre::Result<()>
 where
-    S1: Store,
+    S1: Store + 'static,
 {
     match to {
         config::primitives::Store::Filesystem(config::Filesystem { path }) => {
@@ -1405,7 +1406,7 @@ pub async fn run() -> color_eyre::Result<()> {
             match from {
                 config::primitives::Store::Filesystem(config::Filesystem { path }) => {
                     let from = FileStore::build(path.clone(), repo.clone()).await?;
-                    migrate_inner(&repo, client, from, to, skip_missing_files).await?;
+                    migrate_inner(repo, client, from, to, skip_missing_files).await?;
                 }
                 config::primitives::Store::ObjectStorage(config::primitives::ObjectStorage {
                     endpoint,
@@ -1437,7 +1438,7 @@ pub async fn run() -> color_eyre::Result<()> {
                     .await?
                    .build(client.clone());
 
-                    migrate_inner(&repo, client, from, to, skip_missing_files).await?;
+                    migrate_inner(repo, client, from, to, skip_missing_files).await?;
                 }
             }
 
@@ -1506,20 +1507,16 @@ pub async fn run() -> color_eyre::Result<()> {
     Ok(())
 }
 
-const STORE_MIGRATION_PROGRESS: &str = "store-migration-progress";
-const STORE_MIGRATION_MOTION: &str = "store-migration-motion";
-const STORE_MIGRATION_VARIANT: &str = "store-migration-variant";
-
 async fn migrate_store<S1, S2, R>(
-    repo: &R,
+    repo: R,
     from: S1,
     to: S2,
     skip_missing_files: bool,
 ) -> Result<(), Error>
 where
-    S1: Store + Clone,
-    S2: Store + Clone,
-    R: IdentifierRepo + HashRepo + SettingsRepo + QueueRepo,
+    S1: Store + Clone + 'static,
+    S2: Store + Clone + 'static,
+    R: IdentifierRepo + HashRepo + QueueRepo + MigrationRepo + Clone + 'static,
 {
     tracing::warn!("Running checks");
 
     let mut failure_count = 0;
 
-    while let Err(e) = do_migrate_store(repo, from.clone(), to.clone(), skip_missing_files).await {
+    while let Err(e) =
+        do_migrate_store(repo.clone(), from.clone(), to.clone(), skip_missing_files).await
+    {
         tracing::error!("Migration failed with {}", format!("{e:?}"));
 
         failure_count += 1;
@@ -1553,139 +1552,164 @@ where
     Ok(())
 }
 
+struct MigrateState<R, S1, S2> {
+    repo: R,
+    from: S1,
+    to: S2,
+    continuing_migration: bool,
+    skip_missing_files: bool,
+    initial_repo_size: u64,
+    repo_size: AtomicU64,
+    pct: AtomicU64,
+    index: AtomicU64,
+    started_at: Instant,
+}
+
 async fn do_migrate_store<S1, S2, R>(
-    repo: &R,
+    repo: R,
     from: S1,
     to: S2,
     skip_missing_files: bool,
 ) -> Result<(), Error>
 where
-    S1: Store,
-    S2: Store,
-    R: IdentifierRepo + HashRepo + SettingsRepo + QueueRepo,
+    S1: Store + 'static,
+    S2: Store + 'static,
+    R: IdentifierRepo + HashRepo + QueueRepo + MigrationRepo + Clone + 'static,
 {
-    let mut repo_size = repo.size().await?;
+    let continuing_migration = repo.is_continuing_migration().await?;
+    let initial_repo_size = repo.size().await?;
 
-    let mut progress_opt = repo.get(STORE_MIGRATION_PROGRESS).await?;
-
-    if progress_opt.is_some() {
-        tracing::warn!("Continuing previous migration of {repo_size} total hashes");
+    if continuing_migration {
+        tracing::warn!("Continuing previous migration of {initial_repo_size} total hashes");
     } else {
-        tracing::warn!("{repo_size} hashes will be migrated");
+        tracing::warn!("{initial_repo_size} hashes will be migrated");
     }
 
-    if repo_size == 0 {
+    if initial_repo_size == 0 {
         return Ok(());
     }
 
-    let mut pct = repo_size / 100;
-
     // Hashes are read in a consistent order
     let stream = repo.hashes().await;
     let mut stream = Box::pin(stream);
 
-    let now = Instant::now();
-    let mut index = 0;
+    let state = Rc::new(MigrateState {
+        repo: repo.clone(),
+        from,
+        to,
+        continuing_migration,
+        skip_missing_files,
+        initial_repo_size,
+        repo_size: AtomicU64::new(initial_repo_size),
+        pct: AtomicU64::new(initial_repo_size / 100),
+        index: AtomicU64::new(0),
+        started_at: Instant::now(),
+    });
+
+    let mut joinset = tokio::task::JoinSet::new();
+
     while let Some(hash) = stream.next().await {
-        index += 1;
+        let hash = hash?.as_ref().to_vec();
 
-        let hash = hash?;
-
-        if let Some(progress) = &progress_opt {
-            // we've reached the most recently migrated hash.
-            if progress.as_ref() == hash.as_ref() {
-                progress_opt.take();
-
-                // update repo size to remaining size
-                repo_size = repo_size.saturating_sub(index);
-                // update pct to be proportional to remainging size
-                pct = repo_size / 100;
-
-                // reset index to 0 for proper percent scaling
-                index = 0;
-
-                tracing::warn!(
-                    "Caught up to previous migration's end. {repo_size} hashes will be migrated"
-                );
+        if joinset.len() >= 32 {
+            if let Some(res) = joinset.join_next().await {
+                res.map_err(|_| UploadError::Canceled)??;
             }
-            continue;
         }
 
-        let original_identifier = match repo.identifier(hash.as_ref().to_vec().into()).await {
-            Ok(Some(identifier)) => identifier,
-            Ok(None) => {
-                tracing::warn!(
-                    "Original File identifier for hash {} is missing, queue cleanup task",
-                    hex::encode(&hash)
-                );
-                crate::queue::cleanup_hash(repo, hash).await?;
-                continue;
-            }
-            Err(e) => return Err(e.into()),
-        };
+        let state = Rc::clone(&state);
+        joinset.spawn_local(async move { migrate_hash(&state, hash).await });
+    }
 
-        if let Some(identifier) = repo
-            .motion_identifier(hash.as_ref().to_vec().into())
-            .await?
+    // drain any tasks still in flight; dropping the JoinSet would abort them
+    while let Some(res) = joinset.join_next().await {
+        res.map_err(|_| UploadError::Canceled)??;
+    }
+
+    // clean up the migration table to avoid interfering with future migrations
+    repo.clear().await?;
+
+    tracing::warn!("Migration completed successfully");
+
+    Ok(())
+}
+
+#[tracing::instrument(skip(state, hash), fields(hash = %hex::encode(&hash)))]
+async fn migrate_hash<R, S1, S2>(
+    state: &MigrateState<R, S1, S2>,
+    hash: Vec<u8>,
+) -> Result<(), Error>
+where
+    S1: Store,
+    S2: Store,
+    R: IdentifierRepo + HashRepo + QueueRepo + MigrationRepo,
+{
+    let MigrateState {
+        repo,
+        from,
+        to,
+        continuing_migration,
+        skip_missing_files,
+        initial_repo_size,
+        repo_size,
+        pct,
+        index,
+        started_at,
+    } = state;
+
+    let current_index = index.fetch_add(1, Ordering::Relaxed);
+
+    let original_identifier = match repo.identifier(hash.clone().into()).await {
+        Ok(Some(identifier)) => identifier,
+        Ok(None) => {
+            tracing::warn!(
+                "Original File identifier for hash {} is missing, queue cleanup task",
+                hex::encode(&hash)
+            );
+            crate::queue::cleanup_hash(repo, hash.clone().into()).await?;
+            return Ok(());
+        }
+        Err(e) => return Err(e.into()),
+    };
+
+    if repo.is_migrated(&original_identifier).await?
     {
+        // migrated original for hash - this means we can skip
+        return Ok(());
+    }
+
+    let current_repo_size = repo_size.load(Ordering::Acquire);
+
+    if *continuing_migration && current_repo_size == *initial_repo_size {
+        // first time reaching unmigrated hash
+
+        let new_repo_size = initial_repo_size.saturating_sub(current_index);
+
+        if repo_size
+            .compare_exchange(
+                current_repo_size,
+                new_repo_size,
+                Ordering::AcqRel,
+                Ordering::Relaxed,
+            )
+            .is_ok()
         {
-            if repo.get(STORE_MIGRATION_MOTION).await?.is_none() {
-                match migrate_file(repo, &from, &to, &identifier, skip_missing_files).await {
-                    Ok(new_identifier) => {
-                        migrate_details(repo, identifier, &new_identifier).await?;
-                        repo.relate_motion_identifier(
-                            hash.as_ref().to_vec().into(),
-                            &new_identifier,
-                        )
-                        .await?;
-                        repo.set(STORE_MIGRATION_MOTION, b"1".to_vec().into())
-                            .await?;
-                    }
-                    Err(MigrateError::Details(e)) => {
-                        tracing::warn!(
-                            "Error fetching details for motion file for hash {}",
-                            hex::encode(&hash)
-                        );
-                        return Err(e.into());
-                    }
-                    Err(MigrateError::From(e)) if e.is_not_found() && skip_missing_files => {
-                        tracing::warn!("Skipping motion file for hash {}", hex::encode(&hash));
-                    }
-                    Err(MigrateError::From(e)) => {
-                        tracing::warn!("Error migrating motion file from old store");
-                        return Err(e.into());
-                    }
-                    Err(MigrateError::To(e)) => {
-                        tracing::warn!("Error migrating motion file to new store");
-                        return Err(e.into());
-                    }
-                }
-            }
+            // we successfully updated the count, we're now in charge of setting up pct and
+            // index and printing migration message
+
+            pct.store(new_repo_size / 100, Ordering::Release);
+            index.store(0, Ordering::Release);
+
+            tracing::warn!(
+                "Caught up to previous migration's end. {new_repo_size} hashes will be migrated"
+            );
         }
+    }
 
-        let mut variant_progress_opt = repo.get(STORE_MIGRATION_VARIANT).await?;
-
-        for (variant, identifier) in repo.variants(hash.as_ref().to_vec().into()).await? {
-            if let Some(variant_progress) = &variant_progress_opt {
-                if variant.as_bytes() == variant_progress.as_ref() {
-                    variant_progress_opt.take();
-                }
-                continue;
-            }
-
-            match migrate_file(repo, &from, &to, &identifier, skip_missing_files).await {
+    if let Some(identifier) = repo.motion_identifier(hash.clone().into()).await? {
+        if !repo.is_migrated(&identifier).await?
         {
+            match migrate_file(repo, from, to, &identifier, *skip_missing_files).await {
                 Ok(new_identifier) => {
-                    migrate_details(repo, identifier, &new_identifier).await?;
-                    repo.remove_variant(hash.as_ref().to_vec().into(), variant.clone())
+                    migrate_details(repo, &identifier, &new_identifier).await?;
+                    repo.relate_motion_identifier(hash.clone().into(), &new_identifier)
                         .await?;
-                    repo.relate_variant_identifier(
-                        hash.as_ref().to_vec().into(),
-                        variant,
-                        &new_identifier,
-                    )
-                    .await?;
-                    repo.set(STORE_MIGRATION_VARIANT, new_identifier.to_bytes()?.into())
-                        .await?;
+                    repo.mark_migrated(&identifier, &new_identifier).await?;
                 }
                 Err(MigrateError::Details(e)) => {
                     tracing::warn!(
                         "Error fetching details for motion file for hash {}",
                         hex::encode(&hash)
@@ -1694,7 +1718,41 @@ where
                     );
                     return Err(e.into());
                 }
-                Err(MigrateError::From(e)) if e.is_not_found() && skip_missing_files => {
+                Err(MigrateError::From(e)) if e.is_not_found() && *skip_missing_files => {
+                    tracing::warn!("Skipping motion file for hash {}", hex::encode(&hash));
+                }
+                Err(MigrateError::From(e)) => {
+                    tracing::warn!("Error migrating motion file from old store");
+                    return Err(e.into());
+                }
+                Err(MigrateError::To(e)) => {
+                    tracing::warn!("Error migrating motion file to new store");
+                    return Err(e.into());
+                }
+            }
+        }
+    }
+
+    for (variant, identifier) in repo.variants(hash.clone().into()).await? {
+        if !repo.is_migrated(&identifier).await? {
+            match migrate_file(repo, from, to, &identifier, *skip_missing_files).await {
+                Ok(new_identifier) => {
+                    migrate_details(repo, &identifier, &new_identifier).await?;
+                    repo.remove_variant(hash.clone().into(), variant.clone())
+                        .await?;
+                    repo.relate_variant_identifier(hash.clone().into(), variant, &new_identifier)
+                        .await?;
+
+                    repo.mark_migrated(&identifier, &new_identifier).await?;
+                }
+                Err(MigrateError::Details(e)) => {
+                    tracing::warn!(
+                        "Error fetching details for variant file for hash {}",
+                        hex::encode(&hash)
+                    );
+                    return Err(e.into());
+                }
+                Err(MigrateError::From(e)) if e.is_not_found() && *skip_missing_files => {
                     tracing::warn!(
                         "Skipping variant {} for hash {}",
                         variant,
                         hex::encode(&hash)
                     );
                 }
                 Err(MigrateError::From(e)) => {
                     tracing::warn!("Error migrating variant file from old store");
                     return Err(e.into());
                 }
                 Err(MigrateError::To(e)) => {
                     tracing::warn!("Error migrating variant file to new store");
                     return Err(e.into());
                 }
             }
         }
     }
+
-        match migrate_file(repo, &from, &to, &original_identifier, skip_missing_files).await {
-            Ok(new_identifier) => {
-                migrate_details(repo, original_identifier, &new_identifier).await?;
-                repo.relate_identifier(hash.as_ref().to_vec().into(), &new_identifier)
-                    .await?;
-            }
-            Err(MigrateError::Details(e)) => {
-                tracing::warn!(
-                    "Error fetching details for motion file for hash {}",
-                    hex::encode(&hash)
-                );
-                return Err(e.into());
-            }
-            Err(MigrateError::From(e)) if e.is_not_found() && skip_missing_files => {
-                tracing::warn!("Skipping original file for hash {}", hex::encode(&hash));
-            }
-            Err(MigrateError::From(e)) => {
-                tracing::warn!("Error migrating original file from old store");
-                return Err(e.into());
-            }
-            Err(MigrateError::To(e)) => {
-                tracing::warn!("Error migrating original file to new store");
-                return Err(e.into());
-            }
+    match migrate_file(repo, from, to, &original_identifier, *skip_missing_files).await {
+        Ok(new_identifier) => {
+            migrate_details(repo, &original_identifier, &new_identifier).await?;
+            repo.relate_identifier(hash.clone().into(), &new_identifier)
+                .await?;
         }
-
-        repo.set(STORE_MIGRATION_PROGRESS, hash.as_ref().to_vec().into())
-            .await?;
-        repo.remove(STORE_MIGRATION_VARIANT).await?;
-        repo.remove(STORE_MIGRATION_MOTION).await?;
-
-        if pct > 0 && index % pct == 0 {
-            let percent = u32::try_from(index / pct).expect("values 0-100 are always in u32 range");
-            if percent == 0 {
-                continue;
-            }
-
-            let elapsed = now.elapsed();
-            let estimated_duration_percent = elapsed / percent;
-            let estimated_duration_remaining =
-                (100u32.saturating_sub(percent)) * estimated_duration_percent;
-
-            tracing::warn!("Migrated {percent}% of hashes ({index}/{repo_size} total hashes)");
-            tracing::warn!("ETA: {estimated_duration_remaining:?} from now");
+        Err(MigrateError::Details(e)) => {
+            tracing::warn!(
+                "Error fetching details for original file for hash {}",
+                hex::encode(&hash)
+            );
+            return Err(e.into());
+        }
+        Err(MigrateError::From(e)) if e.is_not_found() && *skip_missing_files => {
+            tracing::warn!("Skipping original file for hash {}", hex::encode(&hash));
+        }
+        Err(MigrateError::From(e)) => {
+            tracing::warn!("Error migrating original file from old store");
+            return Err(e.into());
+        }
+        Err(MigrateError::To(e)) => {
+            tracing::warn!("Error migrating original file to new store");
+            return Err(e.into());
+        }
     }
 
-    // clean up the migration key to avoid interfering with future migrations
-    repo.remove(STORE_MIGRATION_PROGRESS).await?;
+    let current_pct = pct.load(Ordering::Relaxed);
+    if current_pct > 0 && current_index % current_pct == 0 {
+        let percent = u32::try_from(current_index / current_pct)
+            .expect("values 0-100 are always in u32 range");
+        if percent == 0 {
+            return Ok(());
+        }
 
-    tracing::warn!("Migration completed successfully");
+        let elapsed = started_at.elapsed();
+        let estimated_duration_percent = elapsed / percent;
+        let estimated_duration_remaining =
+            (100u32.saturating_sub(percent)) * estimated_duration_percent;
+
+        let current_repo_size = repo_size.load(Ordering::Relaxed);
+
+        tracing::warn!(
+            "Migrated {percent}% of hashes ({current_index}/{current_repo_size} total hashes)"
+        );
+        tracing::warn!("ETA: {estimated_duration_remaining:?} from now");
+    }
 
     Ok(())
 }
@@ -1845,15 +1899,15 @@ where
     Ok(new_identifier)
 }
 
-async fn migrate_details<R, I1, I2>(repo: &R, from: I1, to: &I2) -> Result<(), Error>
+async fn migrate_details<R, I1, I2>(repo: &R, from: &I1, to: &I2) -> Result<(), Error>
 where
     R: IdentifierRepo,
     I1: Identifier,
     I2: Identifier,
 {
-    if let Some(details) = repo.details(&from).await?
+    if let Some(details) = repo.details(from).await?
     {
         repo.relate_details(to, &details).await?;
-        repo.cleanup(&from).await?;
+        repo.cleanup(from).await?;
     }
 
     Ok(())
diff --git a/src/repo.rs b/src/repo.rs
index 4d45f5d..430adfd 100644
--- a/src/repo.rs
+++ b/src/repo.rs
@@ -72,6 +72,7 @@ pub(crate) trait FullRepo:
     + AliasRepo
     + QueueRepo
     + HashRepo
+    + MigrationRepo
     + Send
     + Sync
     + Clone
@@ -261,6 +262,47 @@ where
     }
 }
 
+#[async_trait::async_trait(?Send)]
+pub(crate) trait MigrationRepo: BaseRepo {
+    async fn is_continuing_migration(&self) -> Result<bool, RepoError>;
+
+    async fn mark_migrated<I1: Identifier, I2: Identifier>(
+        &self,
+        old_identifier: &I1,
+        new_identifier: &I2,
+    ) -> Result<(), StoreError>;
+
+    async fn is_migrated<I: Identifier>(&self, identifier: &I) -> Result<bool, StoreError>;
+
+    async fn clear(&self) -> Result<(), RepoError>;
+}
+
+#[async_trait::async_trait(?Send)]
+impl<T> MigrationRepo for actix_web::web::Data<T>
+where
+    T: MigrationRepo,
+{
+    async fn is_continuing_migration(&self) -> Result<bool, RepoError> {
+        T::is_continuing_migration(self).await
+    }
+
+    async fn mark_migrated<I1: Identifier, I2: Identifier>(
+        &self,
+        old_identifier: &I1,
+        new_identifier: &I2,
+    ) -> Result<(), StoreError> {
+        T::mark_migrated(self, old_identifier, new_identifier).await
+    }
+
+    async fn is_migrated<I: Identifier>(&self, identifier: &I) -> Result<bool, StoreError> {
+        T::is_migrated(self, identifier).await
+    }
+
+    async fn clear(&self) -> Result<(), RepoError> {
+        T::clear(self).await
+    }
+}
+
 #[async_trait::async_trait(?Send)]
 pub(crate) trait HashRepo: BaseRepo {
     type Stream: Stream<Item = Result<Self::Bytes, RepoError>>;
diff --git a/src/repo/sled.rs b/src/repo/sled.rs
index 4e9f7a3..e752582 100644
--- a/src/repo/sled.rs
+++ b/src/repo/sled.rs
@@ -2,8 +2,8 @@ use crate::{
     details::MaybeHumanDate,
     repo::{
         Alias, AliasAlreadyExists, AliasRepo, AlreadyExists, BaseRepo, DeleteToken, Details,
-        FullRepo, HashAlreadyExists, HashRepo, Identifier, IdentifierRepo, QueueRepo, SettingsRepo,
-        UploadId, UploadRepo, UploadResult,
+        FullRepo, HashAlreadyExists, HashRepo, Identifier, IdentifierRepo, MigrationRepo,
+        QueueRepo, SettingsRepo, UploadId, UploadRepo, UploadResult,
     },
     serde_str::Serde,
     store::StoreError,
@@ -69,6 +69,7 @@ pub(crate) struct SledRepo {
     in_progress_queue: Tree,
     queue_notifier: Arc<RwLock<HashMap<&'static str, Arc<Notify>>>>,
     uploads: Tree,
+    migration_identifiers: Tree,
     cache_capacity: u64,
     export_path: PathBuf,
     db: Db,
@@ -100,6 +101,7 @@ impl SledRepo {
             in_progress_queue: db.open_tree("pict-rs-in-progress-queue-tree")?,
             queue_notifier: Arc::new(RwLock::new(HashMap::new())),
             uploads: db.open_tree("pict-rs-uploads-tree")?,
+            migration_identifiers: db.open_tree("pict-rs-migration-identifiers-tree")?,
             cache_capacity,
             export_path,
             db,
@@ -461,6 +463,41 @@ impl IdentifierRepo for SledRepo {
     }
 }
 
+#[async_trait::async_trait(?Send)]
+impl MigrationRepo for SledRepo {
+    async fn is_continuing_migration(&self) -> Result<bool, RepoError> {
+        Ok(!self.migration_identifiers.is_empty())
+    }
+
+    async fn mark_migrated<I1: Identifier, I2: Identifier>(
+        &self,
+        old_identifier: &I1,
+        new_identifier: &I2,
+    ) -> Result<(), StoreError> {
+        let key = new_identifier.to_bytes()?;
+        let value = old_identifier.to_bytes()?;
+
+        b!(
+            self.migration_identifiers,
+            migration_identifiers.insert(key, value)
+        );
+
+        Ok(())
+    }
+
+    async fn is_migrated<I: Identifier>(&self, identifier: &I) -> Result<bool, StoreError> {
+        let key = identifier.to_bytes()?;
+
+        Ok(b!(self.migration_identifiers, migration_identifiers.get(key)).is_some())
+    }
+
+    async fn clear(&self) -> Result<(), RepoError> {
+        b!(self.migration_identifiers, migration_identifiers.clear());
+
+        Ok(())
+    }
+}
+
 type StreamItem = Result<IVec, RepoError>;
 type LocalBoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + 'a>>;
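
Some notes on the approach follow, with short standalone sketches; none of this is part of the patch itself.

The concurrency cap in do_migrate_store keeps at most 32 migrate_hash futures in flight: once the JoinSet is full, one task is joined before the next is spawned, and any stragglers are drained at the end. The sketch below models that pattern in isolation. The names run_bounded and items are hypothetical; the LocalSet is required because JoinSet::spawn_local only works inside one, and the patch reaches for spawn_local (and Rc rather than Arc) because the repo futures are !Send.

use std::time::Duration;
use tokio::task::{JoinSet, LocalSet};

// Cap in-flight tasks at 32 by joining one task before spawning the next.
async fn run_bounded(items: Vec<u64>) -> Result<(), tokio::task::JoinError> {
    let mut joinset = JoinSet::new();

    for item in items {
        if joinset.len() >= 32 {
            // The set is full: wait for one task to finish before adding more.
            if let Some(res) = joinset.join_next().await {
                res?; // Err here means the task panicked or was aborted
            }
        }

        // Stand-in for migrate_hash; !Send futures are fine with spawn_local.
        joinset.spawn_local(async move {
            tokio::time::sleep(Duration::from_millis(item % 10)).await;
        });
    }

    // Drain the remaining tasks: dropping a JoinSet aborts whatever still runs.
    while let Some(res) = joinset.join_next().await {
        res?;
    }

    Ok(())
}

fn main() {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_time()
        .build()
        .unwrap();

    // spawn_local panics outside a LocalSet, so drive the future inside one.
    let local = LocalSet::new();
    rt.block_on(local.run_until(run_bounded((0..100).collect())))
        .unwrap();
}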
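
The catch-up accounting in migrate_hash is the subtle part. When a migration resumes, every concurrent worker that reaches an unmigrated hash can observe repo_size still equal to initial_repo_size, but only one of them should rebase the progress counters and print the "Caught up" line; the compare_exchange makes that a one-shot transition. A minimal model of just that hand-off, with hypothetical names:

use std::sync::atomic::{AtomicU64, Ordering};

struct Progress {
    initial: u64,
    repo_size: AtomicU64,
    pct: AtomicU64,
    index: AtomicU64,
}

impl Progress {
    /// Called by every worker; only the CAS winner rebases the counters.
    fn maybe_rebase(&self, current_index: u64) {
        let current = self.repo_size.load(Ordering::Acquire);
        if current != self.initial {
            return; // someone already rebased, nothing to do
        }

        let new_size = self.initial.saturating_sub(current_index);

        // A plain store would let several workers reset pct/index and log the
        // catch-up message repeatedly; compare_exchange succeeds exactly once.
        if self
            .repo_size
            .compare_exchange(current, new_size, Ordering::AcqRel, Ordering::Relaxed)
            .is_ok()
        {
            self.pct.store(new_size / 100, Ordering::Release);
            self.index.store(0, Ordering::Release);
        }
    }
}

fn main() {
    let p = Progress {
        initial: 1000,
        repo_size: AtomicU64::new(1000),
        pct: AtomicU64::new(10),
        index: AtomicU64::new(0),
    };
    // 400 hashes were already handled by the previous, interrupted run.
    p.maybe_rebase(400);
    assert_eq!(p.repo_size.load(Ordering::Relaxed), 600);
    assert_eq!(p.pct.load(Ordering::Relaxed), 6);
}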
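
On the sled side, MigrationRepo amounts to one extra tree keyed by the new identifier's bytes: mark_migrated stores new -> old, is_migrated is a point lookup, is_continuing_migration asks whether the tree is non-empty, and clear() wipes it after a successful run. Keying by the new identifier is what makes the skip checks line up: once relate_motion_identifier or relate_variant_identifier has run, the repo hands back the new identifier, and the lookup hits. A rough equivalent against sled directly (tree name and identifiers are made up for the example):

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let db = sled::Config::new().temporary(true).open()?;
    let migration_identifiers = db.open_tree("migration-identifiers")?;

    let old_id = b"filesystem:abc123".to_vec();
    let new_id = b"object-storage:abc123".to_vec();

    // mark_migrated: keyed by the *new* identifier, value is the old one
    migration_identifiers.insert(new_id.clone(), old_id)?;

    // is_migrated: a point lookup
    assert!(migration_identifiers.get(&new_id)?.is_some());

    // is_continuing_migration: leftover entries mean an earlier run stopped early
    assert!(!migration_identifiers.is_empty());

    // clear: wipe the table so the next migration starts fresh
    migration_identifiers.clear()?;
    assert!(migration_identifiers.is_empty());

    Ok(())
}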
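
Finally, the ETA line is plain integer Duration arithmetic: elapsed time divided by percent complete gives the cost per percentage point, which is then scaled by the percentage still to go. Isolated below (function name hypothetical); note the caller must guard against percent == 0, as the patch does before logging:

use std::time::Duration;

fn eta(elapsed: Duration, percent: u32) -> Duration {
    // Duration / u32 and u32 * Duration are both in std, so no float math is
    // needed; percent must be non-zero or the division panics.
    let per_percent = elapsed / percent;
    (100u32.saturating_sub(percent)) * per_percent
}

fn main() {
    // 25% done after 10 minutes -> 30 more minutes expected
    assert_eq!(eta(Duration::from_secs(600), 25), Duration::from_secs(1800));
}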