diff --git a/Cargo.lock b/Cargo.lock index 2a18e9e..aa85935 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -22,7 +22,8 @@ dependencies = [ [[package]] name = "actix-form-data" version = "0.7.0-beta.0" -source = "git+https://git.asonix.dog/asonix/actix-form-data?branch=v0.7.x#3525bcd09cd030df3f2ed7684f2aad1bcc42d68b" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e721f3919cb43c566c0dbb6a9cb5ad5106ac42b6b3c0d21a7a3e762455de957a" dependencies = [ "actix-multipart", "actix-rt", @@ -351,29 +352,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "aws-creds" -version = "0.29.1" -source = "git+https://github.com/asonix/rust-s3?branch=asonix/generic-client#9e450d0038a29040ba5c47ffa570350c3b1ad976" -dependencies = [ - "dirs", - "rust-ini", - "serde", - "serde-xml-rs", - "serde_derive", - "thiserror", - "url", -] - -[[package]] -name = "aws-region" -version = "0.24.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9bdd1c0f4aa70f72812a2f3ec325d6d6162fb80cff093f847b4c394fd78c3643" -dependencies = [ - "thiserror", -] - [[package]] name = "axum" version = "0.5.16" @@ -720,26 +698,6 @@ dependencies = [ "subtle", ] -[[package]] -name = "dirs" -version = "4.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca3aa72a6f96ea37bbc5aa912f6788242832f75369bdfdadcb0e38423f100059" -dependencies = [ - "dirs-sys", -] - -[[package]] -name = "dirs-sys" -version = "0.3.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b1d1d91c932ef41c0f2663aa8b0ca0342d444d842c06914aa0a7e352d0bada6" -dependencies = [ - "libc", - "redox_users", - "winapi", -] - [[package]] name = "dlv-list" version = "0.3.0" @@ -1007,12 +965,6 @@ dependencies = [ "libc", ] -[[package]] -name = "hex" -version = "0.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" - [[package]] name = "hmac" version = "0.12.1" @@ -1092,19 +1044,6 @@ dependencies = [ "want", ] -[[package]] -name = "hyper-rustls" -version = "0.23.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d87c48c02e0dc5e3b849a2041db3029fd066650f8f717c07bf8ed78ccb895cac" -dependencies = [ - "http", - "hyper", - "rustls", - "tokio", - "tokio-rustls", -] - [[package]] name = "hyper-timeout" version = "0.4.1" @@ -1162,12 +1101,6 @@ dependencies = [ "libc", ] -[[package]] -name = "ipnet" -version = "2.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "879d54834c8c76457ef4293a689b2a8c59b076067ad77b15efafbb05f92a592b" - [[package]] name = "itertools" version = "0.10.5" @@ -1280,22 +1213,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73cbba799671b762df5a175adf59ce145165747bb891505c43d09aefbbf38beb" [[package]] -name = "maybe-async" -version = "0.2.6" +name = "md-5" +version = "0.10.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6007f9dad048e0a224f27ca599d669fca8cfa0dac804725aab542b2eb032bce6" +checksum = "6365506850d44bff6e2fbcb5176cf63650e48bd45ef2fe2665ae1570e0f4b9ca" dependencies = [ - "proc-macro2", - "quote", - "syn", + "digest", ] -[[package]] -name = "md5" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771" - [[package]] name = "memchr" version = "2.5.0" @@ -1643,14 +1568,15 @@ dependencies = [ "console-subscriber", "dashmap", 
"futures-util", + "md-5", "mime", "num_cpus", "once_cell", "opentelemetry", "opentelemetry-otlp", "pin-project-lite", - "reqwest", - "rust-s3", + "quick-xml", + "rusty-s3", "serde", "serde_cbor", "serde_json", @@ -1749,9 +1675,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.43" +version = "1.0.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a2ca2c61bc9f3d74d2886294ab7b9853abd9c1ad903a3ac7815c58989bb7bab" +checksum = "7bd7356a8122b6c4a24a82b278680c73357984ca2fc79a0f9fa6dea7dced7c58" dependencies = [ "unicode-ident", ] @@ -1809,6 +1735,16 @@ dependencies = [ "prost", ] +[[package]] +name = "quick-xml" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37dddbbe9df96afafcb8027fcf263971b726530e12f0787f620a7ba5b4846081" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quote" version = "1.0.21" @@ -1857,17 +1793,6 @@ dependencies = [ "bitflags", ] -[[package]] -name = "redox_users" -version = "0.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b033d837a7cf162d7993aded9304e30a83213c648b6e389db233191f891e5c2b" -dependencies = [ - "getrandom", - "redox_syscall", - "thiserror", -] - [[package]] name = "regex" version = "1.6.0" @@ -1903,46 +1828,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "reqwest" -version = "0.11.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "431949c384f4e2ae07605ccaa56d1d9d2ecdb5cadd4f9577ccfab29f2e5149fc" -dependencies = [ - "base64", - "bytes", - "encoding_rs", - "futures-core", - "futures-util", - "h2", - "http", - "http-body", - "hyper", - "hyper-rustls", - "ipnet", - "js-sys", - "log", - "mime", - "once_cell", - "percent-encoding", - "pin-project-lite", - "rustls", - "rustls-pemfile", - "serde", - "serde_json", - "serde_urlencoded", - "tokio", - "tokio-rustls", - "tokio-util", - "tower-service", - "url", - "wasm-bindgen", - "wasm-bindgen-futures", - "web-sys", - "webpki-roots", - "winreg", -] - [[package]] name = "ring" version = "0.16.20" @@ -1988,35 +1873,6 @@ dependencies = [ "ordered-multimap", ] -[[package]] -name = "rust-s3" -version = "0.31.0" -source = "git+https://github.com/asonix/rust-s3?branch=asonix/generic-client#9e450d0038a29040ba5c47ffa570350c3b1ad976" -dependencies = [ - "async-trait", - "aws-creds", - "aws-region", - "base64", - "cfg-if", - "hex", - "hmac", - "http", - "log", - "maybe-async", - "md5", - "percent-encoding", - "reqwest", - "serde", - "serde-xml-rs", - "serde_derive", - "sha2", - "thiserror", - "time", - "tokio", - "tokio-stream", - "url", -] - [[package]] name = "rustc-demangle" version = "0.1.21" @@ -2045,12 +1901,22 @@ dependencies = [ ] [[package]] -name = "rustls-pemfile" -version = "1.0.1" +name = "rusty-s3" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0864aeff53f8c05aa08d86e5ef839d3dfcf07aeba2db32f12db0ef716e87bd55" +checksum = "13332fd1e9538328a80183b9c0bde0cd7065ad2c4405f56b855a51a0a37fffd4" dependencies = [ "base64", + "hmac", + "md-5", + "percent-encoding", + "quick-xml", + "serde", + "serde_json", + "sha2", + "time", + "url", + "zeroize", ] [[package]] @@ -2089,25 +1955,13 @@ checksum = "e25dfac463d778e353db5be2449d1cce89bd6fd23c9f1ea21310ce6e5a1b29c4" [[package]] name = "serde" -version = "1.0.144" +version = "1.0.145" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f747710de3dcd43b88c9168773254e809d8ddbdf9653b84e2554ab219f17860" +checksum = 
"728eb6351430bccb993660dfffc5a72f91ccc1295abaa8ce19b27ebe4f75568b" dependencies = [ "serde_derive", ] -[[package]] -name = "serde-xml-rs" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65162e9059be2f6a3421ebbb4fef3e74b7d9e7c60c50a0e292c6239f19f1edfa" -dependencies = [ - "log", - "serde", - "thiserror", - "xml-rs", -] - [[package]] name = "serde_cbor" version = "0.11.2" @@ -2120,9 +1974,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.144" +version = "1.0.145" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94ed3a816fb1d101812f83e789f888322c34e291f894f19590dc310963e87a00" +checksum = "81fa1584d3d1bcacd84c277a0dfe21f5b0f6accf4a23d04d4c6d61f1af522b4c" dependencies = [ "proc-macro2", "quote", @@ -2843,18 +2697,6 @@ dependencies = [ "wasm-bindgen-shared", ] -[[package]] -name = "wasm-bindgen-futures" -version = "0.4.33" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23639446165ca5a5de86ae1d8896b737ae80319560fbaa4c2887b7da6e7ebd7d" -dependencies = [ - "cfg-if", - "js-sys", - "wasm-bindgen", - "web-sys", -] - [[package]] name = "wasm-bindgen-macro" version = "0.2.83" @@ -2998,21 +2840,6 @@ version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" -[[package]] -name = "winreg" -version = "0.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d" -dependencies = [ - "winapi", -] - -[[package]] -name = "xml-rs" -version = "0.8.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2d7d3948613f75c98fd9328cfdcc45acc4d360655289d0a7d4ec931392200a3" - [[package]] name = "yaml-rust" version = "0.4.5" @@ -3021,3 +2848,9 @@ checksum = "56c1936c4cc7a1c9ab21a1ebb602eb942ba868cbd44a99cb7cdc5892335e1c85" dependencies = [ "linked-hash-map", ] + +[[package]] +name = "zeroize" +version = "1.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c394b5bd0c6f669e7275d9c20aa90ae064cb22e75a1cad54e1b34088034b149f" diff --git a/Cargo.toml b/Cargo.toml index 6364572..b747da4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ io-uring = [ ] [dependencies] -actix-form-data = { version = "0.7.0-beta.0", git = "https://git.asonix.dog/asonix/actix-form-data", branch = "v0.7.x" } +actix-form-data = "0.7.0-beta.0" actix-rt = { version = "2.7.0", default-features = false } actix-server = "2.0.0" actix-web = { version = "4.0.0", default-features = false } @@ -33,23 +33,18 @@ config = "0.13.0" console-subscriber = "0.1" dashmap = "5.1.0" futures-util = "0.3.17" +md-5 = "0.10.5" mime = "0.3.1" num_cpus = "1.13" once_cell = "1.4.0" opentelemetry = { version = "0.18", features = ["rt-tokio"] } opentelemetry-otlp = "0.11" pin-project-lite = "0.2.7" -reqwest = { version = "0.11.5", default-features = false, features = [ - "rustls-tls", - "stream", -] } -rust-s3 = { version = "0.31.0", default-features = false, features = [ - "fail-on-err", - "with-reqwest", -], git = "https://github.com/asonix/rust-s3", branch = "asonix/generic-client" } +rusty-s3 = "0.3.2" serde = { version = "1.0", features = ["derive"] } serde_cbor = "0.11.2" serde_json = "1.0" +quick-xml = { version = "0.24.1", features = ["serialize"] } sha2 = "0.10.0" sled = { version = "0.34.7" } storage-path-generator = "0.1.0" @@ -57,7 +52,10 @@ thiserror = "1.0" time = { version = 
"0.3.0", features = ["serde", "serde-well-known"] } tokio = { version = "1", features = ["full", "tracing"] } tokio-uring = { version = "0.3", optional = true, features = ["bytes"] } -tokio-util = { version = "0.7", default-features = false, features = ["codec"] } +tokio-util = { version = "0.7", default-features = false, features = [ + "codec", + "io", +] } toml = "0.5.8" tracing = "0.1.15" tracing-error = "0.2.0" diff --git a/docker/object-storage/docker-compose.yml b/docker/object-storage/docker-compose.yml index 9f1f905..ac82284 100644 --- a/docker/object-storage/docker-compose.yml +++ b/docker/object-storage/docker-compose.yml @@ -14,8 +14,6 @@ services: environment: - PICTRS__TRACING__CONSOLE__ADDRESS=0.0.0.0:6669 - PICTRS__TRACING__OPENTELEMETRY__URL=http://otel:4137 - links: - - "minio:pict-rs.minio" stdin_open: true tty: true volumes: @@ -23,7 +21,7 @@ services: - ../../:/opt/app pictrs_proxy: - image: asonix/pictrs-proxy:0.3 + image: asonix/pictrs-proxy:0.4.0-alpha.4 ports: - "8081:8081" environment: diff --git a/docker/object-storage/otel.yml b/docker/object-storage/otel.yml index 8270b08..91168c3 100644 --- a/docker/object-storage/otel.yml +++ b/docker/object-storage/otel.yml @@ -11,7 +11,8 @@ exporters: logging: jaeger: endpoint: jaeger:14250 - insecure: true + tls: + insecure: true service: pipelines: diff --git a/docker/object-storage/pict-rs.toml b/docker/object-storage/pict-rs.toml index f710493..426f3fb 100644 --- a/docker/object-storage/pict-rs.toml +++ b/docker/object-storage/pict-rs.toml @@ -1,6 +1,7 @@ [server] address = '0.0.0.0:8080' worker_id = 'pict-rs-1' + [tracing.logging] format = 'normal' targets = 'warn,tracing_actix_web=info,actix_server=info,actix_web=info' @@ -21,13 +22,7 @@ max_height = 10000 max_area = 40000000 max_file_size = 40 enable_silent_video = true -filters = [ - 'blur', - 'crop', - 'identity', - 'resize', - 'thumbnail', -] +filters = ['blur', 'crop', 'identity', 'resize', 'thumbnail'] skip_validate_imports = false [repo] @@ -37,7 +32,9 @@ cache_capacity = 67108864 [store] type = 'object_storage' +endpoint = 'http://minio:9000' +use_path_style = true bucket_name = 'pict-rs' -region = 'http://minio:9000' -access_key = 'Q7Z3AY3JO01N27UNH5IR' -secret_key = 'bH3Kj6UVJF+btBtWsExVurN3ukEilC3saECsupzi' +region = 'minio' +access_key = 'pictrs' +secret_key = 'pictrspass' diff --git a/src/backgrounded.rs b/src/backgrounded.rs index 73eb5e1..4049fbe 100644 --- a/src/backgrounded.rs +++ b/src/backgrounded.rs @@ -5,7 +5,6 @@ use crate::{ }; use actix_web::web::Bytes; use futures_util::{Stream, TryStreamExt}; -use tokio_util::io::StreamReader; use tracing::{Instrument, Span}; pub(crate) struct Backgrounded @@ -38,7 +37,7 @@ where pub(crate) async fn proxy
<P>
(repo: R, store: S, stream: P) -> Result where - P: Stream>, + P: Stream> + Unpin + 'static, { let mut this = Self { repo, @@ -53,14 +52,13 @@ where async fn do_proxy
<P>
(&mut self, store: S, stream: P) -> Result<(), Error> where - P: Stream>, + P: Stream> + Unpin + 'static, { UploadRepo::create(&self.repo, self.upload_id.expect("Upload id exists")).await?; let stream = stream.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)); - let mut reader = StreamReader::new(Box::pin(stream)); - let identifier = store.save_async_read(&mut reader).await?; + let identifier = store.save_stream(stream).await?; self.identifier = Some(identifier); diff --git a/src/config/commandline.rs b/src/config/commandline.rs index b4aac13..69d98c4 100644 --- a/src/config/commandline.rs +++ b/src/config/commandline.rs @@ -543,13 +543,30 @@ struct Filesystem { #[derive(Clone, Debug, Parser, serde::Serialize)] #[serde(rename_all = "snake_case")] struct ObjectStorage { + /// The base endpoint for the object storage + /// + /// Examples: + /// - `http://localhost:9000` + /// - `https://s3.dualstack.eu-west-1.amazonaws.com` + #[clap(short, long)] + endpoint: Url, + + /// Determines whether to use path style or virtualhost style for accessing objects + /// + /// When this is true, objects will be fetched from {endpoint}/{bucket_name}/{object} + /// When false, objects will be fetched from {bucket_name}.{endpoint}/{object} + #[clap(short, long)] + use_path_style: bool, + /// The bucket in which to store media #[clap(short, long)] bucket_name: Option, /// The region the bucket is located in + /// + /// For minio deployments, this can just be 'minio' #[clap(short, long)] - region: Option>, + region: Option, /// The Access Key for the user accessing the bucket #[clap(short, long)] @@ -559,10 +576,6 @@ struct ObjectStorage { #[clap(short, long)] secret_key: Option, - /// The security token for accessing the bucket - #[clap(long)] - security_token: Option, - /// The session token for accessing the bucket #[clap(long)] session_token: Option, diff --git a/src/config/primitives.rs b/src/config/primitives.rs index f48027d..1382499 100644 --- a/src/config/primitives.rs +++ b/src/config/primitives.rs @@ -1,8 +1,8 @@ use crate::magick::ValidInputType; -use crate::serde_str::Serde; use clap::ArgEnum; use std::{fmt::Display, path::PathBuf, str::FromStr}; use tracing::Level; +use url::Url; #[derive( Clone, @@ -63,13 +63,28 @@ pub(crate) struct Filesystem { #[derive(Clone, Debug, serde::Deserialize, serde::Serialize, clap::Parser)] #[serde(rename_all = "snake_case")] pub(crate) struct ObjectStorage { + /// The base endpoint for the object storage + /// + /// Examples: + /// - `http://localhost:9000` + /// - `https://s3.dualstack.eu-west-1.amazonaws.com` + #[clap(short, long)] + pub(crate) endpoint: Url, + + /// Determines whether to use path style or virtualhost style for accessing objects + /// + /// When this is true, objects will be fetched from {endpoint}/{bucket_name}/{object} + /// When false, objects will be fetched from {bucket_name}.{endpoint}/{object} + #[clap(short, long)] + pub(crate) use_path_style: bool, + /// The bucket in which to store media #[clap(short, long)] pub(crate) bucket_name: String, /// The region the bucket is located in #[clap(short, long)] - pub(crate) region: Serde, + pub(crate) region: String, /// The Access Key for the user accessing the bucket #[clap(short, long)] @@ -79,11 +94,6 @@ pub(crate) struct ObjectStorage { #[clap(short, long)] pub(crate) secret_key: String, - /// The security token for accessing the bucket - #[clap(long)] - #[serde(skip_serializing_if = "Option::is_none")] - pub(crate) security_token: Option, - /// The session token for accessing the 
bucket #[clap(long)] #[serde(skip_serializing_if = "Option::is_none")] @@ -224,7 +234,8 @@ impl Display for LogFormat { #[cfg(test)] mod tests { - use super::{Serde, Targets}; + use super::Targets; + use crate::serde_str::Serde; #[test] fn builds_info_targets() { diff --git a/src/generate.rs b/src/generate.rs index 596a5a0..ae7ecb9 100644 --- a/src/generate.rs +++ b/src/generate.rs @@ -57,14 +57,14 @@ async fn process( identifier } else { let identifier = repo.identifier(hash.clone()).await?; - let mut reader = crate::ffmpeg::thumbnail( + let reader = crate::ffmpeg::thumbnail( store.clone(), identifier, InputFormat::Mp4, ThumbnailFormat::Jpeg, ) .await?; - let motion_identifier = store.save_async_read(&mut reader).await?; + let motion_identifier = store.save_async_read(reader).await?; repo.relate_motion_identifier(hash.clone(), &motion_identifier) .await?; diff --git a/src/ingest.rs b/src/ingest.rs index f4c49c7..e3a3fc6 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -55,7 +55,7 @@ where pub(crate) async fn ingest( repo: &R, store: &S, - stream: impl Stream>, + stream: impl Stream> + Unpin + 'static, declared_alias: Option, should_validate: bool, is_cached: bool, @@ -77,9 +77,10 @@ where ) .await?; - let mut hasher_reader = Hasher::new(validated_reader, Sha256::new()); + let hasher_reader = Hasher::new(validated_reader, Sha256::new()); + let hasher = hasher_reader.hasher(); - let identifier = store.save_async_read(&mut hasher_reader).await?; + let identifier = store.save_async_read(hasher_reader).await?; drop(permit); @@ -90,7 +91,7 @@ where identifier: Some(identifier.clone()), }; - let hash = hasher_reader.finalize_reset().await?; + let hash = hasher.borrow_mut().finalize_reset().to_vec(); session.hash = Some(hash.clone()); diff --git a/src/ingest/hasher.rs b/src/ingest/hasher.rs index 71708ef..811600b 100644 --- a/src/ingest/hasher.rs +++ b/src/ingest/hasher.rs @@ -1,8 +1,8 @@ -use crate::error::Error; -use actix_web::web; use sha2::{digest::FixedOutputReset, Digest}; use std::{ + cell::RefCell, pin::Pin, + rc::Rc, task::{Context, Poll}, }; use tokio::io::{AsyncRead, ReadBuf}; @@ -12,7 +12,7 @@ pin_project_lite::pin_project! 
{ #[pin] inner: I, - hasher: D, + hasher: Rc>, } } @@ -23,14 +23,12 @@ where pub(super) fn new(reader: I, digest: D) -> Self { Hasher { inner: reader, - hasher: digest, + hasher: Rc::new(RefCell::new(digest)), } } - pub(super) async fn finalize_reset(self) -> Result, Error> { - let mut hasher = self.hasher; - let hash = web::block(move || hasher.finalize_reset().to_vec()).await?; - Ok(hash) + pub(super) fn hasher(&self) -> Rc> { + Rc::clone(&self.hasher) } } @@ -53,7 +51,9 @@ where let poll_res = reader.poll_read(cx, buf); let after_len = buf.filled().len(); if after_len > before_len { - hasher.update(&buf.filled()[before_len..after_len]); + hasher + .borrow_mut() + .update(&buf.filled()[before_len..after_len]); } poll_res } @@ -89,11 +89,11 @@ mod test { let hash = test_on_arbiter!(async move { let file1 = tokio::fs::File::open("./client-examples/earth.gif").await?; - let mut hasher = Hasher::new(file1, Sha256::new()); + let mut reader = Hasher::new(file1, Sha256::new()); - tokio::io::copy(&mut hasher, &mut tokio::io::sink()).await?; + tokio::io::copy(&mut reader, &mut tokio::io::sink()).await?; - hasher.finalize_reset().await + Ok(reader.hasher().borrow_mut().finalize_reset().to_vec()) as std::io::Result<_> }) .unwrap(); diff --git a/src/main.rs b/src/main.rs index 8db94a1..2d83b9f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,6 +10,7 @@ use futures_util::{ Stream, StreamExt, TryStreamExt, }; use once_cell::sync::Lazy; +use rusty_s3::UrlStyle; use std::{ future::ready, path::PathBuf, @@ -65,7 +66,11 @@ use self::{ UploadResult, }, serde_str::Serde, - store::{file_store::FileStore, object_store::ObjectStore, Identifier, Store}, + store::{ + file_store::FileStore, + object_store::{ObjectStore, ObjectStoreConfig}, + Identifier, Store, StoreConfig, + }, stream::{StreamLimit, StreamTimeout}, }; @@ -448,7 +453,7 @@ async fn download( #[instrument(name = "Downloading file inline", skip(stream))] async fn do_download_inline( - stream: impl Stream>, + stream: impl Stream> + Unpin + 'static, repo: web::Data, store: web::Data, is_cached: bool, @@ -474,7 +479,7 @@ async fn do_download_inline( #[instrument(name = "Downloading file in background", skip(stream))] async fn do_download_backgrounded( - stream: impl Stream>, + stream: impl Stream> + Unpin + 'static, repo: web::Data, store: web::Data, is_cached: bool, @@ -958,16 +963,10 @@ fn transform_error(error: actix_form_data::Error) -> actix_web::Error { fn build_client() -> awc::Client { Client::builder() .wrap(Tracing) - .add_default_header(("User-Agent", "pict-rs v0.3.0-main")) + .add_default_header(("User-Agent", "pict-rs v0.4.0-main")) .finish() } -fn build_reqwest_client() -> reqwest::Result { - reqwest::Client::builder() - .user_agent("pict-rs v0.3.0-main") - .build() -} - fn next_worker_id() -> String { static WORKER_ID: AtomicU64 = AtomicU64::new(0); @@ -976,15 +975,15 @@ fn next_worker_id() -> String { format!("{}-{}", CONFIG.server.worker_id, next_id) } -async fn launch( +async fn launch( repo: R, - store: S, + store_config: SC, ) -> color_eyre::Result<()> { repo.requeue_in_progress(CONFIG.server.worker_id.as_bytes().to_vec()) .await?; HttpServer::new(move || { - let store = store.clone(); + let store = store_config.clone().build(); let repo = repo.clone(); tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { @@ -1013,20 +1012,23 @@ async fn launch( .service( web::resource("") .guard(guard::Post()) - .route(web::post().to(upload::)), + .route(web::post().to(upload::)), ) .service( web::scope("/backgrounded") .service( 
web::resource("") .guard(guard::Post()) - .route(web::post().to(upload_backgrounded::)), + .route(web::post().to(upload_backgrounded::)), ) .service( - web::resource("/claim").route(web::get().to(claim_upload::)), + web::resource("/claim") + .route(web::get().to(claim_upload::)), ), ) - .service(web::resource("/download").route(web::get().to(download::))) + .service( + web::resource("/download").route(web::get().to(download::)), + ) .service( web::resource("/delete/{delete_token}/{filename}") .route(web::delete().to(delete::)) @@ -1034,27 +1036,27 @@ async fn launch( ) .service( web::resource("/original/{filename}") - .route(web::get().to(serve::)) - .route(web::head().to(serve_head::)), + .route(web::get().to(serve::)) + .route(web::head().to(serve_head::)), ) .service( web::resource("/process.{ext}") - .route(web::get().to(process::)) - .route(web::head().to(process_head::)), + .route(web::get().to(process::)) + .route(web::head().to(process_head::)), ) .service( web::resource("/process_backgrounded.{ext}") - .route(web::get().to(process_backgrounded::)), + .route(web::get().to(process_backgrounded::)), ) .service( web::scope("/details") .service( web::resource("/original/{filename}") - .route(web::get().to(details::)), + .route(web::get().to(details::)), ) .service( web::resource("/process.{ext}") - .route(web::get().to(process_details::)), + .route(web::get().to(process_details::)), ), ), ) @@ -1063,7 +1065,7 @@ async fn launch( .wrap(Internal( CONFIG.server.api_key.as_ref().map(|s| s.to_owned()), )) - .service(web::resource("/import").route(web::post().to(import::))) + .service(web::resource("/import").route(web::post().to(import::))) .service( web::resource("/variants").route(web::delete().to(clean_variants::)), ) @@ -1080,36 +1082,43 @@ async fn launch( Ok(()) } -async fn migrate_inner(repo: &Repo, from: S1, to: &config::Store) -> color_eyre::Result<()> +async fn migrate_inner(repo: &Repo, from: S1, to: config::Store) -> color_eyre::Result<()> where S1: Store, { match to { config::Store::Filesystem(config::Filesystem { path }) => { - let to = FileStore::build(path.clone(), repo.clone()).await?; + let to = FileStore::build(path.clone(), repo.clone()).await?.build(); + match repo { Repo::Sled(repo) => migrate_store(repo, from, to).await?, } } config::Store::ObjectStorage(config::ObjectStorage { + endpoint, bucket_name, + use_path_style, region, access_key, secret_key, - security_token, session_token, }) => { let to = ObjectStore::build( + endpoint.clone(), bucket_name, - region.as_ref().clone(), - Some(access_key.clone()), - Some(secret_key.clone()), - security_token.clone(), - session_token.clone(), + if use_path_style { + UrlStyle::Path + } else { + UrlStyle::VirtualHost + }, + region, + access_key, + secret_key, + session_token, repo.clone(), - build_reqwest_client()?, ) - .await?; + .await? 
+ .build(); match repo { Repo::Sled(repo) => migrate_store(repo, from, to).await?, @@ -1132,30 +1141,36 @@ async fn main() -> color_eyre::Result<()> { Operation::MigrateStore { from, to } => { match from { config::Store::Filesystem(config::Filesystem { path }) => { - let from = FileStore::build(path.clone(), repo.clone()).await?; - migrate_inner(&repo, from, &to).await?; + let from = FileStore::build(path.clone(), repo.clone()).await?.build(); + migrate_inner(&repo, from, to).await?; } config::Store::ObjectStorage(config::ObjectStorage { + endpoint, bucket_name, + use_path_style, region, access_key, secret_key, - security_token, session_token, }) => { let from = ObjectStore::build( - &bucket_name, - Serde::into_inner(region), - Some(access_key), - Some(secret_key), - security_token, + endpoint, + bucket_name, + if use_path_style { + UrlStyle::Path + } else { + UrlStyle::VirtualHost + }, + region, + access_key, + secret_key, session_token, repo.clone(), - build_reqwest_client()?, ) - .await?; + .await? + .build(); - migrate_inner(&repo, from, &to).await?; + migrate_inner(&repo, from, to).await?; } } @@ -1169,31 +1184,36 @@ async fn main() -> color_eyre::Result<()> { let store = FileStore::build(path, repo.clone()).await?; match repo { - Repo::Sled(sled_repo) => launch(sled_repo, store).await, + Repo::Sled(sled_repo) => launch::<_, FileStore>(sled_repo, store).await, } } config::Store::ObjectStorage(config::ObjectStorage { + endpoint, bucket_name, + use_path_style, region, access_key, secret_key, - security_token, session_token, }) => { let store = ObjectStore::build( - &bucket_name, - Serde::into_inner(region), - Some(access_key), - Some(secret_key), - security_token, + endpoint, + bucket_name, + if use_path_style { + UrlStyle::Path + } else { + UrlStyle::VirtualHost + }, + region, + access_key, + secret_key, session_token, repo.clone(), - build_reqwest_client()?, ) .await?; match repo { - Repo::Sled(sled_repo) => launch(sled_repo, store).await, + Repo::Sled(sled_repo) => launch::<_, ObjectStoreConfig>(sled_repo, store).await, } } } @@ -1255,10 +1275,8 @@ where S2: Store, { let stream = from.to_stream(identifier, None, None).await?; - futures_util::pin_mut!(stream); - let mut reader = tokio_util::io::StreamReader::new(stream); - let new_identifier = to.save_async_read(&mut reader).await?; + let new_identifier = to.save_stream(stream).await?; Ok(new_identifier) } diff --git a/src/store.rs b/src/store.rs index 1df8834..def70ce 100644 --- a/src/store.rs +++ b/src/store.rs @@ -15,14 +15,24 @@ pub(crate) trait Identifier: Send + Sync + Clone + Debug { Self: Sized; } -#[async_trait::async_trait(?Send)] -pub(crate) trait Store: Send + Sync + Clone + Debug { - type Identifier: Identifier + 'static; - type Stream: Stream> + 'static; +pub(crate) trait StoreConfig: Send + Sync + Clone { + type Store: Store; - async fn save_async_read(&self, reader: &mut Reader) -> Result + fn build(self) -> Self::Store; +} + +#[async_trait::async_trait(?Send)] +pub(crate) trait Store: Clone + Debug { + type Identifier: Identifier + 'static; + type Stream: Stream> + Unpin + 'static; + + async fn save_async_read(&self, reader: Reader) -> Result where - Reader: AsyncRead + Unpin; + Reader: AsyncRead + Unpin + 'static; + + async fn save_stream(&self, stream: S) -> Result + where + S: Stream> + Unpin + 'static; async fn save_bytes(&self, bytes: Bytes) -> Result; @@ -39,7 +49,7 @@ pub(crate) trait Store: Send + Sync + Clone + Debug { writer: &mut Writer, ) -> Result<(), std::io::Error> where - Writer: AsyncWrite + Send + 
Unpin; + Writer: AsyncWrite + Unpin; async fn len(&self, identifier: &Self::Identifier) -> Result; @@ -54,13 +64,20 @@ where type Identifier = T::Identifier; type Stream = T::Stream; - async fn save_async_read(&self, reader: &mut Reader) -> Result + async fn save_async_read(&self, reader: Reader) -> Result where - Reader: AsyncRead + Unpin, + Reader: AsyncRead + Unpin + 'static, { T::save_async_read(self, reader).await } + async fn save_stream(&self, stream: S) -> Result + where + S: Stream> + Unpin + 'static, + { + T::save_stream(self, stream).await + } + async fn save_bytes(&self, bytes: Bytes) -> Result { T::save_bytes(self, bytes).await } @@ -80,7 +97,7 @@ where writer: &mut Writer, ) -> Result<(), std::io::Error> where - Writer: AsyncWrite + Send + Unpin, + Writer: AsyncWrite + Unpin, { T::read_into(self, identifier, writer).await } @@ -102,13 +119,20 @@ where type Identifier = T::Identifier; type Stream = T::Stream; - async fn save_async_read(&self, reader: &mut Reader) -> Result + async fn save_async_read(&self, reader: Reader) -> Result where - Reader: AsyncRead + Unpin, + Reader: AsyncRead + Unpin + 'static, { T::save_async_read(self, reader).await } + async fn save_stream(&self, stream: S) -> Result + where + S: Stream> + Unpin + 'static, + { + T::save_stream(self, stream).await + } + async fn save_bytes(&self, bytes: Bytes) -> Result { T::save_bytes(self, bytes).await } @@ -128,7 +152,7 @@ where writer: &mut Writer, ) -> Result<(), std::io::Error> where - Writer: AsyncWrite + Send + Unpin, + Writer: AsyncWrite + Unpin, { T::read_into(self, identifier, writer).await } diff --git a/src/store/file_store.rs b/src/store/file_store.rs index a93dee8..fde1b81 100644 --- a/src/store/file_store.rs +++ b/src/store/file_store.rs @@ -2,7 +2,7 @@ use crate::{ error::Error, file::File, repo::{Repo, SettingsRepo}, - store::Store, + store::{Store, StoreConfig}, }; use actix_web::web::Bytes; use futures_util::stream::Stream; @@ -12,6 +12,7 @@ use std::{ }; use storage_path_generator::Generator; use tokio::io::{AsyncRead, AsyncWrite}; +use tokio_util::io::StreamReader; use tracing::{debug, error, instrument, Instrument}; mod file_id; @@ -47,19 +48,27 @@ pub(crate) struct FileStore { repo: Repo, } +impl StoreConfig for FileStore { + type Store = FileStore; + + fn build(self) -> Self::Store { + self + } +} + #[async_trait::async_trait(?Send)] impl Store for FileStore { type Identifier = FileId; type Stream = Pin>>>; #[tracing::instrument(skip(reader))] - async fn save_async_read(&self, reader: &mut Reader) -> Result + async fn save_async_read(&self, mut reader: Reader) -> Result where - Reader: AsyncRead + Unpin, + Reader: AsyncRead + Unpin + 'static, { let path = self.next_file().await?; - if let Err(e) = self.safe_save_reader(&path, reader).await { + if let Err(e) = self.safe_save_reader(&path, &mut reader).await { self.safe_remove_file(&path).await?; return Err(e.into()); } @@ -67,6 +76,13 @@ impl Store for FileStore { Ok(self.file_id_from_path(path)?) 
} + async fn save_stream(&self, stream: S) -> Result + where + S: Stream> + Unpin + 'static, + { + self.save_async_read(StreamReader::new(stream)).await + } + #[tracing::instrument(skip(bytes))] async fn save_bytes(&self, bytes: Bytes) -> Result { let path = self.next_file().await?; @@ -109,7 +125,7 @@ impl Store for FileStore { writer: &mut Writer, ) -> Result<(), std::io::Error> where - Writer: AsyncWrite + Send + Unpin, + Writer: AsyncWrite + Unpin, { let path = self.path_from_file_id(identifier); @@ -255,30 +271,6 @@ impl FileStore { Ok(()) } - - // try moving a file - #[instrument(name = "Moving file", fields(from = tracing::field::debug(&from.as_ref()), to = tracing::field::debug(&to.as_ref())))] - pub(crate) async fn safe_move_file, Q: AsRef>( - &self, - from: P, - to: Q, - ) -> Result<(), FileError> { - safe_create_parent(&to).await?; - - debug!("Checking if {:?} already exists", to.as_ref()); - if let Err(e) = tokio::fs::metadata(&to).await { - if e.kind() != std::io::ErrorKind::NotFound { - return Err(e.into()); - } - } else { - return Err(FileError::FileExists); - } - - debug!("Moving {:?} to {:?}", from.as_ref(), to.as_ref()); - tokio::fs::copy(&from, &to).await?; - self.safe_remove_file(from).await?; - Ok(()) - } } pub(crate) async fn safe_create_parent>(path: P) -> Result<(), FileError> { diff --git a/src/store/object_store.rs b/src/store/object_store.rs index 30af737..f9b01b2 100644 --- a/src/store/object_store.rs +++ b/src/store/object_store.rs @@ -1,22 +1,32 @@ use crate::{ error::Error, repo::{Repo, SettingsRepo}, - store::Store, + store::{Store, StoreConfig}, }; -use actix_web::web::Bytes; -use futures_util::{Stream, TryStreamExt}; -use s3::{ - client::Client, command::Command, creds::Credentials, error::S3Error, request_trait::Request, - Bucket, Region, +use actix_rt::task::JoinError; +use actix_web::{ + error::{BlockingError, PayloadError}, + http::{ + header::{ByteRangeSpec, Range, CONTENT_LENGTH}, + StatusCode, + }, + web::{Bytes, BytesMut}, }; -use std::{pin::Pin, string::FromUtf8Error}; +use awc::{error::SendRequestError, Client, ClientRequest, SendClientRequest}; +use futures_util::{Stream, StreamExt, TryStreamExt}; +use rusty_s3::{actions::S3Action, Bucket, BucketError, Credentials, UrlStyle}; +use std::{pin::Pin, string::FromUtf8Error, time::Duration}; use storage_path_generator::{Generator, Path}; -use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; +use tokio_util::io::ReaderStream; use tracing::Instrument; +use url::Url; mod object_id; pub(crate) use object_id::ObjectId; +const CHUNK_SIZE: usize = 8_388_608; // 8 Mebibytes, min is 5 (5_242_880); + // - Settings Tree // - last-path -> last generated path @@ -27,14 +37,47 @@ pub(crate) enum ObjectError { #[error("Failed to generate path")] PathGenerator(#[from] storage_path_generator::PathError), + #[error("Failed to generate request")] + S3(#[from] BucketError), + + #[error("Error making request")] + SendRequest(String), + #[error("Failed to parse string")] Utf8(#[from] FromUtf8Error), + #[error("Failed to parse xml")] + Xml(#[from] quick_xml::de::DeError), + #[error("Invalid length")] Length, - #[error("Storage error")] - Anyhow(#[from] S3Error), + #[error("Invalid etag response")] + Etag, + + #[error("Task cancelled")] + Cancelled, + + #[error("Invalid status: {0}\n{1}")] + Status(StatusCode, String), +} + +impl From for ObjectError { + fn from(e: SendRequestError) -> Self { + Self::SendRequest(e.to_string()) + } +} + +impl From for ObjectError { + fn from(_: 
JoinError) -> Self { + Self::Cancelled + } +} + +impl From for ObjectError { + fn from(_: BlockingError) -> Self { + Self::Cancelled + } } #[derive(Clone)] @@ -42,7 +85,75 @@ pub(crate) struct ObjectStore { path_gen: Generator, repo: Repo, bucket: Bucket, - client: reqwest::Client, + credentials: Credentials, + client: Client, +} + +#[derive(Clone)] +pub(crate) struct ObjectStoreConfig { + path_gen: Generator, + repo: Repo, + bucket: Bucket, + credentials: Credentials, +} + +#[derive(serde::Deserialize, Debug)] +struct InitiateMultipartUploadResponse { + #[serde(rename = "Bucket")] + _bucket: String, + #[serde(rename = "Key")] + _key: String, + #[serde(rename = "UploadId")] + upload_id: String, +} + +impl StoreConfig for ObjectStoreConfig { + type Store = ObjectStore; + + fn build(self) -> Self::Store { + ObjectStore { + path_gen: self.path_gen, + repo: self.repo, + bucket: self.bucket, + credentials: self.credentials, + client: crate::build_client(), + } + } +} + +fn payload_to_io_error(e: PayloadError) -> std::io::Error { + match e { + PayloadError::Io(io) => io, + otherwise => std::io::Error::new(std::io::ErrorKind::Other, otherwise.to_string()), + } +} + +#[tracing::instrument(skip(stream))] +async fn read_chunk(stream: &mut S) -> std::io::Result +where + S: Stream> + Unpin + 'static, +{ + let mut buf = Vec::new(); + let mut total_len = 0; + + while total_len < CHUNK_SIZE { + if let Some(res) = stream.next().await { + let bytes = res?; + total_len += bytes.len(); + buf.push(bytes); + } else { + break; + } + } + + let bytes = buf + .iter() + .fold(BytesMut::with_capacity(total_len), |mut acc, item| { + acc.extend_from_slice(item); + acc + }); + + Ok(bytes.freeze()) } #[async_trait::async_trait(?Send)] @@ -51,30 +162,136 @@ impl Store for ObjectStore { type Stream = Pin>>>; #[tracing::instrument(skip(reader))] - async fn save_async_read(&self, reader: &mut Reader) -> Result + async fn save_async_read(&self, reader: Reader) -> Result where - Reader: AsyncRead + Unpin, + Reader: AsyncRead + Unpin + 'static, { - let path = self.next_file().await?; + self.save_stream(ReaderStream::new(reader)).await + } - self.bucket - .put_object_stream(&self.client, reader, &path) - .await - .map_err(ObjectError::from)?; + #[tracing::instrument(skip(stream))] + async fn save_stream(&self, mut stream: S) -> Result + where + S: Stream> + Unpin + 'static, + { + let (req, object_id) = self.create_multipart_request().await?; + let mut response = req.send().await.map_err(ObjectError::from)?; - Ok(ObjectId::from_string(path)) + if !response.status().is_success() { + let body = String::from_utf8_lossy(&response.body().await?).to_string(); + + return Err(ObjectError::Status(response.status(), body).into()); + } + + let body = response.body().await?; + let body: InitiateMultipartUploadResponse = + quick_xml::de::from_reader(&*body).map_err(ObjectError::from)?; + let upload_id = &body.upload_id; + + // hack-ish: use async block as Result boundary + let res = async { + let mut complete = false; + let mut part_number = 0; + let mut futures = Vec::new(); + + while !complete { + part_number += 1; + + let bytes = read_chunk(&mut stream).await?; + complete = bytes.len() < CHUNK_SIZE; + + let this = self.clone(); + + let object_id2 = object_id.clone(); + let upload_id2 = upload_id.clone(); + let handle = actix_rt::spawn( + async move { + let mut response = this + .create_upload_part_request( + bytes.clone(), + &object_id2, + part_number, + &upload_id2, + ) + .await? 
+ .send_body(bytes) + .await?; + + if !response.status().is_success() { + let body = String::from_utf8_lossy(&response.body().await?).to_string(); + + return Err(ObjectError::Status(response.status(), body).into()); + } + + let etag = response + .headers() + .get("etag") + .ok_or(ObjectError::Etag)? + .to_str() + .map_err(|_| ObjectError::Etag)? + .to_string(); + + // early-drop response to close its tracing spans + drop(response); + + Ok(etag) as Result + } + .instrument(tracing::info_span!("Upload Part")), + ); + + futures.push(handle); + } + + // early-drop stream to allow the next Part to be polled concurrently + drop(stream); + + let mut etags = Vec::new(); + + for future in futures { + etags.push(future.await.map_err(ObjectError::from)??); + } + + let mut response = self + .send_complete_multipart_request( + &object_id, + upload_id, + etags.iter().map(|s| s.as_ref()), + ) + .await?; + + if !response.status().is_success() { + let body = String::from_utf8_lossy(&response.body().await?).to_string(); + + return Err(ObjectError::Status(response.status(), body).into()); + } + + Ok(()) as Result<(), Error> + } + .await; + + if let Err(e) = res { + self.create_abort_multipart_request(&object_id, upload_id) + .send() + .await?; + return Err(e); + } + + Ok(object_id) } #[tracing::instrument(skip(bytes))] async fn save_bytes(&self, bytes: Bytes) -> Result { - let path = self.next_file().await?; + let (req, object_id) = self.put_object_request().await?; - self.bucket - .put_object(&self.client, &path, &bytes) - .await - .map_err(ObjectError::from)?; + let mut response = req.send_body(bytes).await.map_err(ObjectError::from)?; - Ok(ObjectId::from_string(path)) + if response.status().is_success() { + return Ok(object_id); + } + + let body = String::from_utf8_lossy(&response.body().await?).to_string(); + + Err(ObjectError::Status(response.status(), body).into()) } #[tracing::instrument] @@ -84,38 +301,19 @@ impl Store for ObjectStore { from_start: Option, len: Option, ) -> Result { - let path = identifier.as_str(); - - let start = from_start.unwrap_or(0); - let end = len.map(|len| start + len - 1); - - let request_span = tracing::trace_span!(parent: None, "Get Object"); - - // NOTE: isolating reqwest in it's own span is to prevent the request's span from getting - // smuggled into a long-lived task. Unfortunately, I am unable to create a minimal - // reproduction of this problem so I can't open a bug about it. 
- let request = request_span.in_scope(|| { - Client::request( - &self.client, - &self.bucket, - path, - Command::GetObjectRange { start, end }, - ) - }); - - let response = request_span - .in_scope(|| request.response()) - .instrument(request_span.clone()) + let mut response = self + .get_object_request(identifier, from_start, len) + .send() .await .map_err(ObjectError::from)?; - let stream = request_span.in_scope(|| { - response - .bytes_stream() - .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) - }); + if response.status().is_success() { + return Ok(Box::pin(response.map_err(payload_to_io_error))); + } - Ok(Box::pin(stream)) + let body = String::from_utf8_lossy(&response.body().await?).to_string(); + + Err(ObjectError::Status(response.status(), body).into()) } #[tracing::instrument(skip(writer))] @@ -125,41 +323,69 @@ impl Store for ObjectStore { writer: &mut Writer, ) -> Result<(), std::io::Error> where - Writer: AsyncWrite + Send + Unpin, + Writer: AsyncWrite + Unpin, { - let path = identifier.as_str(); - - self.bucket - .get_object_stream(&self.client, path, writer) + let mut response = self + .get_object_request(identifier, None, None) + .send() .await - .map_err(ObjectError::from) - .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, Error::from(e)))?; + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, ObjectError::from(e)))?; + + if !response.status().is_success() { + let body = response.body().await.map_err(payload_to_io_error)?; + let body = String::from_utf8_lossy(&body).to_string(); + + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + ObjectError::Status(response.status(), body), + )); + } + + while let Some(res) = response.next().await { + let mut bytes = res.map_err(payload_to_io_error)?; + writer.write_all_buf(&mut bytes).await?; + } + writer.flush().await?; Ok(()) } #[tracing::instrument] async fn len(&self, identifier: &Self::Identifier) -> Result { - let path = identifier.as_str(); - - let (head, _) = self - .bucket - .head_object(&self.client, path) + let mut response = self + .head_object_request(identifier) + .send() .await .map_err(ObjectError::from)?; - let length = head.content_length.ok_or(ObjectError::Length)?; - Ok(length as u64) + if !response.status().is_success() { + let body = String::from_utf8_lossy(&response.body().await?).to_string(); + + return Err(ObjectError::Status(response.status(), body).into()); + } + + let length = response + .headers() + .get(CONTENT_LENGTH) + .ok_or(ObjectError::Length)? + .to_str() + .map_err(|_| ObjectError::Length)? 
+ .parse::() + .map_err(|_| ObjectError::Length)?; + + Ok(length) } #[tracing::instrument] async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Error> { - let path = identifier.as_str(); + let mut response = self.delete_object_request(identifier).send().await?; + + if !response.status().is_success() { + let body = String::from_utf8_lossy(&response.body().await?).to_string(); + + return Err(ObjectError::Status(response.status(), body).into()); + } - self.bucket - .delete_object(&self.client, path) - .await - .map_err(ObjectError::from)?; Ok(()) } } @@ -167,42 +393,193 @@ impl Store for ObjectStore { impl ObjectStore { #[allow(clippy::too_many_arguments)] pub(crate) async fn build( - bucket_name: &str, - region: Region, - access_key: Option, - secret_key: Option, - security_token: Option, + endpoint: Url, + bucket_name: String, + url_style: UrlStyle, + region: String, + access_key: String, + secret_key: String, session_token: Option, repo: Repo, - client: reqwest::Client, - ) -> Result { + ) -> Result { let path_gen = init_generator(&repo).await?; - Ok(ObjectStore { + Ok(ObjectStoreConfig { path_gen, repo, - bucket: Bucket::new( - bucket_name, - match region { - Region::Custom { endpoint, .. } => Region::Custom { - region: String::from(""), - endpoint, - }, - region => region, - }, - Credentials { - access_key, - secret_key, - security_token, - session_token, - }, - ) - .map_err(ObjectError::from)? - .with_path_style(), - client, + bucket: Bucket::new(endpoint, url_style, bucket_name, region) + .map_err(ObjectError::from)?, + credentials: if let Some(token) = session_token { + Credentials::new_with_token(access_key, secret_key, token) + } else { + Credentials::new(access_key, secret_key) + }, }) } + async fn put_object_request(&self) -> Result<(ClientRequest, ObjectId), Error> { + let path = self.next_file().await?; + + let mut action = self.bucket.put_object(Some(&self.credentials), &path); + + action + .headers_mut() + .insert("content-type", "application/octet-stream"); + + Ok((self.build_request(action), ObjectId::from_string(path))) + } + + async fn create_multipart_request(&self) -> Result<(ClientRequest, ObjectId), Error> { + let path = self.next_file().await?; + + let mut action = self + .bucket + .create_multipart_upload(Some(&self.credentials), &path); + + action + .headers_mut() + .insert("content-type", "application/octet-stream"); + + Ok((self.build_request(action), ObjectId::from_string(path))) + } + + async fn create_upload_part_request( + &self, + bytes: Bytes, + object_id: &ObjectId, + part_number: u16, + upload_id: &str, + ) -> Result { + use md5::Digest; + + let mut action = self.bucket.upload_part( + Some(&self.credentials), + object_id.as_str(), + part_number, + upload_id, + ); + + let hashing_span = tracing::info_span!("Hashing request body"); + let hash_string = actix_web::web::block(move || { + let guard = hashing_span.enter(); + let mut hasher = md5::Md5::new(); + hasher.update(&bytes); + let hash = hasher.finalize(); + let hash_string = base64::encode(&hash); + drop(guard); + hash_string + }) + .await + .map_err(ObjectError::from)?; + + action + .headers_mut() + .insert("content-type", "application/octet-stream"); + action.headers_mut().insert("content-md5", hash_string); + + Ok(self.build_request(action)) + } + + fn send_complete_multipart_request<'a, I: Iterator>( + &'a self, + object_id: &'a ObjectId, + upload_id: &'a str, + etags: I, + ) -> SendClientRequest { + let mut action = self.bucket.complete_multipart_upload( + Some(&self.credentials), + 
object_id.as_str(), + upload_id, + etags, + ); + + action + .headers_mut() + .insert("content-type", "application/octet-stream"); + + let (req, action) = self.build_request_inner(action); + + req.send_body(action.body()) + } + + fn create_abort_multipart_request( + &self, + object_id: &ObjectId, + upload_id: &str, + ) -> ClientRequest { + let action = self.bucket.abort_multipart_upload( + Some(&self.credentials), + object_id.as_str(), + upload_id, + ); + + self.build_request(action) + } + + fn build_request<'a, A: S3Action<'a>>(&'a self, action: A) -> ClientRequest { + let (req, _) = self.build_request_inner(action); + req + } + + fn build_request_inner<'a, A: S3Action<'a>>(&'a self, mut action: A) -> (ClientRequest, A) { + let method = match A::METHOD { + rusty_s3::Method::Head => awc::http::Method::HEAD, + rusty_s3::Method::Get => awc::http::Method::GET, + rusty_s3::Method::Post => awc::http::Method::POST, + rusty_s3::Method::Put => awc::http::Method::PUT, + rusty_s3::Method::Delete => awc::http::Method::DELETE, + }; + + let url = action.sign(Duration::from_secs(5)); + + let req = self.client.request(method, url.as_str()); + + let req = action + .headers_mut() + .iter() + .fold(req, |req, tup| req.insert_header(tup)); + + (req, action) + } + + fn get_object_request( + &self, + identifier: &ObjectId, + from_start: Option, + len: Option, + ) -> ClientRequest { + let action = self + .bucket + .get_object(Some(&self.credentials), identifier.as_str()); + + let req = self.build_request(action); + + let start = from_start.unwrap_or(0); + let end = len.map(|len| start + len - 1); + + req.insert_header(Range::Bytes(vec![if let Some(end) = end { + ByteRangeSpec::FromTo(start, end) + } else { + ByteRangeSpec::From(start) + }])) + } + + fn head_object_request(&self, identifier: &ObjectId) -> ClientRequest { + let action = self + .bucket + .head_object(Some(&self.credentials), identifier.as_str()); + + self.build_request(action) + } + + fn delete_object_request(&self, identifier: &ObjectId) -> ClientRequest { + let action = self + .bucket + .delete_object(Some(&self.credentials), identifier.as_str()); + + self.build_request(action) + } + async fn next_directory(&self) -> Result { let path = self.path_gen.next(); @@ -243,8 +620,8 @@ impl std::fmt::Debug for ObjectStore { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ObjectStore") .field("path_gen", &"generator") - .field("bucket", &self.bucket.name) - .field("region", &self.bucket.region) + .field("bucket", &self.bucket.name()) + .field("region", &self.bucket.region()) .finish() } }
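
A few notes on the implementation above, each with a small runnable sketch.

The `Hasher` rework in `src/ingest/hasher.rs` follows from `save_async_read` now taking the reader by value: the store consumes the reader, so the SHA-256 state must stay reachable after the reader is gone. The digest therefore moves behind an `Rc<RefCell<_>>`, and `hasher()` hands the caller a second handle; plain `Rc` suffices because the stores run on a single-threaded actix arbiter (hence `async_trait(?Send)` on `Store`). A minimal sketch of that sharing, with the I/O stripped out:

```rust
use sha2::{digest::FixedOutputReset, Digest, Sha256};
use std::{cell::RefCell, rc::Rc};

fn main() {
    // One handle goes into the reader, the caller keeps the other; both
    // point at the same digest state.
    let hasher: Rc<RefCell<Sha256>> = Rc::new(RefCell::new(Sha256::new()));
    let handle = Rc::clone(&hasher);

    // poll_read updates the shared digest with every chunk it fills.
    hasher.borrow_mut().update(b"streamed bytes");
    drop(hasher); // the store drops the reader (and its Rc) when it is done

    // The caller can still finalize through its own handle afterwards.
    let hash = handle.borrow_mut().finalize_reset().to_vec();
    assert_eq!(hash.len(), 32);
}
```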
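The new `StoreConfig`/`build()` split exists because the store's HTTP client moved from `reqwest` to `awc`, and `awc::Client` is not `Send`: it can no longer be built once and cloned across server workers. Instead, the cheap thread-safe config is cloned into the `HttpServer` factory and `build()` constructs the client on each worker's own thread (the `store_config.clone().build()` call in `launch`). A toy model of the pattern, using a hypothetical `Config`/`Store` pair rather than the real types:

```rust
use actix_web::{web, App, HttpServer};

#[derive(Clone)]
struct Config {
    endpoint: String,
}

struct Store {
    endpoint: String,
    // the real ObjectStore also creates its awc::Client here, because the
    // client is !Send and must live on the worker thread that drives it
}

impl Config {
    fn build(self) -> Store {
        Store { endpoint: self.endpoint }
    }
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let config = Config { endpoint: "http://minio:9000".into() };

    // The factory closure runs once per worker, so every worker gets a
    // freshly built Store from the shared Config.
    HttpServer::new(move || {
        let store = config.clone().build();
        App::new().app_data(web::Data::new(store))
    })
    .bind(("127.0.0.1", 8080))?
    .run()
    .await
}
```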
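`save_stream` is the heart of the new object store: it gathers the incoming byte stream into chunks of at least `CHUNK_SIZE` (8 MiB, comfortably above S3's 5 MiB minimum part size), uploads each chunk as a numbered part on a spawned task so parts are in flight concurrently, then completes the multipart upload with the collected etags (and aborts it on any error). One subtlety: `read_chunk` checks the running total only after pushing, so a chunk may overshoot `CHUNK_SIZE` by up to one stream item, and a chunk shorter than `CHUNK_SIZE` marks the end of the stream. A condensed, runnable model of that helper, shrunk to a tiny threshold so the behavior is easy to see:

```rust
use actix_web::web::Bytes;
use futures_util::{stream, Stream, StreamExt};

const CHUNK_SIZE: usize = 8; // 8_388_608 in the real store

async fn read_chunk<S>(stream: &mut S) -> std::io::Result<Bytes>
where
    S: Stream<Item = std::io::Result<Bytes>> + Unpin,
{
    let mut buf = Vec::new();
    let mut total_len = 0;

    // Append whole stream items until the running total reaches CHUNK_SIZE.
    while total_len < CHUNK_SIZE {
        match stream.next().await {
            Some(res) => {
                let bytes = res?;
                total_len += bytes.len();
                buf.push(bytes);
            }
            None => break,
        }
    }

    // One contiguous Bytes per uploaded part.
    Ok(Bytes::from(buf.concat()))
}

#[actix_rt::main]
async fn main() -> std::io::Result<()> {
    let mut s = stream::iter(vec![
        Ok::<_, std::io::Error>(Bytes::from_static(b"123456")),
        Ok(Bytes::from_static(b"789012")),
        Ok(Bytes::from_static(b"34")),
    ]);

    assert_eq!(read_chunk(&mut s).await?.len(), 12); // overshoots 8 by one item
    assert_eq!(read_chunk(&mut s).await?.len(), 2); // short chunk: stream done
    Ok(())
}
```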
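Each part request carries a `content-md5` header so the object store can verify the body. S3 expects the base64 encoding of the raw 16-byte MD5 digest, not the hex form, which is why the RustCrypto `md-5` crate plus `base64` replace the old `md5` dependency. Hashing an 8 MiB part is real CPU work, so the code above pushes it onto a blocking thread with `web::block`. The bare computation, assuming the pre-0.21 `base64::encode` free function already in the lockfile:

```rust
use md5::{Digest, Md5};

fn main() {
    let part_body = b"up to CHUNK_SIZE bytes of part body";

    let mut hasher = Md5::new();
    hasher.update(part_body);

    // base64 of the raw digest; this exact string goes in the content-md5 header
    let content_md5 = base64::encode(hasher.finalize());
    println!("content-md5: {content_md5}");
}
```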
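Finally, every request the store sends is a presigned URL: `build_request_inner` signs the `S3Action` for five seconds and copies the action's headers onto the `awc` request, so no S3 SDK is involved at request time. The new `endpoint` and `use_path_style` options feed straight into `rusty_s3::Bucket`: path style yields `{endpoint}/{bucket_name}/{object}`, virtual-host style yields `{bucket_name}.{endpoint}/{object}`. A standalone sketch, reusing the minio values from the sample `pict-rs.toml` above and a made-up object key:

```rust
use std::time::Duration;

use rusty_s3::{actions::S3Action, Bucket, Credentials, UrlStyle};
use url::Url;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let endpoint: Url = "http://minio:9000".parse()?;

    // use_path_style = true maps to UrlStyle::Path, false to UrlStyle::VirtualHost.
    let bucket = Bucket::new(
        endpoint,
        UrlStyle::Path,
        "pict-rs".to_string(),
        "minio".to_string(),
    )?;
    let credentials = Credentials::new("pictrs".to_string(), "pictrspass".to_string());

    // Sign a GET for one object; the signature lives in the query string, so
    // any plain HTTP client (awc in the store above) can send the request.
    let action = bucket.get_object(Some(&credentials), "00/11/made-up-object");
    let url = action.sign(Duration::from_secs(5));
    println!("{url}");

    Ok(())
}
```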