2
0
Fork 0
mirror of https://git.asonix.dog/asonix/pict-rs synced 2024-12-22 19:31:35 +00:00

Start work on using rusty-s3 instead of rust-s3

This commit is contained in:
asonix 2022-09-24 13:39:27 -05:00
parent cedf14375c
commit ab7fd9aaf7
6 changed files with 215 additions and 336 deletions

246
Cargo.lock generated
View file

@ -351,29 +351,6 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "aws-creds"
version = "0.29.1"
source = "git+https://github.com/asonix/rust-s3?branch=asonix/generic-client#9e450d0038a29040ba5c47ffa570350c3b1ad976"
dependencies = [
"dirs",
"rust-ini",
"serde",
"serde-xml-rs",
"serde_derive",
"thiserror",
"url",
]
[[package]]
name = "aws-region"
version = "0.24.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9bdd1c0f4aa70f72812a2f3ec325d6d6162fb80cff093f847b4c394fd78c3643"
dependencies = [
"thiserror",
]
[[package]] [[package]]
name = "axum" name = "axum"
version = "0.5.16" version = "0.5.16"
@ -720,26 +697,6 @@ dependencies = [
"subtle", "subtle",
] ]
[[package]]
name = "dirs"
version = "4.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca3aa72a6f96ea37bbc5aa912f6788242832f75369bdfdadcb0e38423f100059"
dependencies = [
"dirs-sys",
]
[[package]]
name = "dirs-sys"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b1d1d91c932ef41c0f2663aa8b0ca0342d444d842c06914aa0a7e352d0bada6"
dependencies = [
"libc",
"redox_users",
"winapi",
]
[[package]] [[package]]
name = "dlv-list" name = "dlv-list"
version = "0.3.0" version = "0.3.0"
@ -1007,12 +964,6 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "hex"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]] [[package]]
name = "hmac" name = "hmac"
version = "0.12.1" version = "0.12.1"
@ -1092,19 +1043,6 @@ dependencies = [
"want", "want",
] ]
[[package]]
name = "hyper-rustls"
version = "0.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d87c48c02e0dc5e3b849a2041db3029fd066650f8f717c07bf8ed78ccb895cac"
dependencies = [
"http",
"hyper",
"rustls",
"tokio",
"tokio-rustls",
]
[[package]] [[package]]
name = "hyper-timeout" name = "hyper-timeout"
version = "0.4.1" version = "0.4.1"
@ -1162,12 +1100,6 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "ipnet"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "879d54834c8c76457ef4293a689b2a8c59b076067ad77b15efafbb05f92a592b"
[[package]] [[package]]
name = "itertools" name = "itertools"
version = "0.10.5" version = "0.10.5"
@ -1280,22 +1212,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73cbba799671b762df5a175adf59ce145165747bb891505c43d09aefbbf38beb" checksum = "73cbba799671b762df5a175adf59ce145165747bb891505c43d09aefbbf38beb"
[[package]] [[package]]
name = "maybe-async" name = "md-5"
version = "0.2.6" version = "0.10.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6007f9dad048e0a224f27ca599d669fca8cfa0dac804725aab542b2eb032bce6" checksum = "6365506850d44bff6e2fbcb5176cf63650e48bd45ef2fe2665ae1570e0f4b9ca"
dependencies = [ dependencies = [
"proc-macro2", "digest",
"quote",
"syn",
] ]
[[package]]
name = "md5"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
[[package]] [[package]]
name = "memchr" name = "memchr"
version = "2.5.0" version = "2.5.0"
@ -1649,8 +1573,7 @@ dependencies = [
"opentelemetry", "opentelemetry",
"opentelemetry-otlp", "opentelemetry-otlp",
"pin-project-lite", "pin-project-lite",
"reqwest", "rusty-s3",
"rust-s3",
"serde", "serde",
"serde_cbor", "serde_cbor",
"serde_json", "serde_json",
@ -1809,6 +1732,16 @@ dependencies = [
"prost", "prost",
] ]
[[package]]
name = "quick-xml"
version = "0.24.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37dddbbe9df96afafcb8027fcf263971b726530e12f0787f620a7ba5b4846081"
dependencies = [
"memchr",
"serde",
]
[[package]] [[package]]
name = "quote" name = "quote"
version = "1.0.21" version = "1.0.21"
@ -1857,17 +1790,6 @@ dependencies = [
"bitflags", "bitflags",
] ]
[[package]]
name = "redox_users"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b033d837a7cf162d7993aded9304e30a83213c648b6e389db233191f891e5c2b"
dependencies = [
"getrandom",
"redox_syscall",
"thiserror",
]
[[package]] [[package]]
name = "regex" name = "regex"
version = "1.6.0" version = "1.6.0"
@ -1903,46 +1825,6 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "reqwest"
version = "0.11.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "431949c384f4e2ae07605ccaa56d1d9d2ecdb5cadd4f9577ccfab29f2e5149fc"
dependencies = [
"base64",
"bytes",
"encoding_rs",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"hyper",
"hyper-rustls",
"ipnet",
"js-sys",
"log",
"mime",
"once_cell",
"percent-encoding",
"pin-project-lite",
"rustls",
"rustls-pemfile",
"serde",
"serde_json",
"serde_urlencoded",
"tokio",
"tokio-rustls",
"tokio-util",
"tower-service",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
"webpki-roots",
"winreg",
]
[[package]] [[package]]
name = "ring" name = "ring"
version = "0.16.20" version = "0.16.20"
@ -1988,35 +1870,6 @@ dependencies = [
"ordered-multimap", "ordered-multimap",
] ]
[[package]]
name = "rust-s3"
version = "0.31.0"
source = "git+https://github.com/asonix/rust-s3?branch=asonix/generic-client#9e450d0038a29040ba5c47ffa570350c3b1ad976"
dependencies = [
"async-trait",
"aws-creds",
"aws-region",
"base64",
"cfg-if",
"hex",
"hmac",
"http",
"log",
"maybe-async",
"md5",
"percent-encoding",
"reqwest",
"serde",
"serde-xml-rs",
"serde_derive",
"sha2",
"thiserror",
"time",
"tokio",
"tokio-stream",
"url",
]
[[package]] [[package]]
name = "rustc-demangle" name = "rustc-demangle"
version = "0.1.21" version = "0.1.21"
@ -2045,12 +1898,22 @@ dependencies = [
] ]
[[package]] [[package]]
name = "rustls-pemfile" name = "rusty-s3"
version = "1.0.1" version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0864aeff53f8c05aa08d86e5ef839d3dfcf07aeba2db32f12db0ef716e87bd55" checksum = "13332fd1e9538328a80183b9c0bde0cd7065ad2c4405f56b855a51a0a37fffd4"
dependencies = [ dependencies = [
"base64", "base64",
"hmac",
"md-5",
"percent-encoding",
"quick-xml",
"serde",
"serde_json",
"sha2",
"time",
"url",
"zeroize",
] ]
[[package]] [[package]]
@ -2089,25 +1952,13 @@ checksum = "e25dfac463d778e353db5be2449d1cce89bd6fd23c9f1ea21310ce6e5a1b29c4"
[[package]] [[package]]
name = "serde" name = "serde"
version = "1.0.144" version = "1.0.145"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f747710de3dcd43b88c9168773254e809d8ddbdf9653b84e2554ab219f17860" checksum = "728eb6351430bccb993660dfffc5a72f91ccc1295abaa8ce19b27ebe4f75568b"
dependencies = [ dependencies = [
"serde_derive", "serde_derive",
] ]
[[package]]
name = "serde-xml-rs"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65162e9059be2f6a3421ebbb4fef3e74b7d9e7c60c50a0e292c6239f19f1edfa"
dependencies = [
"log",
"serde",
"thiserror",
"xml-rs",
]
[[package]] [[package]]
name = "serde_cbor" name = "serde_cbor"
version = "0.11.2" version = "0.11.2"
@ -2120,9 +1971,9 @@ dependencies = [
[[package]] [[package]]
name = "serde_derive" name = "serde_derive"
version = "1.0.144" version = "1.0.145"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94ed3a816fb1d101812f83e789f888322c34e291f894f19590dc310963e87a00" checksum = "81fa1584d3d1bcacd84c277a0dfe21f5b0f6accf4a23d04d4c6d61f1af522b4c"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -2843,18 +2694,6 @@ dependencies = [
"wasm-bindgen-shared", "wasm-bindgen-shared",
] ]
[[package]]
name = "wasm-bindgen-futures"
version = "0.4.33"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23639446165ca5a5de86ae1d8896b737ae80319560fbaa4c2887b7da6e7ebd7d"
dependencies = [
"cfg-if",
"js-sys",
"wasm-bindgen",
"web-sys",
]
[[package]] [[package]]
name = "wasm-bindgen-macro" name = "wasm-bindgen-macro"
version = "0.2.83" version = "0.2.83"
@ -2998,21 +2837,6 @@ version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680"
[[package]]
name = "winreg"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d"
dependencies = [
"winapi",
]
[[package]]
name = "xml-rs"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2d7d3948613f75c98fd9328cfdcc45acc4d360655289d0a7d4ec931392200a3"
[[package]] [[package]]
name = "yaml-rust" name = "yaml-rust"
version = "0.4.5" version = "0.4.5"
@ -3021,3 +2845,9 @@ checksum = "56c1936c4cc7a1c9ab21a1ebb602eb942ba868cbd44a99cb7cdc5892335e1c85"
dependencies = [ dependencies = [
"linked-hash-map", "linked-hash-map",
] ]
[[package]]
name = "zeroize"
version = "1.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c394b5bd0c6f669e7275d9c20aa90ae064cb22e75a1cad54e1b34088034b149f"

