2
0
Fork 0
mirror of https://git.asonix.dog/asonix/pict-rs synced 2025-01-25 18:55:58 +00:00

Enable specifying migrate-store concurrency

This commit is contained in:
asonix 2023-10-03 15:50:53 -05:00
parent 6544c594ce
commit 7cafe5ce25
4 changed files with 40 additions and 7 deletions

View file

@ -613,10 +613,17 @@ There's a few required configuration options for object storage. I will try to e
- secret-key: this is a second secret your cloud provider will give to you in order to access the - secret-key: this is a second secret your cloud provider will give to you in order to access the
bucket bucket
Additionally, there's a commandline argument that can be set to change the default level of
concurrency for the migration. pict-rs will attempt to migrate 32 hashes at a time, but for large
deployments, it may be worth trying to increase this value. Setting it to 128, 256, or even 512
could be useful. Note that the bigger this value, the more concurrent connections to the object
storage provider will be made.
The command will look something like this: The command will look something like this:
```bash ```bash
$ pict-rs \ $ pict-rs \
migrate-store \ migrate-store \
--concurrency 32 \
filesystem \ filesystem \
-p /path/to/files \ -p /path/to/files \
object-storage \ object-storage \

View file

@ -156,6 +156,7 @@ impl Args {
} }
Command::MigrateStore(MigrateStore { Command::MigrateStore(MigrateStore {
skip_missing_files, skip_missing_files,
concurrency,
store, store,
}) => { }) => {
let server = Server::default(); let server = Server::default();
@ -174,6 +175,7 @@ impl Args {
}, },
operation: Operation::MigrateStore { operation: Operation::MigrateStore {
skip_missing_files, skip_missing_files,
concurrency,
from: from.into(), from: from.into(),
to: to.into(), to: to.into(),
}, },
@ -192,6 +194,7 @@ impl Args {
}, },
operation: Operation::MigrateStore { operation: Operation::MigrateStore {
skip_missing_files, skip_missing_files,
concurrency,
from: from.into(), from: from.into(),
to: to.into(), to: to.into(),
}, },
@ -214,6 +217,7 @@ impl Args {
}, },
operation: Operation::MigrateStore { operation: Operation::MigrateStore {
skip_missing_files, skip_missing_files,
concurrency,
from: from.into(), from: from.into(),
to: to.into(), to: to.into(),
}, },
@ -235,6 +239,7 @@ impl Args {
}, },
operation: Operation::MigrateStore { operation: Operation::MigrateStore {
skip_missing_files, skip_missing_files,
concurrency,
from: from.into(), from: from.into(),
to: to.into(), to: to.into(),
}, },
@ -262,6 +267,7 @@ pub(crate) enum Operation {
Run, Run,
MigrateStore { MigrateStore {
skip_missing_files: bool, skip_missing_files: bool,
concurrency: usize,
from: crate::config::primitives::Store, from: crate::config::primitives::Store,
to: crate::config::primitives::Store, to: crate::config::primitives::Store,
}, },
@ -571,6 +577,12 @@ struct MigrateStore {
#[arg(long)] #[arg(long)]
skip_missing_files: bool, skip_missing_files: bool,
/// How many hashes pict-rs should attempt to migrate at the same time. This does not
/// correspond to a thread count, but instead how many in-flight migrations can happen.
/// Increasing this number may improve throughput
#[arg(long, default_value = "32")]
concurrency: usize,
#[command(subcommand)] #[command(subcommand)]
store: MigrateStoreFrom, store: MigrateStoreFrom,
} }

View file

