2
0
Fork 0
mirror of https://git.asonix.dog/asonix/pict-rs synced 2024-11-10 06:25:00 +00:00

Make migration concurrent

This commit is contained in:
asonix 2023-08-16 11:47:36 -05:00
parent 19893f0e41
commit 4d909ba32a
4 changed files with 54 additions and 21 deletions

View file

@ -1949,7 +1949,8 @@ impl PictRsConfiguration {
if arc_repo.get("migrate-0.4").await?.is_none() { if arc_repo.get("migrate-0.4").await?.is_none() {
if let Some(old_repo) = repo_04::open(&config.old_repo)? { if let Some(old_repo) = repo_04::open(&config.old_repo)? {
repo::migrate_04(old_repo, &arc_repo, &store, &config).await?; repo::migrate_04(old_repo, arc_repo.clone(), store.clone(), config.clone())
.await?;
arc_repo arc_repo
.set("migrate-0.4", Arc::from(b"migrated".to_vec())) .set("migrate-0.4", Arc::from(b"migrated".to_vec()))
.await?; .await?;
@ -2005,7 +2006,8 @@ impl PictRsConfiguration {
if arc_repo.get("migrate-0.4").await?.is_none() { if arc_repo.get("migrate-0.4").await?.is_none() {
if let Some(old_repo) = repo_04::open(&config.old_repo)? { if let Some(old_repo) = repo_04::open(&config.old_repo)? {
repo::migrate_04(old_repo, &arc_repo, &store, &config).await?; repo::migrate_04(old_repo, arc_repo.clone(), store.clone(), config.clone())
.await?;
arc_repo arc_repo
.set("migrate-0.4", Arc::from(b"migrated".to_vec())) .set("migrate-0.4", Arc::from(b"migrated".to_vec()))
.await?; .await?;

View file

@ -74,7 +74,7 @@ pub(crate) trait FullRepo:
+ AliasRepo + AliasRepo
+ QueueRepo + QueueRepo
+ HashRepo + HashRepo
+ MigrationRepo + StoreMigrationRepo
+ AliasAccessRepo + AliasAccessRepo
+ VariantAccessRepo + VariantAccessRepo
+ ProxyRepo + ProxyRepo
@ -409,7 +409,7 @@ where
} }
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]
pub(crate) trait MigrationRepo: BaseRepo { pub(crate) trait StoreMigrationRepo: BaseRepo {
async fn is_continuing_migration(&self) -> Result<bool, RepoError>; async fn is_continuing_migration(&self) -> Result<bool, RepoError>;
async fn mark_migrated( async fn mark_migrated(
@ -424,9 +424,9 @@ pub(crate) trait MigrationRepo: BaseRepo {
} }
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]
impl<T> MigrationRepo for Arc<T> impl<T> StoreMigrationRepo for Arc<T>
where where
T: MigrationRepo, T: StoreMigrationRepo,
{ {
async fn is_continuing_migration(&self) -> Result<bool, RepoError> { async fn is_continuing_migration(&self) -> Result<bool, RepoError> {
T::is_continuing_migration(self).await T::is_continuing_migration(self).await

View file

@ -1,3 +1,5 @@
use tokio::task::JoinSet;
use crate::{ use crate::{
config::Configuration, config::Configuration,
details::Details, details::Details,
@ -11,12 +13,15 @@ use crate::{
stream::IntoStreamer, stream::IntoStreamer,
}; };
const MIGRATE_CONCURRENCY: usize = 32;
const GENERATOR_KEY: &str = "last-path";
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
pub(crate) async fn migrate_04<S: Store>( pub(crate) async fn migrate_04<S: Store + 'static>(
old_repo: OldSledRepo, old_repo: OldSledRepo,
new_repo: &ArcRepo, new_repo: ArcRepo,
store: &S, store: S,
config: &Configuration, config: Configuration,
) -> Result<(), Error> { ) -> Result<(), Error> {
tracing::warn!("Running checks"); tracing::warn!("Running checks");
if let Err(e) = old_repo.health_check().await { if let Err(e) = old_repo.health_check().await {
@ -39,15 +44,38 @@ pub(crate) async fn migrate_04<S: Store>(
let mut hash_stream = old_repo.hashes().await.into_streamer(); let mut hash_stream = old_repo.hashes().await.into_streamer();
let mut set = JoinSet::new();
let mut index = 0; let mut index = 0;
while let Some(res) = hash_stream.next().await { while let Some(res) = hash_stream.next().await {
index += 1;
if let Ok(hash) = res { if let Ok(hash) = res {
let _ = migrate_hash_04(&old_repo, new_repo, store, config, hash).await; set.spawn_local(migrate_hash_04(
old_repo.clone(),
new_repo.clone(),
store.clone(),
config.clone(),
hash.clone(),
));
} else { } else {
tracing::warn!("Failed to read hash, skipping"); tracing::warn!("Failed to read hash, skipping");
} }
while set.len() >= MIGRATE_CONCURRENCY {
if let Some(_) = set.join_next().await {
index += 1;
if index % pct == 0 {
let percent = index / pct;
tracing::warn!("Migration {percent}% complete - {index}/{total_size}");
}
}
}
}
while let Some(_) = set.join_next().await {
index += 1;
if index % pct == 0 { if index % pct == 0 {
let percent = index / pct; let percent = index / pct;
@ -55,25 +83,28 @@ pub(crate) async fn migrate_04<S: Store>(
} }
} }
if let Some(generator_state) = old_repo.get("last-path").await? { if let Some(generator_state) = old_repo.get(GENERATOR_KEY).await? {
new_repo new_repo
.set("last-path", generator_state.to_vec().into()) .set(GENERATOR_KEY, generator_state.to_vec().into())
.await?; .await?;
} }
tracing::warn!("Migration complete");
Ok(()) Ok(())
} }
async fn migrate_hash_04<S: Store>( async fn migrate_hash_04<S: Store>(
old_repo: &OldSledRepo, old_repo: OldSledRepo,
new_repo: &ArcRepo, new_repo: ArcRepo,
store: &S, store: S,
config: &Configuration, config: Configuration,
old_hash: sled::IVec, old_hash: sled::IVec,
) -> Result<(), Error> { ) -> Result<(), Error> {
let mut hash_failures = 0; let mut hash_failures = 0;
while let Err(e) = do_migrate_hash_04(old_repo, new_repo, store, config, old_hash.clone()).await while let Err(e) =
do_migrate_hash_04(&old_repo, &new_repo, &store, &config, old_hash.clone()).await
{ {
hash_failures += 1; hash_failures += 1;

View file

@ -3,7 +3,7 @@ use crate::{
repo::{ repo::{
hash::Hash, Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, hash::Hash, Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken,
Details, FullRepo, HashAlreadyExists, HashRepo, Identifier, IdentifierRepo, JobId, Details, FullRepo, HashAlreadyExists, HashRepo, Identifier, IdentifierRepo, JobId,
MigrationRepo, ProxyRepo, QueueRepo, RepoError, SettingsRepo, UploadId, UploadRepo, ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo,
UploadResult, VariantAccessRepo, UploadResult, VariantAccessRepo,
}, },
serde_str::Serde, serde_str::Serde,
@ -928,7 +928,7 @@ impl IdentifierRepo for SledRepo {
} }
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]
impl MigrationRepo for SledRepo { impl StoreMigrationRepo for SledRepo {
async fn is_continuing_migration(&self) -> Result<bool, RepoError> { async fn is_continuing_migration(&self) -> Result<bool, RepoError> {
Ok(!self.migration_identifiers.is_empty()) Ok(!self.migration_identifiers.is_empty())
} }