2
0
Fork 0
mirror of https://git.asonix.dog/asonix/pict-rs synced 2025-01-08 18:51:24 +00:00

Enable specifying migrate-store concurrency

This commit is contained in:
asonix 2023-10-03 15:50:53 -05:00
parent 853448d9c2
commit 247902e600
4 changed files with 29 additions and 3 deletions

View file

@ -766,10 +766,17 @@ There's a few required configuration options for object storage. I will try to e
- secret-key: this is a second secret your cloud provider will give to you in order to access the - secret-key: this is a second secret your cloud provider will give to you in order to access the
bucket bucket
Additionally, there's a commandline argument that can be set to change the default level of
concurrency for the migration. pict-rs will attempt to migrate 32 hashes at a time, but for large
deployments, it may be worth trying to increase this value. Setting it to 128, 256, or even 512
could be useful. Note that the bigger this value, the more concurrent connections to the object
storage provider will be made.
The command will look something like this: The command will look something like this:
```bash ```bash
$ pict-rs \ $ pict-rs \
migrate-store \ migrate-store \
--concurrency 32 \
filesystem \ filesystem \
-p /path/to/files \ -p /path/to/files \
object-storage \ object-storage \

View file

@ -249,6 +249,7 @@ impl Args {
} }
Command::MigrateStore(MigrateStore { Command::MigrateStore(MigrateStore {
skip_missing_files, skip_missing_files,
concurrency,
store, store,
}) => { }) => {
let server = Server::default(); let server = Server::default();
@ -271,6 +272,7 @@ impl Args {
}, },
operation: Operation::MigrateStore { operation: Operation::MigrateStore {
skip_missing_files, skip_missing_files,
concurrency,
from: from.into(), from: from.into(),
to: to.into(), to: to.into(),
}, },
@ -291,6 +293,7 @@ impl Args {
}, },
operation: Operation::MigrateStore { operation: Operation::MigrateStore {
skip_missing_files, skip_missing_files,
concurrency,
from: from.into(), from: from.into(),
to: to.into(), to: to.into(),
}, },
@ -315,6 +318,7 @@ impl Args {
}, },
operation: Operation::MigrateStore { operation: Operation::MigrateStore {
skip_missing_files, skip_missing_files,
concurrency,
from: from.into(), from: from.into(),
to: to.into(), to: to.into(),
}, },
@ -338,6 +342,7 @@ impl Args {
}, },
operation: Operation::MigrateStore { operation: Operation::MigrateStore {
skip_missing_files, skip_missing_files,
concurrency,
from: from.into(), from: from.into(),
to: to.into(), to: to.into(),
}, },
@ -450,6 +455,7 @@ pub(crate) enum Operation {
Run, Run,
MigrateStore { MigrateStore {
skip_missing_files: bool, skip_missing_files: bool,
concurrency: usize,
from: crate::config::primitives::Store, from: crate::config::primitives::Store,
to: crate::config::primitives::Store, to: crate::config::primitives::Store,
}, },
@ -1092,6 +1098,12 @@ struct MigrateStore {
#[arg(long)] #[arg(long)]
skip_missing_files: bool, skip_missing_files: bool,
/// How many hashes pict-rs should attempt to migrate at the same time. This does not
/// correspond to a thread count, but instead how many in-flight migrations can happen.
/// Increasing this number may improve throughput
#[arg(long, default_value = "32")]
concurrency: usize,
#[command(subcommand)] #[command(subcommand)]
store: MigrateStoreFrom, store: MigrateStoreFrom,
} }

View file

@ -1840,6 +1840,7 @@ async fn migrate_inner<S1>(
to: config::primitives::Store, to: config::primitives::Store,
skip_missing_files: bool, skip_missing_files: bool,
timeout: u64, timeout: u64,
concurrency: usize,
) -> color_eyre::Result<()> ) -> color_eyre::Result<()>
where where
S1: Store + 'static, S1: Store + 'static,
@ -1848,7 +1849,7 @@ where
config::primitives::Store::Filesystem(config::Filesystem { path }) => { config::primitives::Store::Filesystem(config::Filesystem { path }) => {
let to = FileStore::build(path.clone(), repo.clone()).await?; let to = FileStore::build(path.clone(), repo.clone()).await?;
migrate_store(repo, from, to, skip_missing_files, timeout).await? migrate_store(repo, from, to, skip_missing_files, timeout, concurrency).await?
} }
config::primitives::Store::ObjectStorage(config::primitives::ObjectStorage { config::primitives::Store::ObjectStorage(config::primitives::ObjectStorage {
endpoint, endpoint,
@ -1882,7 +1883,7 @@ where
.await? .await?
.build(client); .build(client);
migrate_store(repo, from, to, skip_missing_files, timeout).await? migrate_store(repo, from, to, skip_missing_files, timeout, concurrency).await?
} }
} }
@ -1984,6 +1985,7 @@ impl PictRsConfiguration {
Operation::Run => (), Operation::Run => (),
Operation::MigrateStore { Operation::MigrateStore {
skip_missing_files, skip_missing_files,
concurrency,
from, from,
to, to,
} => { } => {
@ -1999,6 +2001,7 @@ impl PictRsConfiguration {
to, to,
skip_missing_files, skip_missing_files,
config.media.process_timeout, config.media.process_timeout,
concurrency,
) )
.await?; .await?;
} }
@ -2043,6 +2046,7 @@ impl PictRsConfiguration {
to, to,
skip_missing_files, skip_missing_files,
config.media.process_timeout, config.media.process_timeout,
concurrency,
) )
.await?; .await?;
} }

View file

@ -22,6 +22,7 @@ pub(super) async fn migrate_store<S1, S2>(
to: S2, to: S2,
skip_missing_files: bool, skip_missing_files: bool,
timeout: u64, timeout: u64,
concurrency: usize,
) -> Result<(), Error> ) -> Result<(), Error>
where where
S1: Store + Clone + 'static, S1: Store + Clone + 'static,
@ -48,6 +49,7 @@ where
to.clone(), to.clone(),
skip_missing_files, skip_missing_files,
timeout, timeout,
concurrency,
) )
.await .await
{ {
@ -88,6 +90,7 @@ async fn do_migrate_store<S1, S2>(
to: S2, to: S2,
skip_missing_files: bool, skip_missing_files: bool,
timeout: u64, timeout: u64,
concurrency: usize,
) -> Result<(), Error> ) -> Result<(), Error>
where where
S1: Store + 'static, S1: Store + 'static,
@ -129,7 +132,7 @@ where
while let Some(hash) = stream.next().await { while let Some(hash) = stream.next().await {
let hash = hash?; let hash = hash?;
if joinset.len() >= 32 { if joinset.len() >= concurrency {
if let Some(res) = joinset.join_next().await { if let Some(res) = joinset.join_next().await {
res.map_err(|_| UploadError::Canceled)??; res.map_err(|_| UploadError::Canceled)??;
} }