diff --git a/README.md b/README.md index cb715fd..e032c3d 100644 --- a/README.md +++ b/README.md @@ -613,10 +613,17 @@ There's a few required configuration options for object storage. I will try to e - secret-key: this is a second secret your cloud provider will give to you in order to access the bucket +Additionally, there's a commandline argument that can be set to change the default level of +concurrency for the migration. pict-rs will attempt to migrate 32 hashes at a time, but for large +deployments, it may be worth trying to increase this value. Setting it to 128, 256, or even 512 +could be useful. Note that the bigger this value, the more concurrent connections to the object +storage provider will be made. + The command will look something like this: ```bash $ pict-rs \ migrate-store \ + --concurrency 32 \ filesystem \ -p /path/to/files \ object-storage \ diff --git a/src/config/commandline.rs b/src/config/commandline.rs index bc48b4d..2362e9a 100644 --- a/src/config/commandline.rs +++ b/src/config/commandline.rs @@ -156,6 +156,7 @@ impl Args { } Command::MigrateStore(MigrateStore { skip_missing_files, + concurrency, store, }) => { let server = Server::default(); @@ -174,6 +175,7 @@ impl Args { }, operation: Operation::MigrateStore { skip_missing_files, + concurrency, from: from.into(), to: to.into(), }, @@ -192,6 +194,7 @@ impl Args { }, operation: Operation::MigrateStore { skip_missing_files, + concurrency, from: from.into(), to: to.into(), }, @@ -214,6 +217,7 @@ impl Args { }, operation: Operation::MigrateStore { skip_missing_files, + concurrency, from: from.into(), to: to.into(), }, @@ -235,6 +239,7 @@ impl Args { }, operation: Operation::MigrateStore { skip_missing_files, + concurrency, from: from.into(), to: to.into(), }, @@ -262,6 +267,7 @@ pub(crate) enum Operation { Run, MigrateStore { skip_missing_files: bool, + concurrency: usize, from: crate::config::primitives::Store, to: crate::config::primitives::Store, }, @@ -571,6 +577,12 @@ struct MigrateStore { #[arg(long)] skip_missing_files: bool, + /// How many hashes pict-rs should attempt to migrate at the same time. This does not + /// correspond to a thread count, but instead how many in-flight migrations can happen. + /// Increasing this number may improve throughput + #[arg(long, default_value = "32")] + concurrency: usize, + #[command(subcommand)] store: MigrateStoreFrom, } diff --git a/src/lib.rs b/src/lib.rs index 2d6d57a..f0cfbb2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1327,6 +1327,7 @@ async fn migrate_inner( from: S1, to: config::primitives::Store, skip_missing_files: bool, + concurrency: usize, ) -> color_eyre::Result<()> where S1: Store + 'static, @@ -1336,7 +1337,9 @@ where let to = FileStore::build(path.clone(), repo.clone()).await?; match repo { - Repo::Sled(repo) => migrate_store(repo, from, to, skip_missing_files).await?, + Repo::Sled(repo) => { + migrate_store(repo, from, to, skip_missing_files, concurrency).await? + } } } config::primitives::Store::ObjectStorage(config::primitives::ObjectStorage { @@ -1370,7 +1373,9 @@ where .build(client); match repo { - Repo::Sled(repo) => migrate_store(repo, from, to, skip_missing_files).await?, + Repo::Sled(repo) => { + migrate_store(repo, from, to, skip_missing_files, concurrency).await? + } } } } @@ -1457,13 +1462,14 @@ pub async fn run() -> color_eyre::Result<()> { Operation::Run => (), Operation::MigrateStore { skip_missing_files, + concurrency, from, to, } => { match from { config::primitives::Store::Filesystem(config::Filesystem { path }) => { let from = FileStore::build(path.clone(), repo.clone()).await?; - migrate_inner(repo, client, from, to, skip_missing_files).await?; + migrate_inner(repo, client, from, to, skip_missing_files, concurrency).await?; } config::primitives::Store::ObjectStorage(config::primitives::ObjectStorage { endpoint, @@ -1495,7 +1501,7 @@ pub async fn run() -> color_eyre::Result<()> { .await? .build(client.clone()); - migrate_inner(repo, client, from, to, skip_missing_files).await?; + migrate_inner(repo, client, from, to, skip_missing_files, concurrency).await?; } } diff --git a/src/migrate_store.rs b/src/migrate_store.rs index 247acc4..f5df753 100644 --- a/src/migrate_store.rs +++ b/src/migrate_store.rs @@ -16,6 +16,7 @@ pub(super) async fn migrate_store( from: S1, to: S2, skip_missing_files: bool, + concurrency: usize, ) -> Result<(), Error> where S1: Store + Clone + 'static, @@ -37,8 +38,14 @@ where let mut failure_count = 0; - while let Err(e) = - do_migrate_store(repo.clone(), from.clone(), to.clone(), skip_missing_files).await + while let Err(e) = do_migrate_store( + repo.clone(), + from.clone(), + to.clone(), + skip_missing_files, + concurrency, + ) + .await { tracing::error!("Migration failed with {}", format!("{e:?}")); @@ -75,6 +82,7 @@ async fn do_migrate_store( from: S1, to: S2, skip_missing_files: bool, + concurrency: usize, ) -> Result<(), Error> where S1: Store + 'static, @@ -116,7 +124,7 @@ where while let Some(hash) = stream.next().await { let hash = hash?.as_ref().to_vec(); - if joinset.len() >= 32 { + if joinset.len() >= concurrency { if let Some(res) = joinset.join_next().await { res.map_err(|_| UploadError::Canceled)??; }