Replace awc with reqwest

This commit is contained in:
asonix 2023-07-21 16:21:24 -05:00
parent b32e1cc6f6
commit e1ba1c13e5
6 changed files with 292 additions and 183 deletions

263
Cargo.lock generated
View File

@ -159,25 +159,6 @@ dependencies = [
"pin-project-lite",
]
[[package]]
name = "actix-tls"
version = "3.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fde0cf292f7cdc7f070803cb9a0d45c018441321a78b1042ffbbb81ec333297"
dependencies = [
"actix-codec",
"actix-rt",
"actix-service",
"actix-utils",
"futures-core",
"http",
"log",
"pin-project-lite",
"tokio-rustls",
"tokio-util",
"webpki-roots",
]
[[package]]
name = "actix-utils"
version = "3.0.1"
@ -367,40 +348,6 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "awc"
version = "3.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87ef547a81796eb2dfe9b345aba34c2e08391a0502493711395b36dd64052b69"
dependencies = [
"actix-codec",
"actix-http",
"actix-rt",
"actix-service",
"actix-tls",
"actix-utils",
"ahash 0.7.6",
"base64 0.21.2",
"bytes",
"cfg-if",
"derive_more",
"futures-core",
"futures-util",
"h2",
"http",
"itoa",
"log",
"mime",
"percent-encoding",
"pin-project-lite",
"rand",
"rustls",
"serde",
"serde_json",
"serde_urlencoded",
"tokio",
]
[[package]]
name = "axum"
version = "0.6.18"
@ -972,8 +919,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427"
dependencies = [
"cfg-if",
"js-sys",
"libc",
"wasi",
"wasm-bindgen",
]
[[package]]
@ -1126,6 +1075,19 @@ dependencies = [
"want",
]
[[package]]
name = "hyper-rustls"
version = "0.24.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0646026eb1b3eea4cd9ba47912ea5ce9cc07713d105b1a14698f4e6433d348b7"
dependencies = [
"http",
"hyper",
"rustls",
"tokio",
"tokio-rustls",
]
[[package]]
name = "hyper-timeout"
version = "0.4.1"
@ -1193,6 +1155,12 @@ dependencies = [
"libc",
]
[[package]]
name = "ipnet"
version = "2.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28b29a3cd74f0f4598934efe3aeba42bae0eb4680554128851ebbecb02af14e6"
[[package]]
name = "is-terminal"
version = "0.4.9"
@ -1348,6 +1316,16 @@ version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
[[package]]
name = "mime_guess"
version = "2.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef"
dependencies = [
"mime",
"unicase",
]
[[package]]
name = "minimal-lexical"
version = "0.2.1"
@ -1650,7 +1628,6 @@ dependencies = [
"actix-web",
"anyhow",
"async-trait",
"awc",
"base64 0.21.2",
"clap",
"color-eyre",
@ -1667,6 +1644,9 @@ dependencies = [
"opentelemetry-otlp",
"pin-project-lite",
"quick-xml 0.29.0",
"reqwest",
"reqwest-middleware",
"reqwest-tracing",
"rusty-s3",
"serde",
"serde_cbor",
@ -1683,7 +1663,6 @@ dependencies = [
"toml 0.7.6",
"tracing",
"tracing-actix-web",
"tracing-awc",
"tracing-error",
"tracing-futures",
"tracing-log",
@ -1893,6 +1872,81 @@ version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ea92a5b6195c6ef2a0295ea818b312502c6fc94dde986c5553242e18fd4ce2"
[[package]]
name = "reqwest"
version = "0.11.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cde824a14b7c14f85caff81225f411faacc04a2013f41670f41443742b1c1c55"
dependencies = [
"base64 0.21.2",
"bytes",
"encoding_rs",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"hyper",
"hyper-rustls",
"ipnet",
"js-sys",
"log",
"mime",
"mime_guess",
"once_cell",
"percent-encoding",
"pin-project-lite",
"rustls",
"rustls-pemfile",
"serde",
"serde_json",
"serde_urlencoded",
"tokio",
"tokio-rustls",
"tokio-util",
"tower-service",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
"wasm-streams",
"web-sys",
"webpki-roots",
"winreg",
]
[[package]]
name = "reqwest-middleware"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4531c89d50effe1fac90d095c8b133c20c5c714204feee0bfc3fd158e784209d"
dependencies = [
"anyhow",
"async-trait",
"http",
"reqwest",
"serde",
"task-local-extensions",
"thiserror",
]
[[package]]
name = "reqwest-tracing"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b97ad83c2fc18113346b7158d79732242002427c30f620fa817c1f32901e0a8"
dependencies = [
"anyhow",
"async-trait",
"getrandom",
"matchit",
"opentelemetry",
"reqwest",
"reqwest-middleware",
"task-local-extensions",
"tracing",
"tracing-opentelemetry",
]
[[package]]
name = "ring"
version = "0.16.20"
@ -1968,14 +2022,33 @@ dependencies = [
[[package]]
name = "rustls"
version = "0.20.8"
version = "0.21.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fff78fc74d175294f4e83b28343315ffcfb114b156f0185e9741cb5570f50e2f"
checksum = "79ea77c539259495ce8ca47f53e66ae0330a8819f67e23ac96ca02f50e7b7d36"
dependencies = [
"log",
"ring",
"rustls-webpki",
"sct",
"webpki",
]
[[package]]
name = "rustls-pemfile"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d194b56d58803a43635bdc398cd17e383d6f71f9182b9a192c127ca42494a59b"
dependencies = [
"base64 0.21.2",
]
[[package]]
name = "rustls-webpki"
version = "0.101.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15f36a6828982f422756984e47912a7a51dcbc2a197aa791158f8ca61cd8204e"
dependencies = [
"ring",
"untrusted",
]
[[package]]
@ -2242,6 +2315,15 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160"
[[package]]
name = "task-local-extensions"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba323866e5d033818e3240feeb9f7db2c4296674e4d9e16b97b7bf8f490434e8"
dependencies = [
"pin-utils",
]
[[package]]
name = "thiserror"
version = "1.0.43"
@ -2358,13 +2440,12 @@ dependencies = [
[[package]]
name = "tokio-rustls"
version = "0.23.4"
version = "0.24.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59"
checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081"
dependencies = [
"rustls",
"tokio",
"webpki",
]
[[package]]
@ -2580,23 +2661,6 @@ dependencies = [
"syn 2.0.26",
]
[[package]]
name = "tracing-awc"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eaa1a68fce4d1a7fad459f81ddcafbdd7c6f6bcda5c7e07d5f42db637931fac7"
dependencies = [
"actix-http",
"actix-service",
"awc",
"bytes",
"futures-core",
"opentelemetry",
"pin-project-lite",
"tracing",
"tracing-opentelemetry",
]
[[package]]
name = "tracing-core"
version = "0.1.31"
@ -2701,6 +2765,15 @@ version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9"
[[package]]
name = "unicase"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6"
dependencies = [
"version_check",
]
[[package]]
name = "unicode-bidi"
version = "0.3.13"
@ -2814,6 +2887,18 @@ dependencies = [
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-futures"
version = "0.4.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c02dbc21516f9f1f04f187958890d7e6026df8d16540b7ad9492bc34a67cea03"
dependencies = [
"cfg-if",
"js-sys",
"wasm-bindgen",
"web-sys",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.87"
@ -2843,6 +2928,19 @@ version = "0.2.87"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1"
[[package]]
name = "wasm-streams"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6bbae3363c08332cadccd13b67db371814cd214c2524020932f0804b8cf7c078"
dependencies = [
"futures-util",
"js-sys",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
]
[[package]]
name = "web-sys"
version = "0.3.64"
@ -2969,6 +3067,15 @@ dependencies = [
"memchr",
]
[[package]]
name = "winreg"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d"
dependencies = [
"winapi",
]
[[package]]
name = "yaml-rust"
version = "0.4.5"

View File

@ -11,12 +11,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
default = []
io-uring = [
"actix-rt/io-uring",
"actix-server/io-uring",
"tokio-uring",
"sled/io_uring",
]
io-uring = ["actix-rt/io-uring", "actix-server/io-uring", "tokio-uring", "sled/io_uring"]
[dependencies]
actix-form-data = "0.7.0-beta.3"
@ -25,7 +20,6 @@ actix-server = "2.0.0"
actix-web = { version = "4.0.0", default-features = false }
anyhow = "1.0"
async-trait = "0.1.51"
awc = { version = "3.0.0", default-features = false, features = ["rustls"] }
base64 = "0.21.0"
clap = { version = "4.0.2", features = ["derive"] }
color-eyre = "0.6"
@ -42,6 +36,9 @@ opentelemetry = { version = "0.19", features = ["rt-tokio"] }
opentelemetry-otlp = "0.12"
pin-project-lite = "0.2.7"
quick-xml = { version = "0.29.0", features = ["serialize"] }
reqwest = { version = "0.11.18", default-features = false, features = ["json", "rustls-tls", "stream"] }
reqwest-middleware = "0.2.2"
reqwest-tracing = { version = "0.4.5", features = ["opentelemetry_0_19"] }
rusty-s3 = "0.4.1"
serde = { version = "1.0", features = ["derive"] }
serde_cbor = "0.11.2"
@ -79,8 +76,3 @@ uuid = { version = "1", features = ["v4", "serde"] }
version = "0.7.5"
default-features = false
features = ["opentelemetry_0_19"]
[dependencies.tracing-awc]
version = "0.1.7"
default-features = false
features = ["opentelemetry_0_19"]

View File

@ -4,6 +4,7 @@ use actix_web::{
};
use std::{
collections::{vec_deque::IntoIter, VecDeque},
convert::Infallible,
pin::Pin,
task::{Context, Poll},
};
@ -76,3 +77,11 @@ impl MessageBody for BytesStream {
Ok(self.into_bytes())
}
}
impl futures_util::Stream for BytesStream {
type Item = Result<Bytes, Infallible>;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(self.get_mut().inner.pop_front().map(Ok))
}
}

View File

@ -79,6 +79,15 @@ pub(crate) enum UploadError {
#[error("Error in exiftool")]
Exiftool(#[from] crate::exiftool::ExifError),
#[error("Error building reqwest client")]
BuildClient(#[source] reqwest::Error),
#[error("Error making request")]
ReqwestMiddleware(#[from] reqwest_middleware::Error),
#[error("Error in request response")]
Reqwest(#[from] reqwest::Error),
#[error("Provided process path is invalid")]
ParsePath,
@ -109,12 +118,6 @@ pub(crate) enum UploadError {
#[error("Unable to download image, bad response {0}")]
Download(actix_web::http::StatusCode),
#[error("Unable to download image")]
Payload(#[from] awc::error::PayloadError),
#[error("Unable to send request, {0}")]
SendRequest(String),
#[error("Tried to save an image with an already-taken name")]
DuplicateAlias,
@ -134,12 +137,6 @@ pub(crate) enum UploadError {
Timeout(#[from] crate::stream::TimeoutError),
}
impl From<awc::error::SendRequestError> for UploadError {
fn from(e: awc::error::SendRequestError) -> Self {
UploadError::SendRequest(e.to_string())
}
}
impl From<actix_web::error::BlockingError> for UploadError {
fn from(_: actix_web::error::BlockingError) -> Self {
UploadError::Canceled

View File

@ -31,12 +31,14 @@ use actix_web::{
http::header::{CacheControl, CacheDirective, LastModified, Range, ACCEPT_RANGES},
web, App, HttpRequest, HttpResponse, HttpResponseBuilder, HttpServer,
};
use awc::{Client, Connector};
use futures_util::{
stream::{empty, once},
Stream, StreamExt, TryStreamExt,
};
use once_cell::sync::{Lazy, OnceCell};
use reqwest::Client;
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use reqwest_tracing::TracingMiddleware;
use rusty_s3::UrlStyle;
use std::{
future::ready,
@ -47,7 +49,6 @@ use std::{
};
use tokio::sync::Semaphore;
use tracing_actix_web::TracingLogger;
use tracing_awc::Tracing;
use tracing_futures::Instrument;
use self::{
@ -442,7 +443,7 @@ struct UrlQuery {
/// download an image from a URL
#[tracing::instrument(name = "Downloading file", skip(client, repo, store))]
async fn download<R: FullRepo + 'static, S: Store + 'static>(
client: web::Data<Client>,
client: web::Data<ClientWithMiddleware>,
repo: web::Data<R>,
store: web::Data<S>,
query: web::Query<UrlQuery>,
@ -454,6 +455,7 @@ async fn download<R: FullRepo + 'static, S: Store + 'static>(
}
let stream = res
.bytes_stream()
.map_err(Error::from)
.limit((CONFIG.media.max_file_size * MEGABYTES) as u64);
@ -1092,19 +1094,18 @@ fn transform_error(error: actix_form_data::Error) -> actix_web::Error {
error
}
fn build_client() -> awc::Client {
let connector = CONFIG
.server
.client_pool_size
.map(|size| Connector::new().limit(size))
.unwrap_or_else(Connector::new);
Client::builder()
.connector(connector)
.wrap(Tracing)
.add_default_header(("User-Agent", "pict-rs v0.4.1"))
fn build_client() -> Result<ClientWithMiddleware, Error> {
let client = Client::builder()
.user_agent("pict-rs v0.4.2")
.use_rustls_tls()
.timeout(Duration::from_secs(30))
.finish()
.pool_max_idle_per_host(CONFIG.server.client_pool_size.unwrap_or(usize::MAX))
.build()
.map_err(UploadError::BuildClient)?;
Ok(ClientBuilder::new(client)
.with(TracingMiddleware::default())
.build())
}
fn next_worker_id() -> String {
@ -1123,7 +1124,7 @@ fn configure_endpoints<
config: &mut web::ServiceConfig,
repo: R,
store: S,
client: Client,
client: ClientWithMiddleware,
extra_config: F,
) {
config
@ -1215,10 +1216,11 @@ where
async fn launch_file_store<R: FullRepo + 'static, F: Fn(&mut web::ServiceConfig) + Send + Clone>(
repo: R,
store: FileStore,
client: ClientWithMiddleware,
extra_config: F,
) -> std::io::Result<()> {
HttpServer::new(move || {
let client = build_client();
let client = client.clone();
let store = store.clone();
let repo = repo.clone();
@ -1242,10 +1244,11 @@ async fn launch_object_store<
>(
repo: R,
store_config: ObjectStoreConfig,
client: ClientWithMiddleware,
extra_config: F,
) -> std::io::Result<()> {
HttpServer::new(move || {
let client = build_client();
let client = client.clone();
let store = store_config.clone().build(client.clone());
let repo = repo.clone();
@ -1265,7 +1268,7 @@ async fn launch_object_store<
async fn migrate_inner<S1>(
repo: Repo,
client: Client,
client: ClientWithMiddleware,
from: S1,
to: config::primitives::Store,
skip_missing_files: bool,
@ -1393,6 +1396,7 @@ fn sled_extra_config(sc: &mut web::ServiceConfig) {
pub async fn run() -> color_eyre::Result<()> {
let repo = Repo::open(CONFIG.repo.clone())?;
repo.migrate_from_db(CONFIG.old_db.path.clone()).await?;
let client = build_client()?;
match (*OPERATION).clone() {
Operation::Run => (),
@ -1401,8 +1405,6 @@ pub async fn run() -> color_eyre::Result<()> {
from,
to,
} => {
let client = build_client();
match from {
config::primitives::Store::Filesystem(config::Filesystem { path }) => {
let from = FileStore::build(path.clone(), repo.clone()).await?;
@ -1457,7 +1459,7 @@ pub async fn run() -> color_eyre::Result<()> {
.requeue_in_progress(CONFIG.server.worker_id.as_bytes().to_vec())
.await?;
launch_file_store(sled_repo, store, sled_extra_config).await?;
launch_file_store(sled_repo, store, client, sled_extra_config).await?;
}
}
}
@ -1496,7 +1498,7 @@ pub async fn run() -> color_eyre::Result<()> {
.requeue_in_progress(CONFIG.server.worker_id.as_bytes().to_vec())
.await?;
launch_object_store(sled_repo, store, sled_extra_config).await?;
launch_object_store(sled_repo, store, client, sled_extra_config).await?;
}
}
}

View File

@ -5,16 +5,17 @@ use crate::{
};
use actix_rt::task::JoinError;
use actix_web::{
error::{BlockingError, PayloadError},
error::BlockingError,
http::{
header::{ByteRangeSpec, Range, CONTENT_LENGTH},
StatusCode,
},
web::Bytes,
};
use awc::{error::SendRequestError, Client, ClientRequest, ClientResponse, SendClientRequest};
use base64::{prelude::BASE64_STANDARD, Engine};
use futures_util::{Stream, StreamExt, TryStreamExt};
use reqwest::{header::RANGE, Body, Response};
use reqwest_middleware::{ClientWithMiddleware, RequestBuilder};
use rusty_s3::{actions::S3Action, Bucket, BucketError, Credentials, UrlStyle};
use std::{pin::Pin, string::FromUtf8Error, time::Duration};
use storage_path_generator::{Generator, Path};
@ -46,8 +47,11 @@ pub(crate) enum ObjectError {
#[error("IO Error")]
IO(#[from] std::io::Error),
#[error("Error making request: {0}")]
SendRequest(String),
#[error("Error making request")]
RequestMiddleware(#[from] reqwest_middleware::Error),
#[error("Error in request response")]
Request(#[from] reqwest::Error),
#[error("Failed to parse string")]
Utf8(#[from] FromUtf8Error),
@ -66,15 +70,6 @@ pub(crate) enum ObjectError {
#[error("Invalid status: {0}\n{1}")]
Status(StatusCode, String),
#[error("Unable to upload image")]
Upload(awc::error::PayloadError),
}
impl From<SendRequestError> for ObjectError {
fn from(e: SendRequestError) -> Self {
Self::SendRequest(e.to_string())
}
}
impl From<JoinError> for ObjectError {
@ -95,7 +90,7 @@ pub(crate) struct ObjectStore {
repo: Repo,
bucket: Bucket,
credentials: Credentials,
client: Client,
client: ClientWithMiddleware,
signature_expiration: Duration,
client_timeout: Duration,
}
@ -121,7 +116,7 @@ struct InitiateMultipartUploadResponse {
}
impl ObjectStoreConfig {
pub(crate) fn build(self, client: Client) -> ObjectStore {
pub(crate) fn build(self, client: ClientWithMiddleware) -> ObjectStore {
ObjectStore {
path_gen: self.path_gen,
repo: self.repo,
@ -134,11 +129,8 @@ impl ObjectStoreConfig {
}
}
fn payload_to_io_error(e: PayloadError) -> std::io::Error {
match e {
PayloadError::Io(io) => io,
otherwise => std::io::Error::new(std::io::ErrorKind::Other, otherwise.to_string()),
}
fn payload_to_io_error(e: reqwest::Error) -> std::io::Error {
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
}
#[tracing::instrument(skip(stream))]
@ -159,15 +151,15 @@ where
Ok(buf)
}
async fn status_error(mut response: ClientResponse) -> StoreError {
let body = match response.body().await {
Err(e) => return ObjectError::Upload(e).into(),
async fn status_error(response: Response) -> StoreError {
let status = response.status();
let body = match response.text().await {
Err(e) => return ObjectError::Request(e).into(),
Ok(body) => body,
};
let body = String::from_utf8_lossy(&body).to_string();
ObjectError::Status(response.status(), body).into()
ObjectError::Status(status, body).into()
}
#[async_trait::async_trait(?Send)]
@ -217,7 +209,8 @@ impl Store for ObjectStore {
drop(stream);
let (req, object_id) = self.put_object_request(content_type).await?;
let response = req
.send_body(first_chunk)
.body(Body::wrap_stream(first_chunk))
.send()
.await
.map_err(ObjectError::from)?;
@ -231,13 +224,13 @@ impl Store for ObjectStore {
let mut first_chunk = Some(first_chunk);
let (req, object_id) = self.create_multipart_request(content_type).await?;
let mut response = req.send().await.map_err(ObjectError::from)?;
let response = req.send().await.map_err(ObjectError::from)?;
if !response.status().is_success() {
return Err(status_error(response).await);
}
let body = response.body().await.map_err(ObjectError::Upload)?;
let body = response.bytes().await.map_err(ObjectError::Request)?;
let body: InitiateMultipartUploadResponse =
quick_xml::de::from_reader(&*body).map_err(ObjectError::from)?;
let upload_id = &body.upload_id;
@ -273,7 +266,8 @@ impl Store for ObjectStore {
&upload_id2,
)
.await?
.send_body(buf)
.body(Body::wrap_stream(buf))
.send()
.await
.map_err(ObjectError::from)?;
@ -345,7 +339,7 @@ impl Store for ObjectStore {
) -> Result<Self::Identifier, StoreError> {
let (req, object_id) = self.put_object_request(content_type).await?;
let response = req.send_body(bytes).await.map_err(ObjectError::from)?;
let response = req.body(bytes).send().await.map_err(ObjectError::from)?;
if !response.status().is_success() {
return Err(status_error(response).await);
@ -371,7 +365,9 @@ impl Store for ObjectStore {
return Err(status_error(response).await);
}
Ok(Box::pin(response.map_err(payload_to_io_error)))
Ok(Box::pin(
response.bytes_stream().map_err(payload_to_io_error),
))
}
#[tracing::instrument(skip(self, writer))]
@ -383,7 +379,7 @@ impl Store for ObjectStore {
where
Writer: AsyncWrite + Unpin,
{
let mut response = self
let response = self
.get_object_request(identifier, None, None)
.send()
.await
@ -396,7 +392,9 @@ impl Store for ObjectStore {
));
}
while let Some(res) = response.next().await {
let mut stream = response.bytes_stream();
while let Some(res) = stream.next().await {
let mut bytes = res.map_err(payload_to_io_error)?;
writer.write_all_buf(&mut bytes).await?;
}
@ -477,7 +475,7 @@ impl ObjectStore {
})
}
async fn head_bucket_request(&self) -> Result<ClientRequest, StoreError> {
async fn head_bucket_request(&self) -> Result<RequestBuilder, StoreError> {
let action = self.bucket.head_bucket(Some(&self.credentials));
Ok(self.build_request(action))
@ -486,7 +484,7 @@ impl ObjectStore {
async fn put_object_request(
&self,
content_type: mime::Mime,
) -> Result<(ClientRequest, ObjectId), StoreError> {
) -> Result<(RequestBuilder, ObjectId), StoreError> {
let path = self.next_file().await?;
let mut action = self.bucket.put_object(Some(&self.credentials), &path);
@ -501,7 +499,7 @@ impl ObjectStore {
async fn create_multipart_request(
&self,
content_type: mime::Mime,
) -> Result<(ClientRequest, ObjectId), StoreError> {
) -> Result<(RequestBuilder, ObjectId), StoreError> {
let path = self.next_file().await?;
let mut action = self
@ -521,7 +519,7 @@ impl ObjectStore {
object_id: &ObjectId,
part_number: u16,
upload_id: &str,
) -> Result<ClientRequest, ObjectError> {
) -> Result<RequestBuilder, ObjectError> {
use md5::Digest;
let mut action = self.bucket.upload_part(
@ -559,12 +557,12 @@ impl ObjectStore {
Ok(self.build_request(action))
}
fn send_complete_multipart_request<'a, I: Iterator<Item = &'a str>>(
async fn send_complete_multipart_request<'a, I: Iterator<Item = &'a str>>(
&'a self,
object_id: &'a ObjectId,
upload_id: &'a str,
etags: I,
) -> SendClientRequest {
) -> Result<Response, reqwest_middleware::Error> {
let mut action = self.bucket.complete_multipart_upload(
Some(&self.credentials),
object_id.as_str(),
@ -578,14 +576,14 @@ impl ObjectStore {
let (req, action) = self.build_request_inner(action);
req.send_body(action.body())
req.body(action.body()).send().await
}
fn create_abort_multipart_request(
&self,
object_id: &ObjectId,
upload_id: &str,
) -> ClientRequest {
) -> RequestBuilder {
let action = self.bucket.abort_multipart_upload(
Some(&self.credentials),
object_id.as_str(),
@ -595,18 +593,18 @@ impl ObjectStore {
self.build_request(action)
}
fn build_request<'a, A: S3Action<'a>>(&'a self, action: A) -> ClientRequest {
fn build_request<'a, A: S3Action<'a>>(&'a self, action: A) -> RequestBuilder {
let (req, _) = self.build_request_inner(action);
req
}
fn build_request_inner<'a, A: S3Action<'a>>(&'a self, mut action: A) -> (ClientRequest, A) {
fn build_request_inner<'a, A: S3Action<'a>>(&'a self, mut action: A) -> (RequestBuilder, A) {
let method = match A::METHOD {
rusty_s3::Method::Head => awc::http::Method::HEAD,
rusty_s3::Method::Get => awc::http::Method::GET,
rusty_s3::Method::Post => awc::http::Method::POST,
rusty_s3::Method::Put => awc::http::Method::PUT,
rusty_s3::Method::Delete => awc::http::Method::DELETE,
rusty_s3::Method::Head => reqwest::Method::HEAD,
rusty_s3::Method::Get => reqwest::Method::GET,
rusty_s3::Method::Post => reqwest::Method::POST,
rusty_s3::Method::Put => reqwest::Method::PUT,
rusty_s3::Method::Delete => reqwest::Method::DELETE,
};
let url = action.sign(self.signature_expiration);
@ -619,7 +617,7 @@ impl ObjectStore {
let req = action
.headers_mut()
.iter()
.fold(req, |req, tup| req.insert_header(tup));
.fold(req, |req, (name, value)| req.header(name, value));
(req, action)
}
@ -629,7 +627,7 @@ impl ObjectStore {
identifier: &ObjectId,
from_start: Option<u64>,
len: Option<u64>,
) -> ClientRequest {
) -> RequestBuilder {
let action = self
.bucket
.get_object(Some(&self.credentials), identifier.as_str());
@ -639,14 +637,18 @@ impl ObjectStore {
let start = from_start.unwrap_or(0);
let end = len.map(|len| start + len - 1);
req.insert_header(Range::Bytes(vec![if let Some(end) = end {
ByteRangeSpec::FromTo(start, end)
} else {
ByteRangeSpec::From(start)
}]))
req.header(
RANGE,
Range::Bytes(vec![if let Some(end) = end {
ByteRangeSpec::FromTo(start, end)
} else {
ByteRangeSpec::From(start)
}])
.to_string(),
)
}
fn head_object_request(&self, identifier: &ObjectId) -> ClientRequest {
fn head_object_request(&self, identifier: &ObjectId) -> RequestBuilder {
let action = self
.bucket
.head_object(Some(&self.credentials), identifier.as_str());
@ -654,7 +656,7 @@ impl ObjectStore {
self.build_request(action)
}
fn delete_object_request(&self, identifier: &ObjectId) -> ClientRequest {
fn delete_object_request(&self, identifier: &ObjectId) -> RequestBuilder {
let action = self
.bucket
.delete_object(Some(&self.credentials), identifier.as_str());