From 4d909ba32a94952d605caa1a7d175767a6ad2d1d Mon Sep 17 00:00:00 2001 From: asonix Date: Wed, 16 Aug 2023 11:47:36 -0500 Subject: [PATCH] Make migration concurrent --- src/lib.rs | 6 +++-- src/repo.rs | 8 +++---- src/repo/migrate.rs | 57 ++++++++++++++++++++++++++++++++++----------- src/repo/sled.rs | 4 ++-- 4 files changed, 54 insertions(+), 21 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index f0b1dca..e464ac9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1949,7 +1949,8 @@ impl PictRsConfiguration { if arc_repo.get("migrate-0.4").await?.is_none() { if let Some(old_repo) = repo_04::open(&config.old_repo)? { - repo::migrate_04(old_repo, &arc_repo, &store, &config).await?; + repo::migrate_04(old_repo, arc_repo.clone(), store.clone(), config.clone()) + .await?; arc_repo .set("migrate-0.4", Arc::from(b"migrated".to_vec())) .await?; @@ -2005,7 +2006,8 @@ impl PictRsConfiguration { if arc_repo.get("migrate-0.4").await?.is_none() { if let Some(old_repo) = repo_04::open(&config.old_repo)? { - repo::migrate_04(old_repo, &arc_repo, &store, &config).await?; + repo::migrate_04(old_repo, arc_repo.clone(), store.clone(), config.clone()) + .await?; arc_repo .set("migrate-0.4", Arc::from(b"migrated".to_vec())) .await?; diff --git a/src/repo.rs b/src/repo.rs index 6dd99bc..fd1ee07 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -74,7 +74,7 @@ pub(crate) trait FullRepo: + AliasRepo + QueueRepo + HashRepo - + MigrationRepo + + StoreMigrationRepo + AliasAccessRepo + VariantAccessRepo + ProxyRepo @@ -409,7 +409,7 @@ where } #[async_trait::async_trait(?Send)] -pub(crate) trait MigrationRepo: BaseRepo { +pub(crate) trait StoreMigrationRepo: BaseRepo { async fn is_continuing_migration(&self) -> Result; async fn mark_migrated( @@ -424,9 +424,9 @@ pub(crate) trait MigrationRepo: BaseRepo { } #[async_trait::async_trait(?Send)] -impl MigrationRepo for Arc +impl StoreMigrationRepo for Arc where - T: MigrationRepo, + T: StoreMigrationRepo, { async fn is_continuing_migration(&self) -> Result { T::is_continuing_migration(self).await diff --git a/src/repo/migrate.rs b/src/repo/migrate.rs index 910d484..b85080d 100644 --- a/src/repo/migrate.rs +++ b/src/repo/migrate.rs @@ -1,3 +1,5 @@ +use tokio::task::JoinSet; + use crate::{ config::Configuration, details::Details, @@ -11,12 +13,15 @@ use crate::{ stream::IntoStreamer, }; +const MIGRATE_CONCURRENCY: usize = 32; +const GENERATOR_KEY: &str = "last-path"; + #[tracing::instrument(skip_all)] -pub(crate) async fn migrate_04( +pub(crate) async fn migrate_04( old_repo: OldSledRepo, - new_repo: &ArcRepo, - store: &S, - config: &Configuration, + new_repo: ArcRepo, + store: S, + config: Configuration, ) -> Result<(), Error> { tracing::warn!("Running checks"); if let Err(e) = old_repo.health_check().await { @@ -39,15 +44,38 @@ pub(crate) async fn migrate_04( let mut hash_stream = old_repo.hashes().await.into_streamer(); + let mut set = JoinSet::new(); + let mut index = 0; while let Some(res) = hash_stream.next().await { - index += 1; if let Ok(hash) = res { - let _ = migrate_hash_04(&old_repo, new_repo, store, config, hash).await; + set.spawn_local(migrate_hash_04( + old_repo.clone(), + new_repo.clone(), + store.clone(), + config.clone(), + hash.clone(), + )); } else { tracing::warn!("Failed to read hash, skipping"); } + while set.len() >= MIGRATE_CONCURRENCY { + if let Some(_) = set.join_next().await { + index += 1; + + if index % pct == 0 { + let percent = index / pct; + + tracing::warn!("Migration {percent}% complete - {index}/{total_size}"); + } + } + } + } + + while let Some(_) = set.join_next().await { + index += 1; + if index % pct == 0 { let percent = index / pct; @@ -55,25 +83,28 @@ pub(crate) async fn migrate_04( } } - if let Some(generator_state) = old_repo.get("last-path").await? { + if let Some(generator_state) = old_repo.get(GENERATOR_KEY).await? { new_repo - .set("last-path", generator_state.to_vec().into()) + .set(GENERATOR_KEY, generator_state.to_vec().into()) .await?; } + tracing::warn!("Migration complete"); + Ok(()) } async fn migrate_hash_04( - old_repo: &OldSledRepo, - new_repo: &ArcRepo, - store: &S, - config: &Configuration, + old_repo: OldSledRepo, + new_repo: ArcRepo, + store: S, + config: Configuration, old_hash: sled::IVec, ) -> Result<(), Error> { let mut hash_failures = 0; - while let Err(e) = do_migrate_hash_04(old_repo, new_repo, store, config, old_hash.clone()).await + while let Err(e) = + do_migrate_hash_04(&old_repo, &new_repo, &store, &config, old_hash.clone()).await { hash_failures += 1; diff --git a/src/repo/sled.rs b/src/repo/sled.rs index d35149e..2ba7d33 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -3,7 +3,7 @@ use crate::{ repo::{ hash::Hash, Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, Details, FullRepo, HashAlreadyExists, HashRepo, Identifier, IdentifierRepo, JobId, - MigrationRepo, ProxyRepo, QueueRepo, RepoError, SettingsRepo, UploadId, UploadRepo, + ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo, UploadResult, VariantAccessRepo, }, serde_str::Serde, @@ -928,7 +928,7 @@ impl IdentifierRepo for SledRepo { } #[async_trait::async_trait(?Send)] -impl MigrationRepo for SledRepo { +impl StoreMigrationRepo for SledRepo { async fn is_continuing_migration(&self) -> Result { Ok(!self.migration_identifiers.is_empty()) }