From 4c8da2b41456ee6a267e7ce29925998f0b1d3a4d Mon Sep 17 00:00:00 2001 From: "Aode (lion)" Date: Tue, 2 Nov 2021 17:21:00 -0500 Subject: [PATCH] Keep a reqwest client around for sharing an object storage connection pool --- Cargo.lock | 8 ++++---- Cargo.toml | 4 ++-- src/main.rs | 26 +++++++++++++++++++------- src/store/object_store.rs | 26 ++++++++++++++++++-------- 4 files changed, 43 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dca183d..631c0a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -282,9 +282,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.44" +version = "1.0.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61604a8f862e1d5c3229fdd78f8b02c68dcf73a4c4b05fd636d12240aaa242c1" +checksum = "ee10e43ae4a853c0a3591d4e2ada1719e553be18199d9da9d4a83f5927c2f5c7" [[package]] name = "arrayvec" @@ -371,7 +371,7 @@ dependencies = [ [[package]] name = "aws-creds" version = "0.26.2" -source = "git+https://github.com/asonix/rust-s3?branch=asonix/main#8d185a2b10654d3f2b86d99bd149e6a0197f2aac" +source = "git+https://github.com/asonix/rust-s3?branch=asonix/main#a9323756817ed94665a0f5a05770290fa3a2160e" dependencies = [ "anyhow", "dirs", @@ -1711,7 +1711,7 @@ dependencies = [ [[package]] name = "rust-s3" version = "0.27.0" -source = "git+https://github.com/asonix/rust-s3?branch=asonix/main#8d185a2b10654d3f2b86d99bd149e6a0197f2aac" +source = "git+https://github.com/asonix/rust-s3?branch=asonix/main#a9323756817ed94665a0f5a05770290fa3a2160e" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 9100297..e5d3f97 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,8 +32,8 @@ once_cell = "1.4.0" opentelemetry = { version = "0.16", features = ["rt-tokio"] } opentelemetry-otlp = "0.9" pin-project-lite = "0.2.7" -reqwest = { version = "0.11.5", default-features = false, features = ["stream"], optional = true} -rust-s3 = { version = "0.27.0", default-features = false, features = ["tokio-rustls-tls"], optional = true, git = "https://github.com/asonix/rust-s3", branch = "asonix/main" } +reqwest = { version = "0.11.5", default-features = false, features = ["rustls-tls", "stream"], optional = true } +rust-s3 = { version = "0.27.0", default-features = false, features = ["with-reqwest"], optional = true, git = "https://github.com/asonix/rust-s3", branch = "asonix/main" } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" sha2 = "0.9.0" diff --git a/src/main.rs b/src/main.rs index 2a8d209..d49d56c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -692,6 +692,19 @@ fn transform_error(error: actix_form_data::Error) -> actix_web::Error { error } +fn build_client() -> awc::Client { + Client::builder() + .header("User-Agent", "pict-rs v0.3.0-main") + .finish() +} + +#[cfg(feature = "object-storage")] +fn build_reqwest_client() -> reqwest::Result { + reqwest::Client::builder() + .user_agent("pict-rs v0.3.0-main") + .build() +} + async fn launch(manager: UploadManager, store: S) -> anyhow::Result<()> where S::Error: Unpin, @@ -742,8 +755,8 @@ where .field( "images", Field::array(Field::file(move |filename, _, stream| { - let manager = manager2.clone(); let store = store2.clone(); + let manager = manager2.clone(); let span = tracing::info_span!("file-import", ?filename); @@ -767,16 +780,12 @@ where ); HttpServer::new(move || { - let client = Client::builder() - .header("User-Agent", "pict-rs v0.3.0-main") - .finish(); - App::new() .wrap(TracingLogger::default()) .wrap(Deadline) .app_data(web::Data::new(store.clone())) .app_data(web::Data::new(manager.clone())) - .app_data(web::Data::new(client)) + .app_data(web::Data::new(build_client())) .app_data(web::Data::new(CONFIG.allowed_filters())) .service( web::scope("/image") @@ -868,6 +877,7 @@ where security_token.clone(), session_token.clone(), &db, + build_reqwest_client()?, )?; manager.migrate_store::(from, to).await?; @@ -915,6 +925,7 @@ async fn main() -> anyhow::Result<()> { security_token.clone(), session_token.clone(), &db, + build_reqwest_client()?, )?; migrate_inner(&manager, &db, from, to).await?; @@ -928,7 +939,7 @@ async fn main() -> anyhow::Result<()> { config::Store::FileStore { path } => { let path = path.to_owned().unwrap_or_else(|| CONFIG.data_dir()); - let store = FileStore::build(path, &db)?; + let store = FileStore::build(path.clone(), &db)?; manager.restructure(&store).await?; launch(manager, store).await @@ -950,6 +961,7 @@ async fn main() -> anyhow::Result<()> { security_token.clone(), session_token.clone(), &db, + build_reqwest_client()?, )?; launch(manager, store).await diff --git a/src/store/object_store.rs b/src/store/object_store.rs index a352669..5239257 100644 --- a/src/store/object_store.rs +++ b/src/store/object_store.rs @@ -2,7 +2,7 @@ use crate::store::Store; use actix_web::web::Bytes; use futures_util::stream::Stream; use s3::{ - command::Command, creds::Credentials, request::Reqwest, request_trait::Request, Bucket, Region, + client::Client, command::Command, creds::Credentials, request_trait::Request, Bucket, Region, }; use std::{ pin::Pin, @@ -44,6 +44,7 @@ pub(crate) struct ObjectStore { path_gen: Generator, settings_tree: sled::Tree, bucket: Bucket, + client: reqwest::Client, } pin_project_lite::pin_project! { @@ -69,7 +70,9 @@ impl Store for ObjectStore { { let path = self.next_file()?; - self.bucket.put_object_stream(reader, &path).await?; + self.bucket + .put_object_stream(&self.client, reader, &path) + .await?; Ok(ObjectId::from_string(path)) } @@ -78,7 +81,7 @@ impl Store for ObjectStore { async fn save_bytes(&self, bytes: Bytes) -> Result { let path = self.next_file()?; - self.bucket.put_object(&path, &bytes).await?; + self.bucket.put_object(&self.client, &path, &bytes).await?; Ok(ObjectId::from_string(path)) } @@ -95,7 +98,12 @@ impl Store for ObjectStore { let start = from_start.unwrap_or(0); let end = len.map(|len| start + len); - let request = Reqwest::new(&self.bucket, path, Command::GetObjectRange { start, end }); + let request = Client::request( + &self.client, + &self.bucket, + path, + Command::GetObjectRange { start, end }, + ); let response = request.response().await?; @@ -114,7 +122,7 @@ impl Store for ObjectStore { let path = identifier.as_str(); self.bucket - .get_object_stream(path, writer) + .get_object_stream(&self.client, path, writer) .await .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, Self::Error::from(e)))?; @@ -125,7 +133,7 @@ impl Store for ObjectStore { async fn len(&self, identifier: &Self::Identifier) -> Result { let path = identifier.as_str(); - let (head, _) = self.bucket.head_object(path).await?; + let (head, _) = self.bucket.head_object(&self.client, path).await?; let length = head.content_length.ok_or(ObjectError::Length)?; Ok(length as u64) @@ -135,7 +143,7 @@ impl Store for ObjectStore { async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Self::Error> { let path = identifier.as_str(); - self.bucket.delete_object(path).await?; + self.bucket.delete_object(&self.client, path).await?; Ok(()) } } @@ -149,7 +157,8 @@ impl ObjectStore { security_token: Option, session_token: Option, db: &sled::Db, - ) -> Result { + client: reqwest::Client, + ) -> Result { let settings_tree = db.open_tree("settings")?; let path_gen = init_generator(&settings_tree)?; @@ -173,6 +182,7 @@ impl ObjectStore { session_token, }, )?, + client, }) }