View file

@ -39,14 +39,7 @@ once_cell = "1.4.0"
opentelemetry = { version = "0.18", features = ["rt-tokio"] } opentelemetry = { version = "0.18", features = ["rt-tokio"] }
opentelemetry-otlp = "0.11" opentelemetry-otlp = "0.11"
pin-project-lite = "0.2.7" pin-project-lite = "0.2.7"
reqwest = { version = "0.11.5", default-features = false, features = [ rusty-s3 = "0.3.2"
"rustls-tls",
"stream",
] }
rust-s3 = { version = "0.31.0", default-features = false, features = [
"fail-on-err",
"with-reqwest",
], git = "https://github.com/asonix/rust-s3", branch = "asonix/generic-client" }
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_cbor = "0.11.2" serde_cbor = "0.11.2"
serde_json = "1.0" serde_json = "1.0"
@ -57,7 +50,10 @@ thiserror = "1.0"
time = { version = "0.3.0", features = ["serde", "serde-well-known"] } time = { version = "0.3.0", features = ["serde", "serde-well-known"] }
tokio = { version = "1", features = ["full", "tracing"] } tokio = { version = "1", features = ["full", "tracing"] }
tokio-uring = { version = "0.3", optional = true, features = ["bytes"] } tokio-uring = { version = "0.3", optional = true, features = ["bytes"] }
tokio-util = { version = "0.7", default-features = false, features = ["codec"] } tokio-util = { version = "0.7", default-features = false, features = [
"codec",
"io",
] }
toml = "0.5.8" toml = "0.5.8"
tracing = "0.1.15" tracing = "0.1.15"
tracing-error = "0.2.0" tracing-error = "0.2.0"

