From 691bca286cffb077a8082adec89ea64528c14386 Mon Sep 17 00:00:00 2001 From: asonix Date: Tue, 11 Jul 2023 13:01:58 -0500 Subject: [PATCH] Make object storage timeouts configurable --- src/config.rs | 9 +++---- src/config/commandline.rs | 27 +++++++++++++++++++ src/config/defaults.rs | 35 ++++++++++++++++++++++--- src/config/file.rs | 55 ++++++++++++++++++++++++++++++++++++++- src/config/primitives.rs | 16 ++++++++++++ src/lib.rs | 22 ++++++++++++---- src/store/object_store.rs | 17 ++++++++++-- 7 files changed, 164 insertions(+), 17 deletions(-) diff --git a/src/config.rs b/src/config.rs index d2a9c06..2c4279e 100644 --- a/src/config.rs +++ b/src/config.rs @@ -4,7 +4,7 @@ use std::path::{Path, PathBuf}; mod commandline; mod defaults; mod file; -mod primitives; +pub mod primitives; use commandline::{Args, Output}; use config::Config; @@ -12,11 +12,10 @@ use defaults::Defaults; pub(crate) use commandline::Operation; pub(crate) use file::{ - ConfigFile as Configuration, Media as MediaConfiguration, OpenTelemetry, Repo, Sled, Tracing, -}; -pub(crate) use primitives::{ - AudioCodec, Filesystem, ImageFormat, LogFormat, ObjectStorage, Store, VideoCodec, + ConfigFile as Configuration, Media as MediaConfiguration, ObjectStorage, OpenTelemetry, Repo, + Sled, Store, Tracing, }; +pub(crate) use primitives::{AudioCodec, Filesystem, ImageFormat, LogFormat, VideoCodec}; /// Source for pict-rs configuration when embedding as a library pub enum ConfigSource { diff --git a/src/config/commandline.rs b/src/config/commandline.rs index 83e7d7a..7d50ec0 100644 --- a/src/config/commandline.rs +++ b/src/config/commandline.rs @@ -519,6 +519,8 @@ struct Run { #[derive(Clone, Debug, Subcommand, serde::Serialize)] #[serde(rename_all = "snake_case")] #[serde(tag = "type")] +// allow large enum variant - this is an instantiated-once config +#[allow(clippy::large_enum_variant)] enum Store { /// configure filesystem storage Filesystem(Filesystem), @@ -529,6 +531,8 @@ enum Store { /// Run pict-rs with the provided storage #[derive(Debug, Subcommand)] +// allow large enum variant - this is an instantiated-once config +#[allow(clippy::large_enum_variant)] enum RunStore { /// Run pict-rs with filesystem storage Filesystem(RunFilesystem), @@ -550,6 +554,8 @@ struct MigrateStore { /// Configure the pict-rs storage migration #[derive(Debug, Subcommand)] +// allow large enum variant - this is an instantiated-once config +#[allow(clippy::large_enum_variant)] enum MigrateStoreFrom { /// Migrate from the provided filesystem storage Filesystem(MigrateFilesystem), @@ -560,6 +566,8 @@ enum MigrateStoreFrom { /// Configure the destination storage for pict-rs storage migration #[derive(Debug, Subcommand)] +// allow large enum variant - this is an instantiated-once config +#[allow(clippy::large_enum_variant)] enum MigrateStoreTo { /// Migrate to the provided filesystem storage Filesystem(MigrateFilesystemInner), @@ -667,25 +675,44 @@ struct ObjectStorage { /// The bucket in which to store media #[arg(short, long)] + #[serde(skip_serializing_if = "Option::is_none")] bucket_name: Option, /// The region the bucket is located in /// /// For minio deployments, this can just be 'minio' #[arg(short, long)] + #[serde(skip_serializing_if = "Option::is_none")] region: Option, /// The Access Key for the user accessing the bucket #[arg(short, long)] + #[serde(skip_serializing_if = "Option::is_none")] access_key: Option, /// The secret key for the user accessing the bucket #[arg(short, long)] + #[serde(skip_serializing_if = "Option::is_none")] secret_key: Option, /// The session token for accessing the bucket #[arg(long)] + #[serde(skip_serializing_if = "Option::is_none")] session_token: Option, + + /// How long signatures for object storage requests are valid (in seconds) + /// + /// This defaults to 15 seconds + #[arg(long)] + #[serde(skip_serializing_if = "Option::is_none")] + signature_duration: Option, + + /// How long a client can wait on an object storage request before giving up (in seconds) + /// + /// This defaults to 30 seconds + #[arg(long)] + #[serde(skip_serializing_if = "Option::is_none")] + client_timeout: Option, } /// Configuration for the sled-backed data repository diff --git a/src/config/defaults.rs b/src/config/defaults.rs index 46481f0..0ba993b 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -101,9 +101,15 @@ struct SledDefaults { #[derive(Clone, Debug, serde::Serialize)] #[serde(rename_all = "snake_case")] -#[serde(tag = "type")] -pub(super) enum StoreDefaults { - Filesystem(FilesystemDefaults), +pub(super) struct StoreDefaults { + #[serde(rename = "type")] + type_: String, + + #[serde(flatten)] + pub(super) filesystem: FilesystemDefaults, + + #[serde(flatten)] + pub(super) object_storage: ObjectStorageDefaults, } #[derive(Clone, Debug, serde::Serialize)] @@ -112,6 +118,14 @@ pub(super) struct FilesystemDefaults { path: PathBuf, } +#[derive(Clone, Debug, serde::Serialize)] +#[serde(rename_all = "snake_case")] +pub(super) struct ObjectStorageDefaults { + signature_duration: u64, + + client_timeout: u64, +} + impl Default for ServerDefaults { fn default() -> Self { ServerDefaults { @@ -211,7 +225,11 @@ impl Default for SledDefaults { impl Default for StoreDefaults { fn default() -> Self { - Self::Filesystem(FilesystemDefaults::default()) + Self { + type_: String::from("filesystem"), + filesystem: FilesystemDefaults::default(), + object_storage: ObjectStorageDefaults::default(), + } } } @@ -223,6 +241,15 @@ impl Default for FilesystemDefaults { } } +impl Default for ObjectStorageDefaults { + fn default() -> Self { + Self { + signature_duration: 15, + client_timeout: 30, + } + } +} + impl From for crate::config::primitives::Filesystem { fn from(value: crate::config::commandline::Filesystem) -> Self { Self { diff --git a/src/config/file.rs b/src/config/file.rs index 91a3a6a..390d3ca 100644 --- a/src/config/file.rs +++ b/src/config/file.rs @@ -1,5 +1,5 @@ use crate::{ - config::primitives::{AudioCodec, ImageFormat, LogFormat, Store, Targets, VideoCodec}, + config::primitives::{AudioCodec, Filesystem, ImageFormat, LogFormat, Targets, VideoCodec}, serde_str::Serde, }; use once_cell::sync::OnceCell; @@ -22,6 +22,59 @@ pub(crate) struct ConfigFile { pub(crate) store: Store, } +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] +#[serde(rename_all = "snake_case")] +#[serde(tag = "type")] +// allow large enum variant - this is an instantiated-once config +#[allow(clippy::large_enum_variant)] +pub(crate) enum Store { + Filesystem(Filesystem), + ObjectStorage(ObjectStorage), +} + +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] +#[serde(rename_all = "snake_case")] +pub(crate) struct ObjectStorage { + /// The base endpoint for the object storage + /// + /// Examples: + /// - `http://localhost:9000` + /// - `https://s3.dualstack.eu-west-1.amazonaws.com` + pub(crate) endpoint: Url, + + /// Determines whether to use path style or virtualhost style for accessing objects + /// + /// When this is true, objects will be fetched from {endpoint}/{bucket_name}/{object} + /// When false, objects will be fetched from {bucket_name}.{endpoint}/{object} + pub(crate) use_path_style: bool, + + /// The bucket in which to store media + pub(crate) bucket_name: String, + + /// The region the bucket is located in + pub(crate) region: String, + + /// The Access Key for the user accessing the bucket + pub(crate) access_key: String, + + /// The secret key for the user accessing the bucket + pub(crate) secret_key: String, + + /// The session token for accessing the bucket + #[serde(skip_serializing_if = "Option::is_none")] + pub(crate) session_token: Option, + + /// How long signatures for object storage requests are valid (in seconds) + /// + /// This defaults to 15 seconds + pub(crate) signature_duration: u64, + + /// How long a client can wait on an object storage request before giving up (in seconds) + /// + /// This defaults to 30 seconds + pub(crate) client_timeout: u64, +} + #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[serde(rename_all = "snake_case")] #[serde(tag = "type")] diff --git a/src/config/primitives.rs b/src/config/primitives.rs index dbc9bf5..0bb3e4c 100644 --- a/src/config/primitives.rs +++ b/src/config/primitives.rs @@ -142,11 +142,27 @@ pub(crate) struct ObjectStorage { #[arg(long)] #[serde(skip_serializing_if = "Option::is_none")] pub(crate) session_token: Option, + + /// How long signatures for object storage requests are valid (in seconds) + /// + /// This defaults to 15 seconds + #[arg(long)] + #[serde(skip_serializing_if = "Option::is_none")] + pub(crate) signature_duration: Option, + + /// How long a client can wait on an object storage request before giving up (in seconds) + /// + /// This defaults to 30 seconds + #[arg(long)] + #[serde(skip_serializing_if = "Option::is_none")] + pub(crate) client_timeout: Option, } #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[serde(rename_all = "snake_case")] #[serde(tag = "type")] +// allow large enum variant - this is an instantiated-once config +#[allow(clippy::large_enum_variant)] pub(crate) enum Store { Filesystem(Filesystem), diff --git a/src/lib.rs b/src/lib.rs index 8f1e269..26c03b1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1266,21 +1266,21 @@ async fn migrate_inner( repo: &Repo, client: Client, from: S1, - to: config::Store, + to: config::primitives::Store, skip_missing_files: bool, ) -> color_eyre::Result<()> where S1: Store, { match to { - config::Store::Filesystem(config::Filesystem { path }) => { + config::primitives::Store::Filesystem(config::Filesystem { path }) => { let to = FileStore::build(path.clone(), repo.clone()).await?; match repo { Repo::Sled(repo) => migrate_store(repo, from, to, skip_missing_files).await?, } } - config::Store::ObjectStorage(config::ObjectStorage { + config::primitives::Store::ObjectStorage(config::primitives::ObjectStorage { endpoint, bucket_name, use_path_style, @@ -1288,6 +1288,8 @@ where access_key, secret_key, session_token, + signature_duration, + client_timeout, }) => { let to = ObjectStore::build( endpoint.clone(), @@ -1301,6 +1303,8 @@ where access_key, secret_key, session_token, + signature_duration.unwrap_or(15), + client_timeout.unwrap_or(30), repo.clone(), ) .await? @@ -1399,11 +1403,11 @@ pub async fn run() -> color_eyre::Result<()> { let client = build_client(); match from { - config::Store::Filesystem(config::Filesystem { path }) => { + config::primitives::Store::Filesystem(config::Filesystem { path }) => { let from = FileStore::build(path.clone(), repo.clone()).await?; migrate_inner(&repo, client, from, to, skip_missing_files).await?; } - config::Store::ObjectStorage(config::ObjectStorage { + config::primitives::Store::ObjectStorage(config::primitives::ObjectStorage { endpoint, bucket_name, use_path_style, @@ -1411,6 +1415,8 @@ pub async fn run() -> color_eyre::Result<()> { access_key, secret_key, session_token, + signature_duration, + client_timeout, }) => { let from = ObjectStore::build( endpoint, @@ -1424,6 +1430,8 @@ pub async fn run() -> color_eyre::Result<()> { access_key, secret_key, session_token, + signature_duration.unwrap_or(15), + client_timeout.unwrap_or(30), repo.clone(), ) .await? @@ -1460,6 +1468,8 @@ pub async fn run() -> color_eyre::Result<()> { access_key, secret_key, session_token, + signature_duration, + client_timeout, }) => { let store = ObjectStore::build( endpoint, @@ -1473,6 +1483,8 @@ pub async fn run() -> color_eyre::Result<()> { access_key, secret_key, session_token, + signature_duration, + client_timeout, repo.clone(), ) .await?; diff --git a/src/store/object_store.rs b/src/store/object_store.rs index 1361edd..bf4c686 100644 --- a/src/store/object_store.rs +++ b/src/store/object_store.rs @@ -96,6 +96,8 @@ pub(crate) struct ObjectStore { bucket: Bucket, credentials: Credentials, client: Client, + signature_expiration: Duration, + client_timeout: Duration, } #[derive(Clone)] @@ -104,6 +106,8 @@ pub(crate) struct ObjectStoreConfig { repo: Repo, bucket: Bucket, credentials: Credentials, + signature_expiration: u64, + client_timeout: u64, } #[derive(serde::Deserialize, Debug)] @@ -124,6 +128,8 @@ impl ObjectStoreConfig { bucket: self.bucket, credentials: self.credentials, client, + signature_expiration: Duration::from_secs(self.signature_expiration), + client_timeout: Duration::from_secs(self.client_timeout), } } } @@ -437,6 +443,8 @@ impl ObjectStore { access_key: String, secret_key: String, session_token: Option, + signature_expiration: u64, + client_timeout: u64, repo: Repo, ) -> Result { let path_gen = init_generator(&repo).await?; @@ -451,6 +459,8 @@ impl ObjectStore { } else { Credentials::new(access_key, secret_key) }, + signature_expiration, + client_timeout, }) } @@ -580,9 +590,12 @@ impl ObjectStore { rusty_s3::Method::Delete => awc::http::Method::DELETE, }; - let url = action.sign(Duration::from_secs(15)); + let url = action.sign(self.signature_expiration); - let req = self.client.request(method, url.as_str()); + let req = self + .client + .request(method, url.as_str()) + .timeout(self.client_timeout); let req = action .headers_mut()