Merge pull request 'asonix/switch-s3-impl' (#5) from asonix/switch-s3-impl into main

Reviewed-on: https://git.asonix.dog/asonix/pict-rs/pulls/5
This commit is contained in:
asonix 2022-09-25 14:19:32 +00:00
commit 659a25d130
15 changed files with 733 additions and 472 deletions

255
Cargo.lock generated
View File

@ -22,7 +22,8 @@ dependencies = [
[[package]]
name = "actix-form-data"
version = "0.7.0-beta.0"
source = "git+https://git.asonix.dog/asonix/actix-form-data?branch=v0.7.x#3525bcd09cd030df3f2ed7684f2aad1bcc42d68b"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e721f3919cb43c566c0dbb6a9cb5ad5106ac42b6b3c0d21a7a3e762455de957a"
dependencies = [
"actix-multipart",
"actix-rt",
@ -351,29 +352,6 @@ dependencies = [
"tokio",
]
[[package]]
name = "aws-creds"
version = "0.29.1"
source = "git+https://github.com/asonix/rust-s3?branch=asonix/generic-client#9e450d0038a29040ba5c47ffa570350c3b1ad976"
dependencies = [
"dirs",
"rust-ini",
"serde",
"serde-xml-rs",
"serde_derive",
"thiserror",
"url",
]
[[package]]
name = "aws-region"
version = "0.24.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9bdd1c0f4aa70f72812a2f3ec325d6d6162fb80cff093f847b4c394fd78c3643"
dependencies = [
"thiserror",
]
[[package]]
name = "axum"
version = "0.5.16"
@ -720,26 +698,6 @@ dependencies = [
"subtle",
]
[[package]]
name = "dirs"
version = "4.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca3aa72a6f96ea37bbc5aa912f6788242832f75369bdfdadcb0e38423f100059"
dependencies = [
"dirs-sys",
]
[[package]]
name = "dirs-sys"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b1d1d91c932ef41c0f2663aa8b0ca0342d444d842c06914aa0a7e352d0bada6"
dependencies = [
"libc",
"redox_users",
"winapi",
]
[[package]]
name = "dlv-list"
version = "0.3.0"
@ -1007,12 +965,6 @@ dependencies = [
"libc",
]
[[package]]
name = "hex"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]]
name = "hmac"
version = "0.12.1"
@ -1092,19 +1044,6 @@ dependencies = [
"want",
]
[[package]]
name = "hyper-rustls"
version = "0.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d87c48c02e0dc5e3b849a2041db3029fd066650f8f717c07bf8ed78ccb895cac"
dependencies = [
"http",
"hyper",
"rustls",
"tokio",
"tokio-rustls",
]
[[package]]
name = "hyper-timeout"
version = "0.4.1"
@ -1162,12 +1101,6 @@ dependencies = [
"libc",
]
[[package]]
name = "ipnet"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "879d54834c8c76457ef4293a689b2a8c59b076067ad77b15efafbb05f92a592b"
[[package]]
name = "itertools"
version = "0.10.5"
@ -1280,22 +1213,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73cbba799671b762df5a175adf59ce145165747bb891505c43d09aefbbf38beb"
[[package]]
name = "maybe-async"
version = "0.2.6"
name = "md-5"
version = "0.10.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6007f9dad048e0a224f27ca599d669fca8cfa0dac804725aab542b2eb032bce6"
checksum = "6365506850d44bff6e2fbcb5176cf63650e48bd45ef2fe2665ae1570e0f4b9ca"
dependencies = [
"proc-macro2",
"quote",
"syn",
"digest",
]
[[package]]
name = "md5"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
[[package]]
name = "memchr"
version = "2.5.0"
@ -1643,14 +1568,15 @@ dependencies = [
"console-subscriber",
"dashmap",
"futures-util",
"md-5",
"mime",
"num_cpus",
"once_cell",
"opentelemetry",
"opentelemetry-otlp",
"pin-project-lite",
"reqwest",
"rust-s3",
"quick-xml",
"rusty-s3",
"serde",
"serde_cbor",
"serde_json",
@ -1749,9 +1675,9 @@ dependencies = [
[[package]]
name = "proc-macro2"
version = "1.0.43"
version = "1.0.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a2ca2c61bc9f3d74d2886294ab7b9853abd9c1ad903a3ac7815c58989bb7bab"
checksum = "7bd7356a8122b6c4a24a82b278680c73357984ca2fc79a0f9fa6dea7dced7c58"
dependencies = [
"unicode-ident",
]
@ -1809,6 +1735,16 @@ dependencies = [
"prost",
]
[[package]]
name = "quick-xml"
version = "0.24.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37dddbbe9df96afafcb8027fcf263971b726530e12f0787f620a7ba5b4846081"
dependencies = [
"memchr",
"serde",
]
[[package]]
name = "quote"
version = "1.0.21"
@ -1857,17 +1793,6 @@ dependencies = [
"bitflags",
]
[[package]]
name = "redox_users"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b033d837a7cf162d7993aded9304e30a83213c648b6e389db233191f891e5c2b"
dependencies = [
"getrandom",
"redox_syscall",
"thiserror",
]
[[package]]
name = "regex"
version = "1.6.0"
@ -1903,46 +1828,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "reqwest"
version = "0.11.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "431949c384f4e2ae07605ccaa56d1d9d2ecdb5cadd4f9577ccfab29f2e5149fc"
dependencies = [
"base64",
"bytes",
"encoding_rs",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"hyper",
"hyper-rustls",
"ipnet",
"js-sys",
"log",
"mime",
"once_cell",
"percent-encoding",
"pin-project-lite",
"rustls",
"rustls-pemfile",
"serde",
"serde_json",
"serde_urlencoded",
"tokio",
"tokio-rustls",
"tokio-util",
"tower-service",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
"webpki-roots",
"winreg",
]
[[package]]
name = "ring"
version = "0.16.20"
@ -1988,35 +1873,6 @@ dependencies = [
"ordered-multimap",
]
[[package]]
name = "rust-s3"
version = "0.31.0"
source = "git+https://github.com/asonix/rust-s3?branch=asonix/generic-client#9e450d0038a29040ba5c47ffa570350c3b1ad976"
dependencies = [
"async-trait",
"aws-creds",
"aws-region",
"base64",
"cfg-if",
"hex",
"hmac",
"http",
"log",
"maybe-async",
"md5",
"percent-encoding",
"reqwest",
"serde",
"serde-xml-rs",
"serde_derive",
"sha2",
"thiserror",
"time",
"tokio",
"tokio-stream",
"url",
]
[[package]]
name = "rustc-demangle"
version = "0.1.21"
@ -2045,12 +1901,22 @@ dependencies = [
]
[[package]]
name = "rustls-pemfile"
version = "1.0.1"
name = "rusty-s3"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0864aeff53f8c05aa08d86e5ef839d3dfcf07aeba2db32f12db0ef716e87bd55"
checksum = "13332fd1e9538328a80183b9c0bde0cd7065ad2c4405f56b855a51a0a37fffd4"
dependencies = [
"base64",
"hmac",
"md-5",
"percent-encoding",
"quick-xml",
"serde",
"serde_json",
"sha2",
"time",
"url",
"zeroize",
]
[[package]]
@ -2089,25 +1955,13 @@ checksum = "e25dfac463d778e353db5be2449d1cce89bd6fd23c9f1ea21310ce6e5a1b29c4"
[[package]]
name = "serde"
version = "1.0.144"
version = "1.0.145"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f747710de3dcd43b88c9168773254e809d8ddbdf9653b84e2554ab219f17860"
checksum = "728eb6351430bccb993660dfffc5a72f91ccc1295abaa8ce19b27ebe4f75568b"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde-xml-rs"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65162e9059be2f6a3421ebbb4fef3e74b7d9e7c60c50a0e292c6239f19f1edfa"
dependencies = [
"log",
"serde",
"thiserror",
"xml-rs",
]
[[package]]
name = "serde_cbor"
version = "0.11.2"
@ -2120,9 +1974,9 @@ dependencies = [
[[package]]
name = "serde_derive"
version = "1.0.144"
version = "1.0.145"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94ed3a816fb1d101812f83e789f888322c34e291f894f19590dc310963e87a00"
checksum = "81fa1584d3d1bcacd84c277a0dfe21f5b0f6accf4a23d04d4c6d61f1af522b4c"
dependencies = [
"proc-macro2",
"quote",
@ -2843,18 +2697,6 @@ dependencies = [
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-futures"
version = "0.4.33"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23639446165ca5a5de86ae1d8896b737ae80319560fbaa4c2887b7da6e7ebd7d"
dependencies = [
"cfg-if",
"js-sys",
"wasm-bindgen",
"web-sys",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.83"
@ -2998,21 +2840,6 @@ version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680"
[[package]]
name = "winreg"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d"
dependencies = [
"winapi",
]
[[package]]
name = "xml-rs"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2d7d3948613f75c98fd9328cfdcc45acc4d360655289d0a7d4ec931392200a3"
[[package]]
name = "yaml-rust"
version = "0.4.5"
@ -3021,3 +2848,9 @@ checksum = "56c1936c4cc7a1c9ab21a1ebb602eb942ba868cbd44a99cb7cdc5892335e1c85"
dependencies = [
"linked-hash-map",
]
[[package]]
name = "zeroize"
version = "1.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c394b5bd0c6f669e7275d9c20aa90ae064cb22e75a1cad54e1b34088034b149f"

View File

@ -19,7 +19,7 @@ io-uring = [
]
[dependencies]
actix-form-data = { version = "0.7.0-beta.0", git = "https://git.asonix.dog/asonix/actix-form-data", branch = "v0.7.x" }
actix-form-data = "0.7.0-beta.0"
actix-rt = { version = "2.7.0", default-features = false }
actix-server = "2.0.0"
actix-web = { version = "4.0.0", default-features = false }
@ -33,23 +33,18 @@ config = "0.13.0"
console-subscriber = "0.1"
dashmap = "5.1.0"
futures-util = "0.3.17"
md-5 = "0.10.5"
mime = "0.3.1"
num_cpus = "1.13"
once_cell = "1.4.0"
opentelemetry = { version = "0.18", features = ["rt-tokio"] }
opentelemetry-otlp = "0.11"
pin-project-lite = "0.2.7"
reqwest = { version = "0.11.5", default-features = false, features = [
"rustls-tls",
"stream",
] }
rust-s3 = { version = "0.31.0", default-features = false, features = [
"fail-on-err",
"with-reqwest",
], git = "https://github.com/asonix/rust-s3", branch = "asonix/generic-client" }
rusty-s3 = "0.3.2"
serde = { version = "1.0", features = ["derive"] }
serde_cbor = "0.11.2"
serde_json = "1.0"
quick-xml = { version = "0.24.1", features = ["serialize"] }
sha2 = "0.10.0"
sled = { version = "0.34.7" }
storage-path-generator = "0.1.0"
@ -57,7 +52,10 @@ thiserror = "1.0"
time = { version = "0.3.0", features = ["serde", "serde-well-known"] }
tokio = { version = "1", features = ["full", "tracing"] }
tokio-uring = { version = "0.3", optional = true, features = ["bytes"] }
tokio-util = { version = "0.7", default-features = false, features = ["codec"] }
tokio-util = { version = "0.7", default-features = false, features = [
"codec",
"io",
] }
toml = "0.5.8"
tracing = "0.1.15"
tracing-error = "0.2.0"

View File

@ -14,8 +14,6 @@ services:
environment:
- PICTRS__TRACING__CONSOLE__ADDRESS=0.0.0.0:6669
- PICTRS__TRACING__OPENTELEMETRY__URL=http://otel:4137
links:
- "minio:pict-rs.minio"
stdin_open: true
tty: true
volumes:
@ -23,7 +21,7 @@ services:
- ../../:/opt/app
pictrs_proxy:
image: asonix/pictrs-proxy:0.3
image: asonix/pictrs-proxy:0.4.0-alpha.4
ports:
- "8081:8081"
environment:

View File

@ -11,7 +11,8 @@ exporters:
logging:
jaeger:
endpoint: jaeger:14250
insecure: true
tls:
insecure: true
service:
pipelines:

View File

@ -1,6 +1,7 @@
[server]
address = '0.0.0.0:8080'
worker_id = 'pict-rs-1'
[tracing.logging]
format = 'normal'
targets = 'warn,tracing_actix_web=info,actix_server=info,actix_web=info'
@ -21,13 +22,7 @@ max_height = 10000
max_area = 40000000
max_file_size = 40
enable_silent_video = true
filters = [
'blur',
'crop',
'identity',
'resize',
'thumbnail',
]
filters = ['blur', 'crop', 'identity', 'resize', 'thumbnail']
skip_validate_imports = false
[repo]
@ -37,7 +32,9 @@ cache_capacity = 67108864
[store]
type = 'object_storage'
endpoint = 'http://minio:9000'
use_path_style = true
bucket_name = 'pict-rs'
region = 'http://minio:9000'
access_key = 'Q7Z3AY3JO01N27UNH5IR'
secret_key = 'bH3Kj6UVJF+btBtWsExVurN3ukEilC3saECsupzi'
region = 'minio'
access_key = 'pictrs'
secret_key = 'pictrspass'

View File

@ -5,7 +5,6 @@ use crate::{
};
use actix_web::web::Bytes;
use futures_util::{Stream, TryStreamExt};
use tokio_util::io::StreamReader;
use tracing::{Instrument, Span};
pub(crate) struct Backgrounded<R, S>
@ -38,7 +37,7 @@ where
pub(crate) async fn proxy<P>(repo: R, store: S, stream: P) -> Result<Self, Error>
where
P: Stream<Item = Result<Bytes, Error>>,
P: Stream<Item = Result<Bytes, Error>> + Unpin + 'static,
{
let mut this = Self {
repo,
@ -53,14 +52,13 @@ where
async fn do_proxy<P>(&mut self, store: S, stream: P) -> Result<(), Error>
where
P: Stream<Item = Result<Bytes, Error>>,
P: Stream<Item = Result<Bytes, Error>> + Unpin + 'static,
{
UploadRepo::create(&self.repo, self.upload_id.expect("Upload id exists")).await?;
let stream = stream.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));
let mut reader = StreamReader::new(Box::pin(stream));
let identifier = store.save_async_read(&mut reader).await?;
let identifier = store.save_stream(stream).await?;
self.identifier = Some(identifier);

View File

@ -543,13 +543,30 @@ struct Filesystem {
#[derive(Clone, Debug, Parser, serde::Serialize)]
#[serde(rename_all = "snake_case")]
struct ObjectStorage {
/// The base endpoint for the object storage
///
/// Examples:
/// - `http://localhost:9000`
/// - `https://s3.dualstack.eu-west-1.amazonaws.com`
#[clap(short, long)]
endpoint: Url,
/// Determines whether to use path style or virtualhost style for accessing objects
///
/// When this is true, objects will be fetched from {endpoint}/{bucket_name}/{object}
/// When false, objects will be fetched from {bucket_name}.{endpoint}/{object}
#[clap(short, long)]
use_path_style: bool,
/// The bucket in which to store media
#[clap(short, long)]
bucket_name: Option<String>,
/// The region the bucket is located in
///
/// For minio deployments, this can just be 'minio'
#[clap(short, long)]
region: Option<Serde<s3::Region>>,
region: Option<String>,
/// The Access Key for the user accessing the bucket
#[clap(short, long)]
@ -559,10 +576,6 @@ struct ObjectStorage {
#[clap(short, long)]
secret_key: Option<String>,
/// The security token for accessing the bucket
#[clap(long)]
security_token: Option<String>,
/// The session token for accessing the bucket
#[clap(long)]
session_token: Option<String>,

View File

@ -1,8 +1,8 @@
use crate::magick::ValidInputType;
use crate::serde_str::Serde;
use clap::ArgEnum;
use std::{fmt::Display, path::PathBuf, str::FromStr};
use tracing::Level;
use url::Url;
#[derive(
Clone,
@ -63,13 +63,28 @@ pub(crate) struct Filesystem {
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize, clap::Parser)]
#[serde(rename_all = "snake_case")]
pub(crate) struct ObjectStorage {
/// The base endpoint for the object storage
///
/// Examples:
/// - `http://localhost:9000`
/// - `https://s3.dualstack.eu-west-1.amazonaws.com`
#[clap(short, long)]
pub(crate) endpoint: Url,
/// Determines whether to use path style or virtualhost style for accessing objects
///
/// When this is true, objects will be fetched from {endpoint}/{bucket_name}/{object}
/// When false, objects will be fetched from {bucket_name}.{endpoint}/{object}
#[clap(short, long)]
pub(crate) use_path_style: bool,
/// The bucket in which to store media
#[clap(short, long)]
pub(crate) bucket_name: String,
/// The region the bucket is located in
#[clap(short, long)]
pub(crate) region: Serde<s3::Region>,
pub(crate) region: String,
/// The Access Key for the user accessing the bucket
#[clap(short, long)]
@ -79,11 +94,6 @@ pub(crate) struct ObjectStorage {
#[clap(short, long)]
pub(crate) secret_key: String,
/// The security token for accessing the bucket
#[clap(long)]
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) security_token: Option<String>,
/// The session token for accessing the bucket
#[clap(long)]
#[serde(skip_serializing_if = "Option::is_none")]
@ -224,7 +234,8 @@ impl Display for LogFormat {
#[cfg(test)]
mod tests {
use super::{Serde, Targets};
use super::Targets;
use crate::serde_str::Serde;
#[test]
fn builds_info_targets() {

View File

@ -57,14 +57,14 @@ async fn process<R: FullRepo, S: Store + 'static>(
identifier
} else {
let identifier = repo.identifier(hash.clone()).await?;
let mut reader = crate::ffmpeg::thumbnail(
let reader = crate::ffmpeg::thumbnail(
store.clone(),
identifier,
InputFormat::Mp4,
ThumbnailFormat::Jpeg,
)
.await?;
let motion_identifier = store.save_async_read(&mut reader).await?;
let motion_identifier = store.save_async_read(reader).await?;
repo.relate_motion_identifier(hash.clone(), &motion_identifier)
.await?;

View File

@ -55,7 +55,7 @@ where
pub(crate) async fn ingest<R, S>(
repo: &R,
store: &S,
stream: impl Stream<Item = Result<Bytes, Error>>,
stream: impl Stream<Item = Result<Bytes, Error>> + Unpin + 'static,
declared_alias: Option<Alias>,
should_validate: bool,
is_cached: bool,
@ -77,9 +77,10 @@ where
)
.await?;
let mut hasher_reader = Hasher::new(validated_reader, Sha256::new());
let hasher_reader = Hasher::new(validated_reader, Sha256::new());
let hasher = hasher_reader.hasher();
let identifier = store.save_async_read(&mut hasher_reader).await?;
let identifier = store.save_async_read(hasher_reader).await?;
drop(permit);
@ -90,7 +91,7 @@ where
identifier: Some(identifier.clone()),
};
let hash = hasher_reader.finalize_reset().await?;
let hash = hasher.borrow_mut().finalize_reset().to_vec();
session.hash = Some(hash.clone());

View File

@ -1,8 +1,8 @@
use crate::error::Error;
use actix_web::web;
use sha2::{digest::FixedOutputReset, Digest};
use std::{
cell::RefCell,
pin::Pin,
rc::Rc,
task::{Context, Poll},
};
use tokio::io::{AsyncRead, ReadBuf};
@ -12,7 +12,7 @@ pin_project_lite::pin_project! {
#[pin]
inner: I,
hasher: D,
hasher: Rc<RefCell<D>>,
}
}
@ -23,14 +23,12 @@ where
pub(super) fn new(reader: I, digest: D) -> Self {
Hasher {
inner: reader,
hasher: digest,
hasher: Rc::new(RefCell::new(digest)),
}
}
pub(super) async fn finalize_reset(self) -> Result<Vec<u8>, Error> {
let mut hasher = self.hasher;
let hash = web::block(move || hasher.finalize_reset().to_vec()).await?;
Ok(hash)
pub(super) fn hasher(&self) -> Rc<RefCell<D>> {
Rc::clone(&self.hasher)
}
}
@ -53,7 +51,9 @@ where
let poll_res = reader.poll_read(cx, buf);
let after_len = buf.filled().len();
if after_len > before_len {
hasher.update(&buf.filled()[before_len..after_len]);
hasher
.borrow_mut()
.update(&buf.filled()[before_len..after_len]);
}
poll_res
}
@ -89,11 +89,11 @@ mod test {
let hash = test_on_arbiter!(async move {
let file1 = tokio::fs::File::open("./client-examples/earth.gif").await?;
let mut hasher = Hasher::new(file1, Sha256::new());
let mut reader = Hasher::new(file1, Sha256::new());
tokio::io::copy(&mut hasher, &mut tokio::io::sink()).await?;
tokio::io::copy(&mut reader, &mut tokio::io::sink()).await?;
hasher.finalize_reset().await
Ok(reader.hasher().borrow_mut().finalize_reset().to_vec()) as std::io::Result<_>
})
.unwrap();

View File

@ -10,6 +10,7 @@ use futures_util::{
Stream, StreamExt, TryStreamExt,
};
use once_cell::sync::Lazy;
use rusty_s3::UrlStyle;
use std::{
future::ready,
path::PathBuf,
@ -65,7 +66,11 @@ use self::{
UploadResult,
},
serde_str::Serde,
store::{file_store::FileStore, object_store::ObjectStore, Identifier, Store},
store::{
file_store::FileStore,
object_store::{ObjectStore, ObjectStoreConfig},
Identifier, Store, StoreConfig,
},
stream::{StreamLimit, StreamTimeout},
};
@ -448,7 +453,7 @@ async fn download<R: FullRepo + 'static, S: Store + 'static>(
#[instrument(name = "Downloading file inline", skip(stream))]
async fn do_download_inline<R: FullRepo + 'static, S: Store + 'static>(
stream: impl Stream<Item = Result<web::Bytes, Error>>,
stream: impl Stream<Item = Result<web::Bytes, Error>> + Unpin + 'static,
repo: web::Data<R>,
store: web::Data<S>,
is_cached: bool,
@ -474,7 +479,7 @@ async fn do_download_inline<R: FullRepo + 'static, S: Store + 'static>(
#[instrument(name = "Downloading file in background", skip(stream))]
async fn do_download_backgrounded<R: FullRepo + 'static, S: Store + 'static>(
stream: impl Stream<Item = Result<web::Bytes, Error>>,
stream: impl Stream<Item = Result<web::Bytes, Error>> + Unpin + 'static,
repo: web::Data<R>,
store: web::Data<S>,
is_cached: bool,
@ -958,16 +963,10 @@ fn transform_error(error: actix_form_data::Error) -> actix_web::Error {
fn build_client() -> awc::Client {
Client::builder()
.wrap(Tracing)
.add_default_header(("User-Agent", "pict-rs v0.3.0-main"))
.add_default_header(("User-Agent", "pict-rs v0.4.0-main"))
.finish()
}
fn build_reqwest_client() -> reqwest::Result<reqwest::Client> {
reqwest::Client::builder()
.user_agent("pict-rs v0.3.0-main")
.build()
}
fn next_worker_id() -> String {
static WORKER_ID: AtomicU64 = AtomicU64::new(0);
@ -976,15 +975,15 @@ fn next_worker_id() -> String {
format!("{}-{}", CONFIG.server.worker_id, next_id)
}
async fn launch<R: FullRepo + Clone + 'static, S: Store + Clone + 'static>(
async fn launch<R: FullRepo + 'static, SC: StoreConfig + 'static>(
repo: R,
store: S,
store_config: SC,
) -> color_eyre::Result<()> {
repo.requeue_in_progress(CONFIG.server.worker_id.as_bytes().to_vec())
.await?;
HttpServer::new(move || {
let store = store.clone();
let store = store_config.clone().build();
let repo = repo.clone();
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
@ -1013,20 +1012,23 @@ async fn launch<R: FullRepo + Clone + 'static, S: Store + Clone + 'static>(
.service(
web::resource("")
.guard(guard::Post())
.route(web::post().to(upload::<R, S>)),
.route(web::post().to(upload::<R, SC::Store>)),
)
.service(
web::scope("/backgrounded")
.service(
web::resource("")
.guard(guard::Post())
.route(web::post().to(upload_backgrounded::<R, S>)),
.route(web::post().to(upload_backgrounded::<R, SC::Store>)),
)
.service(
web::resource("/claim").route(web::get().to(claim_upload::<R, S>)),
web::resource("/claim")
.route(web::get().to(claim_upload::<R, SC::Store>)),
),
)
.service(web::resource("/download").route(web::get().to(download::<R, S>)))
.service(
web::resource("/download").route(web::get().to(download::<R, SC::Store>)),
)
.service(
web::resource("/delete/{delete_token}/{filename}")
.route(web::delete().to(delete::<R>))
@ -1034,27 +1036,27 @@ async fn launch<R: FullRepo + Clone + 'static, S: Store + Clone + 'static>(
)
.service(
web::resource("/original/{filename}")
.route(web::get().to(serve::<R, S>))
.route(web::head().to(serve_head::<R, S>)),
.route(web::get().to(serve::<R, SC::Store>))
.route(web::head().to(serve_head::<R, SC::Store>)),
)
.service(
web::resource("/process.{ext}")
.route(web::get().to(process::<R, S>))
.route(web::head().to(process_head::<R, S>)),
.route(web::get().to(process::<R, SC::Store>))
.route(web::head().to(process_head::<R, SC::Store>)),
)
.service(
web::resource("/process_backgrounded.{ext}")
.route(web::get().to(process_backgrounded::<R, S>)),
.route(web::get().to(process_backgrounded::<R, SC::Store>)),
)
.service(
web::scope("/details")
.service(
web::resource("/original/{filename}")
.route(web::get().to(details::<R, S>)),
.route(web::get().to(details::<R, SC::Store>)),
)
.service(
web::resource("/process.{ext}")
.route(web::get().to(process_details::<R, S>)),
.route(web::get().to(process_details::<R, SC::Store>)),
),
),
)
@ -1063,7 +1065,7 @@ async fn launch<R: FullRepo + Clone + 'static, S: Store + Clone + 'static>(
.wrap(Internal(
CONFIG.server.api_key.as_ref().map(|s| s.to_owned()),
))
.service(web::resource("/import").route(web::post().to(import::<R, S>)))
.service(web::resource("/import").route(web::post().to(import::<R, SC::Store>)))
.service(
web::resource("/variants").route(web::delete().to(clean_variants::<R>)),
)
@ -1080,36 +1082,43 @@ async fn launch<R: FullRepo + Clone + 'static, S: Store + Clone + 'static>(
Ok(())
}
async fn migrate_inner<S1>(repo: &Repo, from: S1, to: &config::Store) -> color_eyre::Result<()>
async fn migrate_inner<S1>(repo: &Repo, from: S1, to: config::Store) -> color_eyre::Result<()>
where
S1: Store,
{
match to {
config::Store::Filesystem(config::Filesystem { path }) => {
let to = FileStore::build(path.clone(), repo.clone()).await?;
let to = FileStore::build(path.clone(), repo.clone()).await?.build();
match repo {
Repo::Sled(repo) => migrate_store(repo, from, to).await?,
}
}
config::Store::ObjectStorage(config::ObjectStorage {
endpoint,
bucket_name,
use_path_style,
region,
access_key,
secret_key,
security_token,
session_token,
}) => {
let to = ObjectStore::build(
endpoint.clone(),
bucket_name,
region.as_ref().clone(),
Some(access_key.clone()),
Some(secret_key.clone()),
security_token.clone(),
session_token.clone(),
if use_path_style {
UrlStyle::Path
} else {
UrlStyle::VirtualHost
},
region,
access_key,
secret_key,
session_token,
repo.clone(),
build_reqwest_client()?,
)
.await?;
.await?
.build();
match repo {
Repo::Sled(repo) => migrate_store(repo, from, to).await?,
@ -1132,30 +1141,36 @@ async fn main() -> color_eyre::Result<()> {
Operation::MigrateStore { from, to } => {
match from {
config::Store::Filesystem(config::Filesystem { path }) => {
let from = FileStore::build(path.clone(), repo.clone()).await?;
migrate_inner(&repo, from, &to).await?;
let from = FileStore::build(path.clone(), repo.clone()).await?.build();
migrate_inner(&repo, from, to).await?;
}
config::Store::ObjectStorage(config::ObjectStorage {
endpoint,
bucket_name,
use_path_style,
region,
access_key,
secret_key,
security_token,
session_token,
}) => {
let from = ObjectStore::build(
&bucket_name,
Serde::into_inner(region),
Some(access_key),
Some(secret_key),
security_token,
endpoint,
bucket_name,
if use_path_style {
UrlStyle::Path
} else {
UrlStyle::VirtualHost
},
region,
access_key,
secret_key,
session_token,
repo.clone(),
build_reqwest_client()?,
)
.await?;
.await?
.build();
migrate_inner(&repo, from, &to).await?;
migrate_inner(&repo, from, to).await?;
}
}
@ -1169,31 +1184,36 @@ async fn main() -> color_eyre::Result<()> {
let store = FileStore::build(path, repo.clone()).await?;
match repo {
Repo::Sled(sled_repo) => launch(sled_repo, store).await,
Repo::Sled(sled_repo) => launch::<_, FileStore>(sled_repo, store).await,
}
}
config::Store::ObjectStorage(config::ObjectStorage {
endpoint,
bucket_name,
use_path_style,
region,
access_key,
secret_key,
security_token,
session_token,
}) => {
let store = ObjectStore::build(
&bucket_name,
Serde::into_inner(region),
Some(access_key),
Some(secret_key),
security_token,
endpoint,
bucket_name,
if use_path_style {
UrlStyle::Path
} else {
UrlStyle::VirtualHost
},
region,
access_key,
secret_key,
session_token,
repo.clone(),
build_reqwest_client()?,
)
.await?;
match repo {
Repo::Sled(sled_repo) => launch(sled_repo, store).await,
Repo::Sled(sled_repo) => launch::<_, ObjectStoreConfig>(sled_repo, store).await,
}
}
}
@ -1255,10 +1275,8 @@ where
S2: Store,
{
let stream = from.to_stream(identifier, None, None).await?;
futures_util::pin_mut!(stream);
let mut reader = tokio_util::io::StreamReader::new(stream);
let new_identifier = to.save_async_read(&mut reader).await?;
let new_identifier = to.save_stream(stream).await?;
Ok(new_identifier)
}

View File

@ -15,14 +15,24 @@ pub(crate) trait Identifier: Send + Sync + Clone + Debug {
Self: Sized;
}
#[async_trait::async_trait(?Send)]
pub(crate) trait Store: Send + Sync + Clone + Debug {
type Identifier: Identifier + 'static;
type Stream: Stream<Item = std::io::Result<Bytes>> + 'static;
pub(crate) trait StoreConfig: Send + Sync + Clone {
type Store: Store;
async fn save_async_read<Reader>(&self, reader: &mut Reader) -> Result<Self::Identifier, Error>
fn build(self) -> Self::Store;
}
#[async_trait::async_trait(?Send)]
pub(crate) trait Store: Clone + Debug {
type Identifier: Identifier + 'static;
type Stream: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static;
async fn save_async_read<Reader>(&self, reader: Reader) -> Result<Self::Identifier, Error>
where
Reader: AsyncRead + Unpin;
Reader: AsyncRead + Unpin + 'static;
async fn save_stream<S>(&self, stream: S) -> Result<Self::Identifier, Error>
where
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static;
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Error>;
@ -39,7 +49,7 @@ pub(crate) trait Store: Send + Sync + Clone + Debug {
writer: &mut Writer,
) -> Result<(), std::io::Error>
where
Writer: AsyncWrite + Send + Unpin;
Writer: AsyncWrite + Unpin;
async fn len(&self, identifier: &Self::Identifier) -> Result<u64, Error>;
@ -54,13 +64,20 @@ where
type Identifier = T::Identifier;
type Stream = T::Stream;
async fn save_async_read<Reader>(&self, reader: &mut Reader) -> Result<Self::Identifier, Error>
async fn save_async_read<Reader>(&self, reader: Reader) -> Result<Self::Identifier, Error>
where
Reader: AsyncRead + Unpin,
Reader: AsyncRead + Unpin + 'static,
{
T::save_async_read(self, reader).await
}
async fn save_stream<S>(&self, stream: S) -> Result<Self::Identifier, Error>
where
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
{
T::save_stream(self, stream).await
}
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Error> {
T::save_bytes(self, bytes).await
}
@ -80,7 +97,7 @@ where
writer: &mut Writer,
) -> Result<(), std::io::Error>
where
Writer: AsyncWrite + Send + Unpin,
Writer: AsyncWrite + Unpin,
{
T::read_into(self, identifier, writer).await
}
@ -102,13 +119,20 @@ where
type Identifier = T::Identifier;
type Stream = T::Stream;
async fn save_async_read<Reader>(&self, reader: &mut Reader) -> Result<Self::Identifier, Error>
async fn save_async_read<Reader>(&self, reader: Reader) -> Result<Self::Identifier, Error>
where
Reader: AsyncRead + Unpin,
Reader: AsyncRead + Unpin + 'static,
{
T::save_async_read(self, reader).await
}
async fn save_stream<S>(&self, stream: S) -> Result<Self::Identifier, Error>
where
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
{
T::save_stream(self, stream).await
}
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Error> {
T::save_bytes(self, bytes).await
}
@ -128,7 +152,7 @@ where
writer: &mut Writer,
) -> Result<(), std::io::Error>
where
Writer: AsyncWrite + Send + Unpin,
Writer: AsyncWrite + Unpin,
{
T::read_into(self, identifier, writer).await
}

View File

@ -2,7 +2,7 @@ use crate::{
error::Error,
file::File,
repo::{Repo, SettingsRepo},
store::Store,
store::{Store, StoreConfig},
};
use actix_web::web::Bytes;
use futures_util::stream::Stream;
@ -12,6 +12,7 @@ use std::{
};
use storage_path_generator::Generator;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::io::StreamReader;
use tracing::{debug, error, instrument, Instrument};
mod file_id;
@ -47,19 +48,27 @@ pub(crate) struct FileStore {
repo: Repo,
}
impl StoreConfig for FileStore {
type Store = FileStore;
fn build(self) -> Self::Store {
self
}
}
#[async_trait::async_trait(?Send)]
impl Store for FileStore {
type Identifier = FileId;
type Stream = Pin<Box<dyn Stream<Item = std::io::Result<Bytes>>>>;
#[tracing::instrument(skip(reader))]
async fn save_async_read<Reader>(&self, reader: &mut Reader) -> Result<Self::Identifier, Error>
async fn save_async_read<Reader>(&self, mut reader: Reader) -> Result<Self::Identifier, Error>
where
Reader: AsyncRead + Unpin,
Reader: AsyncRead + Unpin + 'static,
{
let path = self.next_file().await?;
if let Err(e) = self.safe_save_reader(&path, reader).await {
if let Err(e) = self.safe_save_reader(&path, &mut reader).await {
self.safe_remove_file(&path).await?;
return Err(e.into());
}
@ -67,6 +76,13 @@ impl Store for FileStore {
Ok(self.file_id_from_path(path)?)
}
async fn save_stream<S>(&self, stream: S) -> Result<Self::Identifier, Error>
where
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
{
self.save_async_read(StreamReader::new(stream)).await
}
#[tracing::instrument(skip(bytes))]
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Error> {
let path = self.next_file().await?;
@ -109,7 +125,7 @@ impl Store for FileStore {
writer: &mut Writer,
) -> Result<(), std::io::Error>
where
Writer: AsyncWrite + Send + Unpin,
Writer: AsyncWrite + Unpin,
{
let path = self.path_from_file_id(identifier);
@ -255,30 +271,6 @@ impl FileStore {
Ok(())
}
// try moving a file
#[instrument(name = "Moving file", fields(from = tracing::field::debug(&from.as_ref()), to = tracing::field::debug(&to.as_ref())))]
pub(crate) async fn safe_move_file<P: AsRef<Path>, Q: AsRef<Path>>(
&self,
from: P,
to: Q,
) -> Result<(), FileError> {
safe_create_parent(&to).await?;
debug!("Checking if {:?} already exists", to.as_ref());
if let Err(e) = tokio::fs::metadata(&to).await {
if e.kind() != std::io::ErrorKind::NotFound {
return Err(e.into());
}
} else {
return Err(FileError::FileExists);
}
debug!("Moving {:?} to {:?}", from.as_ref(), to.as_ref());
tokio::fs::copy(&from, &to).await?;
self.safe_remove_file(from).await?;
Ok(())
}
}
pub(crate) async fn safe_create_parent<P: AsRef<Path>>(path: P) -> Result<(), FileError> {

View File

@ -1,22 +1,32 @@
use crate::{
error::Error,
repo::{Repo, SettingsRepo},
store::Store,
store::{Store, StoreConfig},
};
use actix_web::web::Bytes;
use futures_util::{Stream, TryStreamExt};
use s3::{
client::Client, command::Command, creds::Credentials, error::S3Error, request_trait::Request,
Bucket, Region,
use actix_rt::task::JoinError;
use actix_web::{
error::{BlockingError, PayloadError},
http::{
header::{ByteRangeSpec, Range, CONTENT_LENGTH},
StatusCode,
},
web::{Bytes, BytesMut},
};
use std::{pin::Pin, string::FromUtf8Error};
use awc::{error::SendRequestError, Client, ClientRequest, SendClientRequest};
use futures_util::{Stream, StreamExt, TryStreamExt};
use rusty_s3::{actions::S3Action, Bucket, BucketError, Credentials, UrlStyle};
use std::{pin::Pin, string::FromUtf8Error, time::Duration};
use storage_path_generator::{Generator, Path};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio_util::io::ReaderStream;
use tracing::Instrument;
use url::Url;
mod object_id;
pub(crate) use object_id::ObjectId;
const CHUNK_SIZE: usize = 8_388_608; // 8 Mebibytes, min is 5 (5_242_880);
// - Settings Tree
// - last-path -> last generated path
@ -27,14 +37,47 @@ pub(crate) enum ObjectError {
#[error("Failed to generate path")]
PathGenerator(#[from] storage_path_generator::PathError),
#[error("Failed to generate request")]
S3(#[from] BucketError),
#[error("Error making request")]
SendRequest(String),
#[error("Failed to parse string")]
Utf8(#[from] FromUtf8Error),
#[error("Failed to parse xml")]
Xml(#[from] quick_xml::de::DeError),
#[error("Invalid length")]
Length,
#[error("Storage error")]
Anyhow(#[from] S3Error),
#[error("Invalid etag response")]
Etag,
#[error("Task cancelled")]
Cancelled,
#[error("Invalid status: {0}\n{1}")]
Status(StatusCode, String),
}
impl From<SendRequestError> for ObjectError {
fn from(e: SendRequestError) -> Self {
Self::SendRequest(e.to_string())
}
}
impl From<JoinError> for ObjectError {
fn from(_: JoinError) -> Self {
Self::Cancelled
}
}
impl From<BlockingError> for ObjectError {
fn from(_: BlockingError) -> Self {
Self::Cancelled
}
}
#[derive(Clone)]
@ -42,7 +85,75 @@ pub(crate) struct ObjectStore {
path_gen: Generator,
repo: Repo,
bucket: Bucket,
client: reqwest::Client,
credentials: Credentials,
client: Client,
}
#[derive(Clone)]
pub(crate) struct ObjectStoreConfig {
path_gen: Generator,
repo: Repo,
bucket: Bucket,
credentials: Credentials,
}
#[derive(serde::Deserialize, Debug)]
struct InitiateMultipartUploadResponse {
#[serde(rename = "Bucket")]
_bucket: String,
#[serde(rename = "Key")]
_key: String,
#[serde(rename = "UploadId")]
upload_id: String,
}
impl StoreConfig for ObjectStoreConfig {
type Store = ObjectStore;
fn build(self) -> Self::Store {
ObjectStore {
path_gen: self.path_gen,
repo: self.repo,
bucket: self.bucket,
credentials: self.credentials,
client: crate::build_client(),
}
}
}
fn payload_to_io_error(e: PayloadError) -> std::io::Error {
match e {
PayloadError::Io(io) => io,
otherwise => std::io::Error::new(std::io::ErrorKind::Other, otherwise.to_string()),
}
}
#[tracing::instrument(skip(stream))]
async fn read_chunk<S>(stream: &mut S) -> std::io::Result<Bytes>
where
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
{
let mut buf = Vec::new();
let mut total_len = 0;
while total_len < CHUNK_SIZE {
if let Some(res) = stream.next().await {
let bytes = res?;
total_len += bytes.len();
buf.push(bytes);
} else {
break;
}
}
let bytes = buf
.iter()
.fold(BytesMut::with_capacity(total_len), |mut acc, item| {
acc.extend_from_slice(item);
acc
});
Ok(bytes.freeze())
}
#[async_trait::async_trait(?Send)]
@ -51,30 +162,136 @@ impl Store for ObjectStore {
type Stream = Pin<Box<dyn Stream<Item = std::io::Result<Bytes>>>>;
#[tracing::instrument(skip(reader))]
async fn save_async_read<Reader>(&self, reader: &mut Reader) -> Result<Self::Identifier, Error>
async fn save_async_read<Reader>(&self, reader: Reader) -> Result<Self::Identifier, Error>
where
Reader: AsyncRead + Unpin,
Reader: AsyncRead + Unpin + 'static,
{
let path = self.next_file().await?;
self.save_stream(ReaderStream::new(reader)).await
}
self.bucket
.put_object_stream(&self.client, reader, &path)
.await
.map_err(ObjectError::from)?;
#[tracing::instrument(skip(stream))]
async fn save_stream<S>(&self, mut stream: S) -> Result<Self::Identifier, Error>
where
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
{
let (req, object_id) = self.create_multipart_request().await?;
let mut response = req.send().await.map_err(ObjectError::from)?;
Ok(ObjectId::from_string(path))
if !response.status().is_success() {
let body = String::from_utf8_lossy(&response.body().await?).to_string();
return Err(ObjectError::Status(response.status(), body).into());
}
let body = response.body().await?;
let body: InitiateMultipartUploadResponse =
quick_xml::de::from_reader(&*body).map_err(ObjectError::from)?;
let upload_id = &body.upload_id;
// hack-ish: use async block as Result boundary
let res = async {
let mut complete = false;
let mut part_number = 0;
let mut futures = Vec::new();
while !complete {
part_number += 1;
let bytes = read_chunk(&mut stream).await?;
complete = bytes.len() < CHUNK_SIZE;
let this = self.clone();
let object_id2 = object_id.clone();
let upload_id2 = upload_id.clone();
let handle = actix_rt::spawn(
async move {
let mut response = this
.create_upload_part_request(
bytes.clone(),
&object_id2,
part_number,
&upload_id2,
)
.await?
.send_body(bytes)
.await?;
if !response.status().is_success() {
let body = String::from_utf8_lossy(&response.body().await?).to_string();
return Err(ObjectError::Status(response.status(), body).into());
}
let etag = response
.headers()
.get("etag")
.ok_or(ObjectError::Etag)?
.to_str()
.map_err(|_| ObjectError::Etag)?
.to_string();
// early-drop response to close its tracing spans
drop(response);
Ok(etag) as Result<String, Error>
}
.instrument(tracing::info_span!("Upload Part")),
);
futures.push(handle);
}
// early-drop stream to allow the next Part to be polled concurrently
drop(stream);
let mut etags = Vec::new();
for future in futures {
etags.push(future.await.map_err(ObjectError::from)??);
}
let mut response = self
.send_complete_multipart_request(
&object_id,
upload_id,
etags.iter().map(|s| s.as_ref()),
)
.await?;
if !response.status().is_success() {
let body = String::from_utf8_lossy(&response.body().await?).to_string();
return Err(ObjectError::Status(response.status(), body).into());
}
Ok(()) as Result<(), Error>
}
.await;
if let Err(e) = res {
self.create_abort_multipart_request(&object_id, upload_id)
.send()
.await?;
return Err(e);
}
Ok(object_id)
}
#[tracing::instrument(skip(bytes))]
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Error> {
let path = self.next_file().await?;
let (req, object_id) = self.put_object_request().await?;
self.bucket
.put_object(&self.client, &path, &bytes)
.await
.map_err(ObjectError::from)?;
let mut response = req.send_body(bytes).await.map_err(ObjectError::from)?;
Ok(ObjectId::from_string(path))
if response.status().is_success() {
return Ok(object_id);
}
let body = String::from_utf8_lossy(&response.body().await?).to_string();
Err(ObjectError::Status(response.status(), body).into())
}
#[tracing::instrument]
@ -84,38 +301,19 @@ impl Store for ObjectStore {
from_start: Option<u64>,
len: Option<u64>,
) -> Result<Self::Stream, Error> {
let path = identifier.as_str();
let start = from_start.unwrap_or(0);
let end = len.map(|len| start + len - 1);
let request_span = tracing::trace_span!(parent: None, "Get Object");
// NOTE: isolating reqwest in it's own span is to prevent the request's span from getting
// smuggled into a long-lived task. Unfortunately, I am unable to create a minimal
// reproduction of this problem so I can't open a bug about it.
let request = request_span.in_scope(|| {
Client::request(
&self.client,
&self.bucket,
path,
Command::GetObjectRange { start, end },
)
});
let response = request_span
.in_scope(|| request.response())
.instrument(request_span.clone())
let mut response = self
.get_object_request(identifier, from_start, len)
.send()
.await
.map_err(ObjectError::from)?;
let stream = request_span.in_scope(|| {
response
.bytes_stream()
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
});
if response.status().is_success() {
return Ok(Box::pin(response.map_err(payload_to_io_error)));
}
Ok(Box::pin(stream))
let body = String::from_utf8_lossy(&response.body().await?).to_string();
Err(ObjectError::Status(response.status(), body).into())
}
#[tracing::instrument(skip(writer))]
@ -125,41 +323,69 @@ impl Store for ObjectStore {
writer: &mut Writer,
) -> Result<(), std::io::Error>
where
Writer: AsyncWrite + Send + Unpin,
Writer: AsyncWrite + Unpin,
{
let path = identifier.as_str();
self.bucket
.get_object_stream(&self.client, path, writer)
let mut response = self
.get_object_request(identifier, None, None)
.send()
.await
.map_err(ObjectError::from)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, Error::from(e)))?;
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, ObjectError::from(e)))?;
if !response.status().is_success() {
let body = response.body().await.map_err(payload_to_io_error)?;
let body = String::from_utf8_lossy(&body).to_string();
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
ObjectError::Status(response.status(), body),
));
}
while let Some(res) = response.next().await {
let mut bytes = res.map_err(payload_to_io_error)?;
writer.write_all_buf(&mut bytes).await?;
}
writer.flush().await?;
Ok(())
}
#[tracing::instrument]
async fn len(&self, identifier: &Self::Identifier) -> Result<u64, Error> {
let path = identifier.as_str();
let (head, _) = self
.bucket
.head_object(&self.client, path)
let mut response = self
.head_object_request(identifier)
.send()
.await
.map_err(ObjectError::from)?;
let length = head.content_length.ok_or(ObjectError::Length)?;
Ok(length as u64)
if !response.status().is_success() {
let body = String::from_utf8_lossy(&response.body().await?).to_string();
return Err(ObjectError::Status(response.status(), body).into());
}
let length = response
.headers()
.get(CONTENT_LENGTH)
.ok_or(ObjectError::Length)?
.to_str()
.map_err(|_| ObjectError::Length)?
.parse::<u64>()
.map_err(|_| ObjectError::Length)?;
Ok(length)
}
#[tracing::instrument]
async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Error> {
let path = identifier.as_str();
let mut response = self.delete_object_request(identifier).send().await?;
if !response.status().is_success() {
let body = String::from_utf8_lossy(&response.body().await?).to_string();
return Err(ObjectError::Status(response.status(), body).into());
}
self.bucket
.delete_object(&self.client, path)
.await
.map_err(ObjectError::from)?;
Ok(())
}
}
@ -167,42 +393,193 @@ impl Store for ObjectStore {
impl ObjectStore {
#[allow(clippy::too_many_arguments)]
pub(crate) async fn build(
bucket_name: &str,
region: Region,
access_key: Option<String>,
secret_key: Option<String>,
security_token: Option<String>,
endpoint: Url,
bucket_name: String,
url_style: UrlStyle,
region: String,
access_key: String,
secret_key: String,
session_token: Option<String>,
repo: Repo,
client: reqwest::Client,
) -> Result<ObjectStore, Error> {
) -> Result<ObjectStoreConfig, Error> {
let path_gen = init_generator(&repo).await?;
Ok(ObjectStore {
Ok(ObjectStoreConfig {
path_gen,
repo,
bucket: Bucket::new(
bucket_name,
match region {
Region::Custom { endpoint, .. } => Region::Custom {
region: String::from(""),
endpoint,
},
region => region,
},
Credentials {
access_key,
secret_key,
security_token,
session_token,
},
)
.map_err(ObjectError::from)?
.with_path_style(),
client,
bucket: Bucket::new(endpoint, url_style, bucket_name, region)
.map_err(ObjectError::from)?,
credentials: if let Some(token) = session_token {
Credentials::new_with_token(access_key, secret_key, token)
} else {
Credentials::new(access_key, secret_key)
},
})
}
async fn put_object_request(&self) -> Result<(ClientRequest, ObjectId), Error> {
let path = self.next_file().await?;
let mut action = self.bucket.put_object(Some(&self.credentials), &path);
action
.headers_mut()
.insert("content-type", "application/octet-stream");
Ok((self.build_request(action), ObjectId::from_string(path)))
}
async fn create_multipart_request(&self) -> Result<(ClientRequest, ObjectId), Error> {
let path = self.next_file().await?;
let mut action = self
.bucket
.create_multipart_upload(Some(&self.credentials), &path);
action
.headers_mut()
.insert("content-type", "application/octet-stream");
Ok((self.build_request(action), ObjectId::from_string(path)))
}
async fn create_upload_part_request(
&self,
bytes: Bytes,
object_id: &ObjectId,
part_number: u16,
upload_id: &str,
) -> Result<ClientRequest, Error> {
use md5::Digest;
let mut action = self.bucket.upload_part(
Some(&self.credentials),
object_id.as_str(),
part_number,
upload_id,
);
let hashing_span = tracing::info_span!("Hashing request body");
let hash_string = actix_web::web::block(move || {
let guard = hashing_span.enter();
let mut hasher = md5::Md5::new();
hasher.update(&bytes);
let hash = hasher.finalize();
let hash_string = base64::encode(&hash);
drop(guard);
hash_string
})
.await
.map_err(ObjectError::from)?;
action
.headers_mut()
.insert("content-type", "application/octet-stream");
action.headers_mut().insert("content-md5", hash_string);
Ok(self.build_request(action))
}
fn send_complete_multipart_request<'a, I: Iterator<Item = &'a str>>(
&'a self,
object_id: &'a ObjectId,
upload_id: &'a str,
etags: I,
) -> SendClientRequest {
let mut action = self.bucket.complete_multipart_upload(
Some(&self.credentials),
object_id.as_str(),
upload_id,
etags,
);
action
.headers_mut()
.insert("content-type", "application/octet-stream");
let (req, action) = self.build_request_inner(action);
req.send_body(action.body())
}
fn create_abort_multipart_request(
&self,
object_id: &ObjectId,
upload_id: &str,
) -> ClientRequest {
let action = self.bucket.abort_multipart_upload(
Some(&self.credentials),
object_id.as_str(),
upload_id,
);
self.build_request(action)
}
fn build_request<'a, A: S3Action<'a>>(&'a self, action: A) -> ClientRequest {
let (req, _) = self.build_request_inner(action);
req
}
fn build_request_inner<'a, A: S3Action<'a>>(&'a self, mut action: A) -> (ClientRequest, A) {
let method = match A::METHOD {
rusty_s3::Method::Head => awc::http::Method::HEAD,
rusty_s3::Method::Get => awc::http::Method::GET,
rusty_s3::Method::Post => awc::http::Method::POST,
rusty_s3::Method::Put => awc::http::Method::PUT,
rusty_s3::Method::Delete => awc::http::Method::DELETE,
};
let url = action.sign(Duration::from_secs(5));
let req = self.client.request(method, url.as_str());
let req = action
.headers_mut()
.iter()
.fold(req, |req, tup| req.insert_header(tup));
(req, action)
}
fn get_object_request(
&self,
identifier: &ObjectId,
from_start: Option<u64>,
len: Option<u64>,
) -> ClientRequest {
let action = self
.bucket
.get_object(Some(&self.credentials), identifier.as_str());
let req = self.build_request(action);
let start = from_start.unwrap_or(0);
let end = len.map(|len| start + len - 1);
req.insert_header(Range::Bytes(vec![if let Some(end) = end {
ByteRangeSpec::FromTo(start, end)
} else {
ByteRangeSpec::From(start)
}]))
}
fn head_object_request(&self, identifier: &ObjectId) -> ClientRequest {
let action = self
.bucket
.head_object(Some(&self.credentials), identifier.as_str());
self.build_request(action)
}
fn delete_object_request(&self, identifier: &ObjectId) -> ClientRequest {
let action = self
.bucket
.delete_object(Some(&self.credentials), identifier.as_str());
self.build_request(action)
}
async fn next_directory(&self) -> Result<Path, Error> {
let path = self.path_gen.next();
@ -243,8 +620,8 @@ impl std::fmt::Debug for ObjectStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ObjectStore")
.field("path_gen", &"generator")
.field("bucket", &self.bucket.name)
.field("region", &self.bucket.region)
.field("bucket", &self.bucket.name())
.field("region", &self.bucket.region())
.finish()
}
}