View file

@ -559,10 +559,6 @@ struct ObjectStorage {
#[clap(short, long)] #[clap(short, long)]
secret_key: Option<String>, secret_key: Option<String>,
/// The security token for accessing the bucket
#[clap(long)]
security_token: Option<String>,
/// The session token for accessing the bucket /// The session token for accessing the bucket
#[clap(long)] #[clap(long)]
session_token: Option<String>, session_token: Option<String>,

View file

@ -79,11 +79,6 @@ pub(crate) struct ObjectStorage {
#[clap(short, long)] #[clap(short, long)]
pub(crate) secret_key: String, pub(crate) secret_key: String,
/// The security token for accessing the bucket
#[clap(long)]
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) security_token: Option<String>,
/// The session token for accessing the bucket /// The session token for accessing the bucket
#[clap(long)] #[clap(long)]
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]

View file

@ -958,16 +958,10 @@ fn transform_error(error: actix_form_data::Error) -> actix_web::Error {
fn build_client() -> awc::Client { fn build_client() -> awc::Client {
Client::builder() Client::builder()
.wrap(Tracing) .wrap(Tracing)
.add_default_header(("User-Agent", "pict-rs v0.3.0-main")) .add_default_header(("User-Agent", "pict-rs v0.4.0-main"))
.finish() .finish()
} }
fn build_reqwest_client() -> reqwest::Result<reqwest::Client> {
reqwest::Client::builder()
.user_agent("pict-rs v0.3.0-main")
.build()
}
fn next_worker_id() -> String { fn next_worker_id() -> String {
static WORKER_ID: AtomicU64 = AtomicU64::new(0); static WORKER_ID: AtomicU64 = AtomicU64::new(0);
@ -1092,22 +1086,24 @@ where
} }
} }
config::Store::ObjectStorage(config::ObjectStorage { config::Store::ObjectStorage(config::ObjectStorage {
endpoint,
bucket_name, bucket_name,
url_style,
region, region,
access_key, access_key,
secret_key, secret_key,
security_token,
session_token, session_token,
}) => { }) => {
let to = ObjectStore::build( let to = ObjectStore::build(
endpoint,
bucket_name, bucket_name,
region.as_ref().clone(), url_style,
region.as_ref(),
Some(access_key.clone()), Some(access_key.clone()),
Some(secret_key.clone()), Some(secret_key.clone()),
security_token.clone(),
session_token.clone(), session_token.clone(),
repo.clone(), repo.clone(),
build_reqwest_client()?, build_client(),
) )
.await?; .await?;
@ -1136,22 +1132,24 @@ async fn main() -> color_eyre::Result<()> {
migrate_inner(&repo, from, &to).await?; migrate_inner(&repo, from, &to).await?;
} }
config::Store::ObjectStorage(config::ObjectStorage { config::Store::ObjectStorage(config::ObjectStorage {
endpoint,
bucket_name, bucket_name,
url_style,
region, region,
access_key, access_key,
secret_key, secret_key,
security_token,
session_token, session_token,
}) => { }) => {
let from = ObjectStore::build( let from = ObjectStore::build(
endpoint,
&bucket_name, &bucket_name,
url_style,
Serde::into_inner(region), Serde::into_inner(region),
Some(access_key), Some(access_key),
Some(secret_key), Some(secret_key),
security_token,
session_token, session_token,
repo.clone(), repo.clone(),
build_reqwest_client()?, build_client(),
) )
.await?; .await?;
@ -1173,22 +1171,24 @@ async fn main() -> color_eyre::Result<()> {
} }
} }
config::Store::ObjectStorage(config::ObjectStorage { config::Store::ObjectStorage(config::ObjectStorage {
endpoint,
bucket_name, bucket_name,
url_style,
region, region,
access_key, access_key,
secret_key, secret_key,
security_token,
session_token, session_token,
}) => { }) => {
let store = ObjectStore::build( let store = ObjectStore::build(
endpoint,
&bucket_name, &bucket_name,
url_style,
Serde::into_inner(region), Serde::into_inner(region),
Some(access_key), Some(access_key),
Some(secret_key), Some(secret_key),
security_token,
session_token, session_token,
repo.clone(), repo.clone(),
build_reqwest_client()?, build_client(),
) )
.await?; .await?;

View file

@ -3,16 +3,25 @@ use crate::{
repo::{Repo, SettingsRepo}, repo::{Repo, SettingsRepo},
store::Store, store::Store,
}; };
use actix_web::web::Bytes; use actix_web::{
use futures_util::{Stream, TryStreamExt}; http::{
use s3::{ header::{ByteRangeSpec, Range, CONTENT_LENGTH},
client::Client, command::Command, creds::Credentials, error::S3Error, request_trait::Request, StatusCode,
Bucket, Region, },
web::Bytes,
}; };
use std::{pin::Pin, string::FromUtf8Error}; use awc::{Client, ClientRequest};
use futures_util::{Stream, TryStreamExt};
use rusty_s3::{
actions::{PutObject, S3Action},
Bucket, Credentials, UrlStyle,
};
use std::{pin::Pin, string::FromUtf8Error, time::Duration};
use storage_path_generator::{Generator, Path}; use storage_path_generator::{Generator, Path};
use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio_util::io::ReaderStream;
use tracing::Instrument; use tracing::Instrument;
use url::Url;
mod object_id; mod object_id;
pub(crate) use object_id::ObjectId; pub(crate) use object_id::ObjectId;
@ -33,8 +42,8 @@ pub(crate) enum ObjectError {
#[error("Invalid length")] #[error("Invalid length")]
Length, Length,
#[error("Storage error")] #[error("Invalid status")]
Anyhow(#[from] S3Error), Status(StatusCode),
} }
#[derive(Clone)] #[derive(Clone)]
@ -42,7 +51,8 @@ pub(crate) struct ObjectStore {
path_gen: Generator, path_gen: Generator,
repo: Repo, repo: Repo,
bucket: Bucket, bucket: Bucket,
client: reqwest::Client, credentials: Credentials,
client: Client,
} }
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]
@ -55,26 +65,30 @@ impl Store for ObjectStore {
where where
Reader: AsyncRead + Unpin, Reader: AsyncRead + Unpin,
{ {
let path = self.next_file().await?; let response = self
.put_object_request()
.await?
.send_stream(ReaderStream::new(reader))
.await?;
self.bucket if response.status().is_success() {
.put_object_stream(&self.client, reader, &path) return Ok(ObjectId::from_string(path));
.await }
.map_err(ObjectError::from)?;
Ok(ObjectId::from_string(path)) Err(ObjectError::Status(response.status()).into())
} }
#[tracing::instrument(skip(bytes))] #[tracing::instrument(skip(bytes))]
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Error> { async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Error> {
let path = self.next_file().await?; let req = self.put_object_request().await?;
self.bucket let response = req.send_body(bytes).await?;
.put_object(&self.client, &path, &bytes)
.await
.map_err(ObjectError::from)?;
Ok(ObjectId::from_string(path)) if response.status().is_success() {
return Ok(ObjectId::from_string(path));
}
Err(ObjectError::Status(response.status()).into())
} }
#[tracing::instrument] #[tracing::instrument]
@ -84,38 +98,16 @@ impl Store for ObjectStore {
from_start: Option<u64>, from_start: Option<u64>,
len: Option<u64>, len: Option<u64>,
) -> Result<Self::Stream, Error> { ) -> Result<Self::Stream, Error> {
let path = identifier.as_str(); let response = self
.get_object_request(identifier, from_start, len)
.send()
.await?;
let start = from_start.unwrap_or(0); if response.status.is_success() {
let end = len.map(|len| start + len - 1); return Ok(Box::pin(response));
}
let request_span = tracing::trace_span!(parent: None, "Get Object"); Err(ObjectError::Status(response.status()).into())
// NOTE: isolating reqwest in it's own span is to prevent the request's span from getting
// smuggled into a long-lived task. Unfortunately, I am unable to create a minimal
// reproduction of this problem so I can't open a bug about it.
let request = request_span.in_scope(|| {
Client::request(
&self.client,
&self.bucket,
path,
Command::GetObjectRange { start, end },
)
});
let response = request_span
.in_scope(|| request.response())
.instrument(request_span.clone())
.await
.map_err(ObjectError::from)?;
let stream = request_span.in_scope(|| {
response
.bytes_stream()
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
});
Ok(Box::pin(stream))
} }
#[tracing::instrument(skip(writer))] #[tracing::instrument(skip(writer))]
@ -127,39 +119,52 @@ impl Store for ObjectStore {
where where
Writer: AsyncWrite + Send + Unpin, Writer: AsyncWrite + Send + Unpin,
{ {
let path = identifier.as_str(); let response = self
.get_object_request(identifier, from_start, len)
.send()
.await?;
self.bucket if !response.status.is_success() {
.get_object_stream(&self.client, path, writer) return Err(ObjectError::Status(response.status()).into());
.await }
.map_err(ObjectError::from)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, Error::from(e)))?; while let Some(res) = response.next().await {
let bytes = res?;
writer.write_all_buf(bytes).await?;
}
writer.flush().await?;
Ok(()) Ok(())
} }
#[tracing::instrument] #[tracing::instrument]
async fn len(&self, identifier: &Self::Identifier) -> Result<u64, Error> { async fn len(&self, identifier: &Self::Identifier) -> Result<u64, Error> {
let path = identifier.as_str(); let response = self.head_object_request(identifier).send().await?;
let (head, _) = self if !response.status.is_success() {
.bucket return Err(ObjectError::Status(response.status()).into());
.head_object(&self.client, path) }
.await
.map_err(ObjectError::from)?;
let length = head.content_length.ok_or(ObjectError::Length)?;
Ok(length as u64) let length = response
.headers()
.get(CONTENT_LENGTH)
.ok_or(ObjectError::Length)?
.to_str()
.ok_or(ObjectError::Length)
.parse::<u64>()
.map_err(|_| ObjectError::Length)?;
Ok(length)
} }
#[tracing::instrument] #[tracing::instrument]
async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Error> { async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Error> {
let path = identifier.as_str(); let response = self.delete_object_request(identifier).send().await?;
if !response.status.is_success() {
return Err(ObjectError::Status(response.status()).into());
}
self.bucket
.delete_object(&self.client, path)
.await
.map_err(ObjectError::from)?;
Ok(()) Ok(())
} }
} }
@ -167,11 +172,12 @@ impl Store for ObjectStore {
impl ObjectStore { impl ObjectStore {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub(crate) async fn build( pub(crate) async fn build(
endpoint: Url,
bucket_name: &str, bucket_name: &str,
region: Region, url_style: UrlStyle,
region: &str,
access_key: Option<String>, access_key: Option<String>,
secret_key: Option<String>, secret_key: Option<String>,
security_token: Option<String>,
session_token: Option<String>, session_token: Option<String>,
repo: Repo, repo: Repo,
client: reqwest::Client, client: reqwest::Client,
@ -181,28 +187,84 @@ impl ObjectStore {
Ok(ObjectStore { Ok(ObjectStore {
path_gen, path_gen,
repo, repo,
bucket: Bucket::new( bucket: Bucket::new(endpoint, url_style, bucket_name, region)
bucket_name, .map_err(ObjectError::from)?,
match region { credentials: Credentials::new_with_token(access_key, secret_key, session_token),
Region::Custom { endpoint, .. } => Region::Custom {
region: String::from(""),
endpoint,
},
region => region,
},
Credentials {
access_key,
secret_key,
security_token,
session_token,
},
)
.map_err(ObjectError::from)?
.with_path_style(),
client, client,
}) })
} }
async fn put_object_request(&self) -> Result<ClientRequest, Error> {
let path = self.next_file().await?;
let action = self.bucket.put_object(Some(&self.credentials), &path);
Ok(self.build_request(action))
}
fn build_request<A: S3Action>(&self, action: A) -> ClientRequest {
let method = match A::METHOD {
rusty_s3::Method::Head => awc::http::Method::HEAD,
rusty_s3::Method::Get => awc::http::Method::GET,
rusty_s3::Method::Post => awc::http::Method::POST,
rusty_s3::Method::Put => awc::http::Method::PUT,
rusty_s3::Method::Delete => awc::http::Method::DELETE,
};
let url = action.sign(Duration::from_secs(5));
let req = self.client.request(method, url);
action
.headers_mut()
.drain()
.fold(req, |req, tup| req.insert_header(tup))
}
fn get_object_request(
&self,
identifier: &ObjectId,
from_start: Option<u64>,
len: Option<u64>,
) -> ClientRequest {
let action = self
.bucket
.get_object(Some(&self.credentials), identifier.as_str());
let req = self.build_request(action);
let start = from_start.unwrap_or(0);
let end = len.map(|len| start + len - 1);
let range = match (start, end) {
(Some(start), Some(end)) => Some(ByteRangeSpec::FromTo(start, end)),
(Some(start), None) => Some(ByteRangeSpec::From(start)),
_ => None,
};
if let Some(range) = range {
req.insert_header(Range::Bytes(vec![range])).send().await?;
} else {
req
}
}
fn head_object_request(&self, identifier: &ObjectId) -> ClientRequest {
let action = self
.bucket
.head_object(Some(&self.credentials), identifier.as_str());
self.build_request(action)
}
fn delete_object_request(&self, identifier: &ObjectId) -> ClientRequest {
let action = self
.bucket
.delete_object(Some(&self.credentials), identifier.as_str());
self.build_request(action)
}
async fn next_directory(&self) -> Result<Path, Error> { async fn next_directory(&self) -> Result<Path, Error> {
let path = self.path_gen.next(); let path = self.path_gen.next();
@ -243,8 +305,8 @@ impl std::fmt::Debug for ObjectStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ObjectStore") f.debug_struct("ObjectStore")
.field("path_gen", &"generator") .field("path_gen", &"generator")
.field("bucket", &self.bucket.name) .field("bucket", &self.bucket.name())
.field("region", &self.bucket.region) .field("region", &self.bucket.region())
.finish() .finish()
} }
} }