From ff1771e016e7febb0bc1e15719ca00fbe2783c20 Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 24 Sep 2022 14:18:49 -0500 Subject: [PATCH] More migration work for rusty-s3 --- src/config/commandline.rs | 19 +++++++++++- src/config/primitives.rs | 22 ++++++++++++-- src/main.rs | 50 ++++++++++++++++++++----------- src/store.rs | 5 +++- src/store/file_store.rs | 5 ++++ src/store/object_store.rs | 63 +++++++++++++++++++++++---------------- 6 files changed, 116 insertions(+), 48 deletions(-) diff --git a/src/config/commandline.rs b/src/config/commandline.rs index 0774d03..69d98c4 100644 --- a/src/config/commandline.rs +++ b/src/config/commandline.rs @@ -543,13 +543,30 @@ struct Filesystem { #[derive(Clone, Debug, Parser, serde::Serialize)] #[serde(rename_all = "snake_case")] struct ObjectStorage { + /// The base endpoint for the object storage + /// + /// Examples: + /// - `http://localhost:9000` + /// - `https://s3.dualstack.eu-west-1.amazonaws.com` + #[clap(short, long)] + endpoint: Url, + + /// Determines whether to use path style or virtualhost style for accessing objects + /// + /// When this is true, objects will be fetched from {endpoint}/{bucket_name}/{object} + /// When false, objects will be fetched from {bucket_name}.{endpoint}/{object} + #[clap(short, long)] + use_path_style: bool, + /// The bucket in which to store media #[clap(short, long)] bucket_name: Option, /// The region the bucket is located in + /// + /// For minio deployments, this can just be 'minio' #[clap(short, long)] - region: Option>, + region: Option, /// The Access Key for the user accessing the bucket #[clap(short, long)] diff --git a/src/config/primitives.rs b/src/config/primitives.rs index 85aa354..1382499 100644 --- a/src/config/primitives.rs +++ b/src/config/primitives.rs @@ -1,8 +1,8 @@ use crate::magick::ValidInputType; -use crate::serde_str::Serde; use clap::ArgEnum; use std::{fmt::Display, path::PathBuf, str::FromStr}; use tracing::Level; +use url::Url; #[derive( Clone, @@ -63,13 +63,28 @@ pub(crate) struct Filesystem { #[derive(Clone, Debug, serde::Deserialize, serde::Serialize, clap::Parser)] #[serde(rename_all = "snake_case")] pub(crate) struct ObjectStorage { + /// The base endpoint for the object storage + /// + /// Examples: + /// - `http://localhost:9000` + /// - `https://s3.dualstack.eu-west-1.amazonaws.com` + #[clap(short, long)] + pub(crate) endpoint: Url, + + /// Determines whether to use path style or virtualhost style for accessing objects + /// + /// When this is true, objects will be fetched from {endpoint}/{bucket_name}/{object} + /// When false, objects will be fetched from {bucket_name}.{endpoint}/{object} + #[clap(short, long)] + pub(crate) use_path_style: bool, + /// The bucket in which to store media #[clap(short, long)] pub(crate) bucket_name: String, /// The region the bucket is located in #[clap(short, long)] - pub(crate) region: Serde, + pub(crate) region: String, /// The Access Key for the user accessing the bucket #[clap(short, long)] @@ -219,7 +234,8 @@ impl Display for LogFormat { #[cfg(test)] mod tests { - use super::{Serde, Targets}; + use super::Targets; + use crate::serde_str::Serde; #[test] fn builds_info_targets() { diff --git a/src/main.rs b/src/main.rs index 5ab88c2..e8d44b9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,6 +10,7 @@ use futures_util::{ Stream, StreamExt, TryStreamExt, }; use once_cell::sync::Lazy; +use rusty_s3::UrlStyle; use std::{ future::ready, path::PathBuf, @@ -970,15 +971,15 @@ fn next_worker_id() -> String { format!("{}-{}", CONFIG.server.worker_id, next_id) } -async fn launch( +async fn launch( repo: R, - store: S, + store: S::Config, ) -> color_eyre::Result<()> { repo.requeue_in_progress(CONFIG.server.worker_id.as_bytes().to_vec()) .await?; HttpServer::new(move || { - let store = store.clone(); + let store = S::init(store.clone()); let repo = repo.clone(); tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { @@ -1081,6 +1082,8 @@ where match to { config::Store::Filesystem(config::Filesystem { path }) => { let to = FileStore::build(path.clone(), repo.clone()).await?; + let to = FileStore::init(to); + match repo { Repo::Sled(repo) => migrate_store(repo, from, to).await?, } @@ -1088,25 +1091,30 @@ where config::Store::ObjectStorage(config::ObjectStorage { endpoint, bucket_name, - url_style, + use_path_style, region, access_key, secret_key, session_token, }) => { let to = ObjectStore::build( - endpoint, + endpoint.clone(), bucket_name, - url_style, + if *use_path_style { + UrlStyle::Path + } else { + UrlStyle::VirtualHost + }, region.as_ref(), Some(access_key.clone()), Some(secret_key.clone()), session_token.clone(), repo.clone(), - build_client(), ) .await?; + let to = ObjectStore::init(to); + match repo { Repo::Sled(repo) => migrate_store(repo, from, to).await?, } @@ -1129,29 +1137,34 @@ async fn main() -> color_eyre::Result<()> { match from { config::Store::Filesystem(config::Filesystem { path }) => { let from = FileStore::build(path.clone(), repo.clone()).await?; + let from = FileStore::init(from); migrate_inner(&repo, from, &to).await?; } config::Store::ObjectStorage(config::ObjectStorage { endpoint, bucket_name, - url_style, + use_path_style, region, access_key, secret_key, session_token, }) => { let from = ObjectStore::build( - endpoint, + endpoint.clone(), &bucket_name, - url_style, + if *use_path_style { + UrlStyle::Path + } else { + UrlStyle::VirtualHost + }, Serde::into_inner(region), Some(access_key), Some(secret_key), session_token, repo.clone(), - build_client(), ) .await?; + let from = ObjectStore::init(from); migrate_inner(&repo, from, &to).await?; } @@ -1167,33 +1180,36 @@ async fn main() -> color_eyre::Result<()> { let store = FileStore::build(path, repo.clone()).await?; match repo { - Repo::Sled(sled_repo) => launch(sled_repo, store).await, + Repo::Sled(sled_repo) => launch::<_, FileStore>(sled_repo, store).await, } } config::Store::ObjectStorage(config::ObjectStorage { endpoint, bucket_name, - url_style, + use_path_style, region, access_key, secret_key, session_token, }) => { let store = ObjectStore::build( - endpoint, + endpoint.clone(), &bucket_name, - url_style, + if *use_path_style { + UrlStyle::Path + } else { + UrlStyle::VirtualHost + }, Serde::into_inner(region), Some(access_key), Some(secret_key), session_token, repo.clone(), - build_client(), ) .await?; match repo { - Repo::Sled(sled_repo) => launch(sled_repo, store).await, + Repo::Sled(sled_repo) => launch::<_, ObjectStore>(sled_repo, store).await, } } } diff --git a/src/store.rs b/src/store.rs index 1df8834..bfa4c66 100644 --- a/src/store.rs +++ b/src/store.rs @@ -16,10 +16,13 @@ pub(crate) trait Identifier: Send + Sync + Clone + Debug { } #[async_trait::async_trait(?Send)] -pub(crate) trait Store: Send + Sync + Clone + Debug { +pub(crate) trait Store: Clone + Debug { + type Config: Send + Sync + Clone; type Identifier: Identifier + 'static; type Stream: Stream> + 'static; + fn init(config: Self::Config) -> Self; + async fn save_async_read(&self, reader: &mut Reader) -> Result where Reader: AsyncRead + Unpin; diff --git a/src/store/file_store.rs b/src/store/file_store.rs index a93dee8..6d11ff6 100644 --- a/src/store/file_store.rs +++ b/src/store/file_store.rs @@ -49,9 +49,14 @@ pub(crate) struct FileStore { #[async_trait::async_trait(?Send)] impl Store for FileStore { + type Config = Self; type Identifier = FileId; type Stream = Pin>>>; + fn init(config: Self::Config) -> Self { + config + } + #[tracing::instrument(skip(reader))] async fn save_async_read(&self, reader: &mut Reader) -> Result where diff --git a/src/store/object_store.rs b/src/store/object_store.rs index e832eb3..a7e7888 100644 --- a/src/store/object_store.rs +++ b/src/store/object_store.rs @@ -12,15 +12,11 @@ use actix_web::{ }; use awc::{Client, ClientRequest}; use futures_util::{Stream, TryStreamExt}; -use rusty_s3::{ - actions::{PutObject, S3Action}, - Bucket, Credentials, UrlStyle, -}; +use rusty_s3::{actions::S3Action, Bucket, Credentials, UrlStyle}; use std::{pin::Pin, string::FromUtf8Error, time::Duration}; use storage_path_generator::{Generator, Path}; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; use tokio_util::io::ReaderStream; -use tracing::Instrument; use url::Url; mod object_id; @@ -55,24 +51,41 @@ pub(crate) struct ObjectStore { client: Client, } +#[derive(Clone)] +pub(crate) struct ObjectStoreConfig { + path_gen: Generator, + repo: Repo, + bucket: Bucket, + credentials: Credentials, +} + #[async_trait::async_trait(?Send)] impl Store for ObjectStore { + type Config = ObjectStoreConfig; type Identifier = ObjectId; type Stream = Pin>>>; + fn init(config: Self::Config) -> Self { + ObjectStore { + path_gen: config.path_gen, + repo: config.repo, + bucket: config.bucket, + credentials: config.credentials, + client: crate::build_client(), + } + } + #[tracing::instrument(skip(reader))] async fn save_async_read(&self, reader: &mut Reader) -> Result where Reader: AsyncRead + Unpin, { - let response = self - .put_object_request() - .await? - .send_stream(ReaderStream::new(reader)) - .await?; + let (req, object_id) = self.put_object_request().await?; + + let response = req.send_stream(ReaderStream::new(reader)).await?; if response.status().is_success() { - return Ok(ObjectId::from_string(path)); + return Ok(object_id); } Err(ObjectError::Status(response.status()).into()) @@ -80,12 +93,12 @@ impl Store for ObjectStore { #[tracing::instrument(skip(bytes))] async fn save_bytes(&self, bytes: Bytes) -> Result { - let req = self.put_object_request().await?; + let (req, object_id) = self.put_object_request().await?; let response = req.send_body(bytes).await?; if response.status().is_success() { - return Ok(ObjectId::from_string(path)); + return Ok(object_id); } Err(ObjectError::Status(response.status()).into()) @@ -103,7 +116,7 @@ impl Store for ObjectStore { .send() .await?; - if response.status.is_success() { + if response.status().is_success() { return Ok(Box::pin(response)); } @@ -120,11 +133,11 @@ impl Store for ObjectStore { Writer: AsyncWrite + Send + Unpin, { let response = self - .get_object_request(identifier, from_start, len) + .get_object_request(identifier, None, None) .send() .await?; - if !response.status.is_success() { + if !response.status().is_success() { return Err(ObjectError::Status(response.status()).into()); } @@ -141,7 +154,7 @@ impl Store for ObjectStore { async fn len(&self, identifier: &Self::Identifier) -> Result { let response = self.head_object_request(identifier).send().await?; - if !response.status.is_success() { + if !response.status().is_success() { return Err(ObjectError::Status(response.status()).into()); } @@ -161,7 +174,7 @@ impl Store for ObjectStore { async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Error> { let response = self.delete_object_request(identifier).send().await?; - if !response.status.is_success() { + if !response.status().is_success() { return Err(ObjectError::Status(response.status()).into()); } @@ -180,29 +193,27 @@ impl ObjectStore { secret_key: Option, session_token: Option, repo: Repo, - client: reqwest::Client, - ) -> Result { + ) -> Result { let path_gen = init_generator(&repo).await?; - Ok(ObjectStore { + Ok(ObjectStoreConfig { path_gen, repo, bucket: Bucket::new(endpoint, url_style, bucket_name, region) .map_err(ObjectError::from)?, credentials: Credentials::new_with_token(access_key, secret_key, session_token), - client, }) } - async fn put_object_request(&self) -> Result { + async fn put_object_request(&self) -> Result<(ClientRequest, ObjectId), Error> { let path = self.next_file().await?; let action = self.bucket.put_object(Some(&self.credentials), &path); - Ok(self.build_request(action)) + Ok((self.build_request(action), ObjectId::from_string(path))) } - fn build_request(&self, action: A) -> ClientRequest { + fn build_request<'a, A: S3Action<'a>>(&'a self, action: A) -> ClientRequest { let method = match A::METHOD { rusty_s3::Method::Head => awc::http::Method::HEAD, rusty_s3::Method::Get => awc::http::Method::GET, @@ -243,7 +254,7 @@ impl ObjectStore { }; if let Some(range) = range { - req.insert_header(Range::Bytes(vec![range])).send().await?; + req.insert_header(Range::Bytes(vec![range])) } else { req }