diff --git a/Cargo.lock b/Cargo.lock index 2a18e9e..41c6cfe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -351,29 +351,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "aws-creds" -version = "0.29.1" -source = "git+https://github.com/asonix/rust-s3?branch=asonix/generic-client#9e450d0038a29040ba5c47ffa570350c3b1ad976" -dependencies = [ - "dirs", - "rust-ini", - "serde", - "serde-xml-rs", - "serde_derive", - "thiserror", - "url", -] - -[[package]] -name = "aws-region" -version = "0.24.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9bdd1c0f4aa70f72812a2f3ec325d6d6162fb80cff093f847b4c394fd78c3643" -dependencies = [ - "thiserror", -] - [[package]] name = "axum" version = "0.5.16" @@ -720,26 +697,6 @@ dependencies = [ "subtle", ] -[[package]] -name = "dirs" -version = "4.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca3aa72a6f96ea37bbc5aa912f6788242832f75369bdfdadcb0e38423f100059" -dependencies = [ - "dirs-sys", -] - -[[package]] -name = "dirs-sys" -version = "0.3.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b1d1d91c932ef41c0f2663aa8b0ca0342d444d842c06914aa0a7e352d0bada6" -dependencies = [ - "libc", - "redox_users", - "winapi", -] - [[package]] name = "dlv-list" version = "0.3.0" @@ -1007,12 +964,6 @@ dependencies = [ "libc", ] -[[package]] -name = "hex" -version = "0.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" - [[package]] name = "hmac" version = "0.12.1" @@ -1092,19 +1043,6 @@ dependencies = [ "want", ] -[[package]] -name = "hyper-rustls" -version = "0.23.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d87c48c02e0dc5e3b849a2041db3029fd066650f8f717c07bf8ed78ccb895cac" -dependencies = [ - "http", - "hyper", - "rustls", - "tokio", - "tokio-rustls", -] - [[package]] name = "hyper-timeout" version = "0.4.1" @@ -1162,12 +1100,6 @@ dependencies = [ "libc", ] -[[package]] -name = "ipnet" -version = "2.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "879d54834c8c76457ef4293a689b2a8c59b076067ad77b15efafbb05f92a592b" - [[package]] name = "itertools" version = "0.10.5" @@ -1280,22 +1212,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73cbba799671b762df5a175adf59ce145165747bb891505c43d09aefbbf38beb" [[package]] -name = "maybe-async" -version = "0.2.6" +name = "md-5" +version = "0.10.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6007f9dad048e0a224f27ca599d669fca8cfa0dac804725aab542b2eb032bce6" +checksum = "6365506850d44bff6e2fbcb5176cf63650e48bd45ef2fe2665ae1570e0f4b9ca" dependencies = [ - "proc-macro2", - "quote", - "syn", + "digest", ] -[[package]] -name = "md5" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771" - [[package]] name = "memchr" version = "2.5.0" @@ -1649,8 +1573,7 @@ dependencies = [ "opentelemetry", "opentelemetry-otlp", "pin-project-lite", - "reqwest", - "rust-s3", + "rusty-s3", "serde", "serde_cbor", "serde_json", @@ -1809,6 +1732,16 @@ dependencies = [ "prost", ] +[[package]] +name = "quick-xml" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37dddbbe9df96afafcb8027fcf263971b726530e12f0787f620a7ba5b4846081" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quote" version = "1.0.21" @@ -1857,17 +1790,6 @@ dependencies = [ "bitflags", ] -[[package]] -name = "redox_users" -version = "0.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b033d837a7cf162d7993aded9304e30a83213c648b6e389db233191f891e5c2b" -dependencies = [ - "getrandom", - "redox_syscall", - "thiserror", -] - [[package]] name = "regex" version = "1.6.0" @@ -1903,46 +1825,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "reqwest" -version = "0.11.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "431949c384f4e2ae07605ccaa56d1d9d2ecdb5cadd4f9577ccfab29f2e5149fc" -dependencies = [ - "base64", - "bytes", - "encoding_rs", - "futures-core", - "futures-util", - "h2", - "http", - "http-body", - "hyper", - "hyper-rustls", - "ipnet", - "js-sys", - "log", - "mime", - "once_cell", - "percent-encoding", - "pin-project-lite", - "rustls", - "rustls-pemfile", - "serde", - "serde_json", - "serde_urlencoded", - "tokio", - "tokio-rustls", - "tokio-util", - "tower-service", - "url", - "wasm-bindgen", - "wasm-bindgen-futures", - "web-sys", - "webpki-roots", - "winreg", -] - [[package]] name = "ring" version = "0.16.20" @@ -1988,35 +1870,6 @@ dependencies = [ "ordered-multimap", ] -[[package]] -name = "rust-s3" -version = "0.31.0" -source = "git+https://github.com/asonix/rust-s3?branch=asonix/generic-client#9e450d0038a29040ba5c47ffa570350c3b1ad976" -dependencies = [ - "async-trait", - "aws-creds", - "aws-region", - "base64", - "cfg-if", - "hex", - "hmac", - "http", - "log", - "maybe-async", - "md5", - "percent-encoding", - "reqwest", - "serde", - "serde-xml-rs", - "serde_derive", - "sha2", - "thiserror", - "time", - "tokio", - "tokio-stream", - "url", -] - [[package]] name = "rustc-demangle" version = "0.1.21" @@ -2045,12 +1898,22 @@ dependencies = [ ] [[package]] -name = "rustls-pemfile" -version = "1.0.1" +name = "rusty-s3" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0864aeff53f8c05aa08d86e5ef839d3dfcf07aeba2db32f12db0ef716e87bd55" +checksum = "13332fd1e9538328a80183b9c0bde0cd7065ad2c4405f56b855a51a0a37fffd4" dependencies = [ "base64", + "hmac", + "md-5", + "percent-encoding", + "quick-xml", + "serde", + "serde_json", + "sha2", + "time", + "url", + "zeroize", ] [[package]] @@ -2089,25 +1952,13 @@ checksum = "e25dfac463d778e353db5be2449d1cce89bd6fd23c9f1ea21310ce6e5a1b29c4" [[package]] name = "serde" -version = "1.0.144" +version = "1.0.145" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f747710de3dcd43b88c9168773254e809d8ddbdf9653b84e2554ab219f17860" +checksum = "728eb6351430bccb993660dfffc5a72f91ccc1295abaa8ce19b27ebe4f75568b" dependencies = [ "serde_derive", ] -[[package]] -name = "serde-xml-rs" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65162e9059be2f6a3421ebbb4fef3e74b7d9e7c60c50a0e292c6239f19f1edfa" -dependencies = [ - "log", - "serde", - "thiserror", - "xml-rs", -] - [[package]] name = "serde_cbor" version = "0.11.2" @@ -2120,9 +1971,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.144" +version = "1.0.145" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94ed3a816fb1d101812f83e789f888322c34e291f894f19590dc310963e87a00" +checksum = "81fa1584d3d1bcacd84c277a0dfe21f5b0f6accf4a23d04d4c6d61f1af522b4c" dependencies = [ "proc-macro2", "quote", @@ -2843,18 +2694,6 @@ dependencies = [ "wasm-bindgen-shared", ] -[[package]] -name = "wasm-bindgen-futures" -version = "0.4.33" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23639446165ca5a5de86ae1d8896b737ae80319560fbaa4c2887b7da6e7ebd7d" -dependencies = [ - "cfg-if", - "js-sys", - "wasm-bindgen", - "web-sys", -] - [[package]] name = "wasm-bindgen-macro" version = "0.2.83" @@ -2998,21 +2837,6 @@ version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" -[[package]] -name = "winreg" -version = "0.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d" -dependencies = [ - "winapi", -] - -[[package]] -name = "xml-rs" -version = "0.8.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2d7d3948613f75c98fd9328cfdcc45acc4d360655289d0a7d4ec931392200a3" - [[package]] name = "yaml-rust" version = "0.4.5" @@ -3021,3 +2845,9 @@ checksum = "56c1936c4cc7a1c9ab21a1ebb602eb942ba868cbd44a99cb7cdc5892335e1c85" dependencies = [ "linked-hash-map", ] + +[[package]] +name = "zeroize" +version = "1.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c394b5bd0c6f669e7275d9c20aa90ae064cb22e75a1cad54e1b34088034b149f" diff --git a/Cargo.toml b/Cargo.toml index 6364572..86e4ad5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,14 +39,7 @@ once_cell = "1.4.0" opentelemetry = { version = "0.18", features = ["rt-tokio"] } opentelemetry-otlp = "0.11" pin-project-lite = "0.2.7" -reqwest = { version = "0.11.5", default-features = false, features = [ - "rustls-tls", - "stream", -] } -rust-s3 = { version = "0.31.0", default-features = false, features = [ - "fail-on-err", - "with-reqwest", -], git = "https://github.com/asonix/rust-s3", branch = "asonix/generic-client" } +rusty-s3 = "0.3.2" serde = { version = "1.0", features = ["derive"] } serde_cbor = "0.11.2" serde_json = "1.0" @@ -57,7 +50,10 @@ thiserror = "1.0" time = { version = "0.3.0", features = ["serde", "serde-well-known"] } tokio = { version = "1", features = ["full", "tracing"] } tokio-uring = { version = "0.3", optional = true, features = ["bytes"] } -tokio-util = { version = "0.7", default-features = false, features = ["codec"] } +tokio-util = { version = "0.7", default-features = false, features = [ + "codec", + "io", +] } toml = "0.5.8" tracing = "0.1.15" tracing-error = "0.2.0" diff --git a/src/config/commandline.rs b/src/config/commandline.rs index b4aac13..0774d03 100644 --- a/src/config/commandline.rs +++ b/src/config/commandline.rs @@ -559,10 +559,6 @@ struct ObjectStorage { #[clap(short, long)] secret_key: Option, - /// The security token for accessing the bucket - #[clap(long)] - security_token: Option, - /// The session token for accessing the bucket #[clap(long)] session_token: Option, diff --git a/src/config/primitives.rs b/src/config/primitives.rs index f48027d..85aa354 100644 --- a/src/config/primitives.rs +++ b/src/config/primitives.rs @@ -79,11 +79,6 @@ pub(crate) struct ObjectStorage { #[clap(short, long)] pub(crate) secret_key: String, - /// The security token for accessing the bucket - #[clap(long)] - #[serde(skip_serializing_if = "Option::is_none")] - pub(crate) security_token: Option, - /// The session token for accessing the bucket #[clap(long)] #[serde(skip_serializing_if = "Option::is_none")] diff --git a/src/main.rs b/src/main.rs index 8db94a1..5ab88c2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -958,16 +958,10 @@ fn transform_error(error: actix_form_data::Error) -> actix_web::Error { fn build_client() -> awc::Client { Client::builder() .wrap(Tracing) - .add_default_header(("User-Agent", "pict-rs v0.3.0-main")) + .add_default_header(("User-Agent", "pict-rs v0.4.0-main")) .finish() } -fn build_reqwest_client() -> reqwest::Result { - reqwest::Client::builder() - .user_agent("pict-rs v0.3.0-main") - .build() -} - fn next_worker_id() -> String { static WORKER_ID: AtomicU64 = AtomicU64::new(0); @@ -1092,22 +1086,24 @@ where } } config::Store::ObjectStorage(config::ObjectStorage { + endpoint, bucket_name, + url_style, region, access_key, secret_key, - security_token, session_token, }) => { let to = ObjectStore::build( + endpoint, bucket_name, - region.as_ref().clone(), + url_style, + region.as_ref(), Some(access_key.clone()), Some(secret_key.clone()), - security_token.clone(), session_token.clone(), repo.clone(), - build_reqwest_client()?, + build_client(), ) .await?; @@ -1136,22 +1132,24 @@ async fn main() -> color_eyre::Result<()> { migrate_inner(&repo, from, &to).await?; } config::Store::ObjectStorage(config::ObjectStorage { + endpoint, bucket_name, + url_style, region, access_key, secret_key, - security_token, session_token, }) => { let from = ObjectStore::build( + endpoint, &bucket_name, + url_style, Serde::into_inner(region), Some(access_key), Some(secret_key), - security_token, session_token, repo.clone(), - build_reqwest_client()?, + build_client(), ) .await?; @@ -1173,22 +1171,24 @@ async fn main() -> color_eyre::Result<()> { } } config::Store::ObjectStorage(config::ObjectStorage { + endpoint, bucket_name, + url_style, region, access_key, secret_key, - security_token, session_token, }) => { let store = ObjectStore::build( + endpoint, &bucket_name, + url_style, Serde::into_inner(region), Some(access_key), Some(secret_key), - security_token, session_token, repo.clone(), - build_reqwest_client()?, + build_client(), ) .await?; diff --git a/src/store/object_store.rs b/src/store/object_store.rs index 30af737..e832eb3 100644 --- a/src/store/object_store.rs +++ b/src/store/object_store.rs @@ -3,16 +3,25 @@ use crate::{ repo::{Repo, SettingsRepo}, store::Store, }; -use actix_web::web::Bytes; -use futures_util::{Stream, TryStreamExt}; -use s3::{ - client::Client, command::Command, creds::Credentials, error::S3Error, request_trait::Request, - Bucket, Region, +use actix_web::{ + http::{ + header::{ByteRangeSpec, Range, CONTENT_LENGTH}, + StatusCode, + }, + web::Bytes, }; -use std::{pin::Pin, string::FromUtf8Error}; +use awc::{Client, ClientRequest}; +use futures_util::{Stream, TryStreamExt}; +use rusty_s3::{ + actions::{PutObject, S3Action}, + Bucket, Credentials, UrlStyle, +}; +use std::{pin::Pin, string::FromUtf8Error, time::Duration}; use storage_path_generator::{Generator, Path}; -use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; +use tokio_util::io::ReaderStream; use tracing::Instrument; +use url::Url; mod object_id; pub(crate) use object_id::ObjectId; @@ -33,8 +42,8 @@ pub(crate) enum ObjectError { #[error("Invalid length")] Length, - #[error("Storage error")] - Anyhow(#[from] S3Error), + #[error("Invalid status")] + Status(StatusCode), } #[derive(Clone)] @@ -42,7 +51,8 @@ pub(crate) struct ObjectStore { path_gen: Generator, repo: Repo, bucket: Bucket, - client: reqwest::Client, + credentials: Credentials, + client: Client, } #[async_trait::async_trait(?Send)] @@ -55,26 +65,30 @@ impl Store for ObjectStore { where Reader: AsyncRead + Unpin, { - let path = self.next_file().await?; + let response = self + .put_object_request() + .await? + .send_stream(ReaderStream::new(reader)) + .await?; - self.bucket - .put_object_stream(&self.client, reader, &path) - .await - .map_err(ObjectError::from)?; + if response.status().is_success() { + return Ok(ObjectId::from_string(path)); + } - Ok(ObjectId::from_string(path)) + Err(ObjectError::Status(response.status()).into()) } #[tracing::instrument(skip(bytes))] async fn save_bytes(&self, bytes: Bytes) -> Result { - let path = self.next_file().await?; + let req = self.put_object_request().await?; - self.bucket - .put_object(&self.client, &path, &bytes) - .await - .map_err(ObjectError::from)?; + let response = req.send_body(bytes).await?; - Ok(ObjectId::from_string(path)) + if response.status().is_success() { + return Ok(ObjectId::from_string(path)); + } + + Err(ObjectError::Status(response.status()).into()) } #[tracing::instrument] @@ -84,38 +98,16 @@ impl Store for ObjectStore { from_start: Option, len: Option, ) -> Result { - let path = identifier.as_str(); + let response = self + .get_object_request(identifier, from_start, len) + .send() + .await?; - let start = from_start.unwrap_or(0); - let end = len.map(|len| start + len - 1); + if response.status.is_success() { + return Ok(Box::pin(response)); + } - let request_span = tracing::trace_span!(parent: None, "Get Object"); - - // NOTE: isolating reqwest in it's own span is to prevent the request's span from getting - // smuggled into a long-lived task. Unfortunately, I am unable to create a minimal - // reproduction of this problem so I can't open a bug about it. - let request = request_span.in_scope(|| { - Client::request( - &self.client, - &self.bucket, - path, - Command::GetObjectRange { start, end }, - ) - }); - - let response = request_span - .in_scope(|| request.response()) - .instrument(request_span.clone()) - .await - .map_err(ObjectError::from)?; - - let stream = request_span.in_scope(|| { - response - .bytes_stream() - .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) - }); - - Ok(Box::pin(stream)) + Err(ObjectError::Status(response.status()).into()) } #[tracing::instrument(skip(writer))] @@ -127,39 +119,52 @@ impl Store for ObjectStore { where Writer: AsyncWrite + Send + Unpin, { - let path = identifier.as_str(); + let response = self + .get_object_request(identifier, from_start, len) + .send() + .await?; - self.bucket - .get_object_stream(&self.client, path, writer) - .await - .map_err(ObjectError::from) - .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, Error::from(e)))?; + if !response.status.is_success() { + return Err(ObjectError::Status(response.status()).into()); + } + + while let Some(res) = response.next().await { + let bytes = res?; + writer.write_all_buf(bytes).await?; + } + writer.flush().await?; Ok(()) } #[tracing::instrument] async fn len(&self, identifier: &Self::Identifier) -> Result { - let path = identifier.as_str(); + let response = self.head_object_request(identifier).send().await?; - let (head, _) = self - .bucket - .head_object(&self.client, path) - .await - .map_err(ObjectError::from)?; - let length = head.content_length.ok_or(ObjectError::Length)?; + if !response.status.is_success() { + return Err(ObjectError::Status(response.status()).into()); + } - Ok(length as u64) + let length = response + .headers() + .get(CONTENT_LENGTH) + .ok_or(ObjectError::Length)? + .to_str() + .ok_or(ObjectError::Length) + .parse::() + .map_err(|_| ObjectError::Length)?; + + Ok(length) } #[tracing::instrument] async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Error> { - let path = identifier.as_str(); + let response = self.delete_object_request(identifier).send().await?; + + if !response.status.is_success() { + return Err(ObjectError::Status(response.status()).into()); + } - self.bucket - .delete_object(&self.client, path) - .await - .map_err(ObjectError::from)?; Ok(()) } } @@ -167,11 +172,12 @@ impl Store for ObjectStore { impl ObjectStore { #[allow(clippy::too_many_arguments)] pub(crate) async fn build( + endpoint: Url, bucket_name: &str, - region: Region, + url_style: UrlStyle, + region: &str, access_key: Option, secret_key: Option, - security_token: Option, session_token: Option, repo: Repo, client: reqwest::Client, @@ -181,28 +187,84 @@ impl ObjectStore { Ok(ObjectStore { path_gen, repo, - bucket: Bucket::new( - bucket_name, - match region { - Region::Custom { endpoint, .. } => Region::Custom { - region: String::from(""), - endpoint, - }, - region => region, - }, - Credentials { - access_key, - secret_key, - security_token, - session_token, - }, - ) - .map_err(ObjectError::from)? - .with_path_style(), + bucket: Bucket::new(endpoint, url_style, bucket_name, region) + .map_err(ObjectError::from)?, + credentials: Credentials::new_with_token(access_key, secret_key, session_token), client, }) } + async fn put_object_request(&self) -> Result { + let path = self.next_file().await?; + + let action = self.bucket.put_object(Some(&self.credentials), &path); + + Ok(self.build_request(action)) + } + + fn build_request(&self, action: A) -> ClientRequest { + let method = match A::METHOD { + rusty_s3::Method::Head => awc::http::Method::HEAD, + rusty_s3::Method::Get => awc::http::Method::GET, + rusty_s3::Method::Post => awc::http::Method::POST, + rusty_s3::Method::Put => awc::http::Method::PUT, + rusty_s3::Method::Delete => awc::http::Method::DELETE, + }; + + let url = action.sign(Duration::from_secs(5)); + + let req = self.client.request(method, url); + + action + .headers_mut() + .drain() + .fold(req, |req, tup| req.insert_header(tup)) + } + + fn get_object_request( + &self, + identifier: &ObjectId, + from_start: Option, + len: Option, + ) -> ClientRequest { + let action = self + .bucket + .get_object(Some(&self.credentials), identifier.as_str()); + + let req = self.build_request(action); + + let start = from_start.unwrap_or(0); + let end = len.map(|len| start + len - 1); + + let range = match (start, end) { + (Some(start), Some(end)) => Some(ByteRangeSpec::FromTo(start, end)), + (Some(start), None) => Some(ByteRangeSpec::From(start)), + _ => None, + }; + + if let Some(range) = range { + req.insert_header(Range::Bytes(vec![range])).send().await?; + } else { + req + } + } + + fn head_object_request(&self, identifier: &ObjectId) -> ClientRequest { + let action = self + .bucket + .head_object(Some(&self.credentials), identifier.as_str()); + + self.build_request(action) + } + + fn delete_object_request(&self, identifier: &ObjectId) -> ClientRequest { + let action = self + .bucket + .delete_object(Some(&self.credentials), identifier.as_str()); + + self.build_request(action) + } + async fn next_directory(&self) -> Result { let path = self.path_gen.next(); @@ -243,8 +305,8 @@ impl std::fmt::Debug for ObjectStore { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ObjectStore") .field("path_gen", &"generator") - .field("bucket", &self.bucket.name) - .field("region", &self.bucket.region) + .field("bucket", &self.bucket.name()) + .field("region", &self.bucket.region()) .finish() } }