From f223d477c7998ed0654e2fcf1dd6cddcdd948d89 Mon Sep 17 00:00:00 2001 From: asonix Date: Tue, 3 Oct 2023 16:27:19 -0500 Subject: [PATCH] Enable specifying concurrency for 0.4 to 0.5 migration --- src/config/commandline.rs | 32 ++++++++++++++++++++++++++++++++ src/config/defaults.rs | 13 +++++++++++++ src/config/file.rs | 7 +++++++ src/repo/migrate.rs | 3 +-- 4 files changed, 53 insertions(+), 2 deletions(-) diff --git a/src/config/commandline.rs b/src/config/commandline.rs index f850a71..fef72a8 100644 --- a/src/config/commandline.rs +++ b/src/config/commandline.rs @@ -53,6 +53,7 @@ impl Args { address, api_key, client_timeout, + upgrade_concurrency, metrics_prometheus_address, media_preprocess_steps, media_external_validation, @@ -113,6 +114,10 @@ impl Args { timeout: client_timeout, }; + let upgrade = Upgrade { + concurrency: upgrade_concurrency, + }; + let metrics = Metrics { prometheus_address: metrics_prometheus_address, }; @@ -200,6 +205,7 @@ impl Args { config_format: ConfigFormat { server, client, + upgrade, old_repo, tracing, metrics, @@ -218,6 +224,7 @@ impl Args { config_format: ConfigFormat { server, client, + upgrade, old_repo, tracing, metrics, @@ -234,6 +241,7 @@ impl Args { config_format: ConfigFormat { server, client, + upgrade, old_repo, tracing, metrics, @@ -254,6 +262,7 @@ impl Args { }) => { let server = Server::default(); let client = Client::default(); + let upgrade = Upgrade::default(); let media = Media::default(); let metrics = Metrics::default(); @@ -263,6 +272,7 @@ impl Args { config_format: ConfigFormat { server, client, + upgrade, old_repo, tracing, metrics, @@ -284,6 +294,7 @@ impl Args { config_format: ConfigFormat { server, client, + upgrade, old_repo, tracing, metrics, @@ -309,6 +320,7 @@ impl Args { config_format: ConfigFormat { server, client, + upgrade, old_repo, tracing, metrics, @@ -333,6 +345,7 @@ impl Args { config_format: ConfigFormat { server, client, + upgrade, old_repo, tracing, metrics, @@ -356,6 +369,7 @@ impl Args { 
Command::MigrateRepo(MigrateRepo { repo }) => { let server = Server::default(); let client = Client::default(); + let upgrade = Upgrade::default(); let media = Media::default(); let metrics = Metrics::default(); @@ -365,6 +379,7 @@ impl Args { config_format: ConfigFormat { server, client, + upgrade, old_repo, tracing, metrics, @@ -383,6 +398,7 @@ impl Args { config_format: ConfigFormat { server, client, + upgrade, old_repo, tracing, metrics, @@ -403,6 +419,7 @@ impl Args { config_format: ConfigFormat { server, client, + upgrade, old_repo, tracing, metrics, @@ -421,6 +438,7 @@ impl Args { config_format: ConfigFormat { server, client, + upgrade, old_repo, tracing, metrics, @@ -470,6 +488,7 @@ pub(crate) enum Operation { pub(super) struct ConfigFormat { server: Server, client: Client, + upgrade: Upgrade, #[serde(skip_serializing_if = "Option::is_none")] old_repo: Option, tracing: Tracing, @@ -501,6 +520,13 @@ struct Client { timeout: Option<u64>, } +#[derive(Debug, Default, serde::Serialize)] +#[serde(rename_all = "snake_case")] +struct Upgrade { + #[serde(skip_serializing_if = "Option::is_none")] + concurrency: Option<usize>, +} + #[derive(Debug, Default, serde::Serialize)] #[serde(rename_all = "snake_case")] struct Tracing { @@ -870,6 +896,12 @@ struct Run { #[arg(long)] client_timeout: Option<u64>, + /// How many hashes pict-rs should try to migrate from 0.4 to 0.5 concurrently + /// + /// This number defaults to 32, but can be increased for better throughput + #[arg(long)] + upgrade_concurrency: Option<usize>, + /// Whether to enable the prometheus scrape endpoint #[arg(long)] metrics_prometheus_address: Option, diff --git a/src/config/defaults.rs b/src/config/defaults.rs index 21d3b71..12efde3 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -9,6 +9,7 @@ use std::{net::SocketAddr, path::PathBuf}; pub(crate) struct Defaults { server: ServerDefaults, client: ClientDefaults, + upgrade: UpgradeDefaults, tracing: TracingDefaults, media: MediaDefaults, repo: RepoDefaults,
@@ -29,6 +30,12 @@ struct ClientDefaults { timeout: u64, } +#[derive(Clone, Debug, serde::Serialize)] +#[serde(rename_all = "snake_case")] +struct UpgradeDefaults { + concurrency: usize, +} + #[derive(Clone, Debug, Default, serde::Serialize)] #[serde(rename_all = "snake_case")] struct TracingDefaults { @@ -185,6 +192,12 @@ impl Default for ClientDefaults { } } +impl Default for UpgradeDefaults { + fn default() -> Self { + UpgradeDefaults { concurrency: 32 } + } +} + impl Default for LoggingDefaults { fn default() -> Self { LoggingDefaults { diff --git a/src/config/file.rs b/src/config/file.rs index 3bf7e41..8894100 100644 --- a/src/config/file.rs +++ b/src/config/file.rs @@ -13,6 +13,8 @@ pub(crate) struct ConfigFile { pub(crate) client: Client, + pub(crate) upgrade: Upgrade, + pub(crate) tracing: Tracing, #[serde(default)] @@ -119,6 +121,11 @@ pub(crate) struct Client { pub(crate) timeout: u64, } +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] +pub(crate) struct Upgrade { + pub(crate) concurrency: usize, +} + #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[serde(rename_all = "snake_case")] pub(crate) struct Tracing { diff --git a/src/repo/migrate.rs b/src/repo/migrate.rs index 5eea7e2..4dba2cb 100644 --- a/src/repo/migrate.rs +++ b/src/repo/migrate.rs @@ -15,7 +15,6 @@ use crate::{ store::Store, }; -const MIGRATE_CONCURRENCY: usize = 32; const GENERATOR_KEY: &str = "last-path"; #[tracing::instrument(skip_all)] @@ -110,7 +109,7 @@ pub(crate) async fn migrate_04( tracing::warn!("Failed to read hash, skipping"); } - while set.len() >= MIGRATE_CONCURRENCY { + while set.len() >= config.upgrade.concurrency { if set.join_next().await.is_some() { index += 1;