@ -1327,6 +1327,7 @@ async fn migrate_inner<S1>(
from: S1, from: S1,
to: config::primitives::Store, to: config::primitives::Store,
skip_missing_files: bool, skip_missing_files: bool,
concurrency: usize,
) -> color_eyre::Result<()> ) -> color_eyre::Result<()>
where where
S1: Store + 'static, S1: Store + 'static,
@ -1336,7 +1337,9 @@ where
let to = FileStore::build(path.clone(), repo.clone()).await?; let to = FileStore::build(path.clone(), repo.clone()).await?;
match repo { match repo {
Repo::Sled(repo) => migrate_store(repo, from, to, skip_missing_files).await?, Repo::Sled(repo) => {
migrate_store(repo, from, to, skip_missing_files, concurrency).await?
}
} }
} }
config::primitives::Store::ObjectStorage(config::primitives::ObjectStorage { config::primitives::Store::ObjectStorage(config::primitives::ObjectStorage {
@ -1370,7 +1373,9 @@ where
.build(client); .build(client);
match repo { match repo {
Repo::Sled(repo) => migrate_store(repo, from, to, skip_missing_files).await?, Repo::Sled(repo) => {
migrate_store(repo, from, to, skip_missing_files, concurrency).await?
}
} }
} }
} }
@ -1457,13 +1462,14 @@ pub async fn run() -> color_eyre::Result<()> {
Operation::Run => (), Operation::Run => (),
Operation::MigrateStore { Operation::MigrateStore {
skip_missing_files, skip_missing_files,
concurrency,
from, from,
to, to,
} => { } => {
match from { match from {
config::primitives::Store::Filesystem(config::Filesystem { path }) => { config::primitives::Store::Filesystem(config::Filesystem { path }) => {
let from = FileStore::build(path.clone(), repo.clone()).await?; let from = FileStore::build(path.clone(), repo.clone()).await?;
migrate_inner(repo, client, from, to, skip_missing_files).await?; migrate_inner(repo, client, from, to, skip_missing_files, concurrency).await?;
} }
config::primitives::Store::ObjectStorage(config::primitives::ObjectStorage { config::primitives::Store::ObjectStorage(config::primitives::ObjectStorage {
endpoint, endpoint,
@ -1495,7 +1501,7 @@ pub async fn run() -> color_eyre::Result<()> {
.await? .await?
.build(client.clone()); .build(client.clone());
migrate_inner(repo, client, from, to, skip_missing_files).await?; migrate_inner(repo, client, from, to, skip_missing_files, concurrency).await?;
} }
} }

View file

@ -16,6 +16,7 @@ pub(super) async fn migrate_store<R, S1, S2>(
from: S1, from: S1,
to: S2, to: S2,
skip_missing_files: bool, skip_missing_files: bool,
concurrency: usize,
) -> Result<(), Error> ) -> Result<(), Error>
where where
S1: Store + Clone + 'static, S1: Store + Clone + 'static,
@ -37,8 +38,14 @@ where
let mut failure_count = 0; let mut failure_count = 0;
while let Err(e) = while let Err(e) = do_migrate_store(
do_migrate_store(repo.clone(), from.clone(), to.clone(), skip_missing_files).await repo.clone(),
from.clone(),
to.clone(),
skip_missing_files,
concurrency,
)
.await
{ {
tracing::error!("Migration failed with {}", format!("{e:?}")); tracing::error!("Migration failed with {}", format!("{e:?}"));
@ -75,6 +82,7 @@ async fn do_migrate_store<R, S1, S2>(
from: S1, from: S1,
to: S2, to: S2,
skip_missing_files: bool, skip_missing_files: bool,
concurrency: usize,
) -> Result<(), Error> ) -> Result<(), Error>
where where
S1: Store + 'static, S1: Store + 'static,
@ -116,7 +124,7 @@ where
while let Some(hash) = stream.next().await { while let Some(hash) = stream.next().await {
let hash = hash?.as_ref().to_vec(); let hash = hash?.as_ref().to_vec();
if joinset.len() >= 32 { if joinset.len() >= concurrency {
if let Some(res) = joinset.join_next().await { if let Some(res) = joinset.join_next().await {
res.map_err(|_| UploadError::Canceled)??; res.map_err(|_| UploadError::Canceled)??;
} }