Keep a reqwest client around for sharing an object storage connection pool

This commit is contained in:
Aode (lion) 2021-11-02 17:21:00 -05:00
parent eb8a31e80f
commit 4c8da2b414
4 changed files with 43 additions and 21 deletions

8
Cargo.lock generated
View File

@ -282,9 +282,9 @@ dependencies = [
[[package]] [[package]]
name = "anyhow" name = "anyhow"
version = "1.0.44" version = "1.0.45"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61604a8f862e1d5c3229fdd78f8b02c68dcf73a4c4b05fd636d12240aaa242c1" checksum = "ee10e43ae4a853c0a3591d4e2ada1719e553be18199d9da9d4a83f5927c2f5c7"
[[package]] [[package]]
name = "arrayvec" name = "arrayvec"
@ -371,7 +371,7 @@ dependencies = [
[[package]] [[package]]
name = "aws-creds" name = "aws-creds"
version = "0.26.2" version = "0.26.2"
source = "git+https://github.com/asonix/rust-s3?branch=asonix/main#8d185a2b10654d3f2b86d99bd149e6a0197f2aac" source = "git+https://github.com/asonix/rust-s3?branch=asonix/main#a9323756817ed94665a0f5a05770290fa3a2160e"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"dirs", "dirs",
@ -1711,7 +1711,7 @@ dependencies = [
[[package]] [[package]]
name = "rust-s3" name = "rust-s3"
version = "0.27.0" version = "0.27.0"
source = "git+https://github.com/asonix/rust-s3?branch=asonix/main#8d185a2b10654d3f2b86d99bd149e6a0197f2aac" source = "git+https://github.com/asonix/rust-s3?branch=asonix/main#a9323756817ed94665a0f5a05770290fa3a2160e"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait", "async-trait",

View File

@ -32,8 +32,8 @@ once_cell = "1.4.0"
opentelemetry = { version = "0.16", features = ["rt-tokio"] } opentelemetry = { version = "0.16", features = ["rt-tokio"] }
opentelemetry-otlp = "0.9" opentelemetry-otlp = "0.9"
pin-project-lite = "0.2.7" pin-project-lite = "0.2.7"
reqwest = { version = "0.11.5", default-features = false, features = ["stream"], optional = true} reqwest = { version = "0.11.5", default-features = false, features = ["rustls-tls", "stream"], optional = true }
rust-s3 = { version = "0.27.0", default-features = false, features = ["tokio-rustls-tls"], optional = true, git = "https://github.com/asonix/rust-s3", branch = "asonix/main" } rust-s3 = { version = "0.27.0", default-features = false, features = ["with-reqwest"], optional = true, git = "https://github.com/asonix/rust-s3", branch = "asonix/main" }
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
sha2 = "0.9.0" sha2 = "0.9.0"

View File

@ -692,6 +692,19 @@ fn transform_error(error: actix_form_data::Error) -> actix_web::Error {
error error
} }
fn build_client() -> awc::Client {
Client::builder()
.header("User-Agent", "pict-rs v0.3.0-main")
.finish()
}
#[cfg(feature = "object-storage")]
fn build_reqwest_client() -> reqwest::Result<reqwest::Client> {
reqwest::Client::builder()
.user_agent("pict-rs v0.3.0-main")
.build()
}
async fn launch<S: Store + Clone + 'static>(manager: UploadManager, store: S) -> anyhow::Result<()> async fn launch<S: Store + Clone + 'static>(manager: UploadManager, store: S) -> anyhow::Result<()>
where where
S::Error: Unpin, S::Error: Unpin,
@ -742,8 +755,8 @@ where
.field( .field(
"images", "images",
Field::array(Field::file(move |filename, _, stream| { Field::array(Field::file(move |filename, _, stream| {
let manager = manager2.clone();
let store = store2.clone(); let store = store2.clone();
let manager = manager2.clone();
let span = tracing::info_span!("file-import", ?filename); let span = tracing::info_span!("file-import", ?filename);
@ -767,16 +780,12 @@ where
); );
HttpServer::new(move || { HttpServer::new(move || {
let client = Client::builder()
.header("User-Agent", "pict-rs v0.3.0-main")
.finish();
App::new() App::new()
.wrap(TracingLogger::default()) .wrap(TracingLogger::default())
.wrap(Deadline) .wrap(Deadline)
.app_data(web::Data::new(store.clone())) .app_data(web::Data::new(store.clone()))
.app_data(web::Data::new(manager.clone())) .app_data(web::Data::new(manager.clone()))
.app_data(web::Data::new(client)) .app_data(web::Data::new(build_client()))
.app_data(web::Data::new(CONFIG.allowed_filters())) .app_data(web::Data::new(CONFIG.allowed_filters()))
.service( .service(
web::scope("/image") web::scope("/image")
@ -868,6 +877,7 @@ where
security_token.clone(), security_token.clone(),
session_token.clone(), session_token.clone(),
&db, &db,
build_reqwest_client()?,
)?; )?;
manager.migrate_store::<S1, ObjectStore>(from, to).await?; manager.migrate_store::<S1, ObjectStore>(from, to).await?;
@ -915,6 +925,7 @@ async fn main() -> anyhow::Result<()> {
security_token.clone(), security_token.clone(),
session_token.clone(), session_token.clone(),
&db, &db,
build_reqwest_client()?,
)?; )?;
migrate_inner(&manager, &db, from, to).await?; migrate_inner(&manager, &db, from, to).await?;
@ -928,7 +939,7 @@ async fn main() -> anyhow::Result<()> {
config::Store::FileStore { path } => { config::Store::FileStore { path } => {
let path = path.to_owned().unwrap_or_else(|| CONFIG.data_dir()); let path = path.to_owned().unwrap_or_else(|| CONFIG.data_dir());
let store = FileStore::build(path, &db)?; let store = FileStore::build(path.clone(), &db)?;
manager.restructure(&store).await?; manager.restructure(&store).await?;
launch(manager, store).await launch(manager, store).await
@ -950,6 +961,7 @@ async fn main() -> anyhow::Result<()> {
security_token.clone(), security_token.clone(),
session_token.clone(), session_token.clone(),
&db, &db,
build_reqwest_client()?,
)?; )?;
launch(manager, store).await launch(manager, store).await

View File

@ -2,7 +2,7 @@ use crate::store::Store;
use actix_web::web::Bytes; use actix_web::web::Bytes;
use futures_util::stream::Stream; use futures_util::stream::Stream;
use s3::{ use s3::{
command::Command, creds::Credentials, request::Reqwest, request_trait::Request, Bucket, Region, client::Client, command::Command, creds::Credentials, request_trait::Request, Bucket, Region,
}; };
use std::{ use std::{
pin::Pin, pin::Pin,
@ -44,6 +44,7 @@ pub(crate) struct ObjectStore {
path_gen: Generator, path_gen: Generator,
settings_tree: sled::Tree, settings_tree: sled::Tree,
bucket: Bucket, bucket: Bucket,
client: reqwest::Client,
} }
pin_project_lite::pin_project! { pin_project_lite::pin_project! {
@ -69,7 +70,9 @@ impl Store for ObjectStore {
{ {
let path = self.next_file()?; let path = self.next_file()?;
self.bucket.put_object_stream(reader, &path).await?; self.bucket
.put_object_stream(&self.client, reader, &path)
.await?;
Ok(ObjectId::from_string(path)) Ok(ObjectId::from_string(path))
} }
@ -78,7 +81,7 @@ impl Store for ObjectStore {
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Self::Error> { async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Self::Error> {
let path = self.next_file()?; let path = self.next_file()?;
self.bucket.put_object(&path, &bytes).await?; self.bucket.put_object(&self.client, &path, &bytes).await?;
Ok(ObjectId::from_string(path)) Ok(ObjectId::from_string(path))
} }
@ -95,7 +98,12 @@ impl Store for ObjectStore {
let start = from_start.unwrap_or(0); let start = from_start.unwrap_or(0);
let end = len.map(|len| start + len); let end = len.map(|len| start + len);
let request = Reqwest::new(&self.bucket, path, Command::GetObjectRange { start, end }); let request = Client::request(
&self.client,
&self.bucket,
path,
Command::GetObjectRange { start, end },
);
let response = request.response().await?; let response = request.response().await?;
@ -114,7 +122,7 @@ impl Store for ObjectStore {
let path = identifier.as_str(); let path = identifier.as_str();
self.bucket self.bucket
.get_object_stream(path, writer) .get_object_stream(&self.client, path, writer)
.await .await
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, Self::Error::from(e)))?; .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, Self::Error::from(e)))?;
@ -125,7 +133,7 @@ impl Store for ObjectStore {
async fn len(&self, identifier: &Self::Identifier) -> Result<u64, Self::Error> { async fn len(&self, identifier: &Self::Identifier) -> Result<u64, Self::Error> {
let path = identifier.as_str(); let path = identifier.as_str();
let (head, _) = self.bucket.head_object(path).await?; let (head, _) = self.bucket.head_object(&self.client, path).await?;
let length = head.content_length.ok_or(ObjectError::Length)?; let length = head.content_length.ok_or(ObjectError::Length)?;
Ok(length as u64) Ok(length as u64)
@ -135,7 +143,7 @@ impl Store for ObjectStore {
async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Self::Error> { async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Self::Error> {
let path = identifier.as_str(); let path = identifier.as_str();
self.bucket.delete_object(path).await?; self.bucket.delete_object(&self.client, path).await?;
Ok(()) Ok(())
} }
} }
@ -149,7 +157,8 @@ impl ObjectStore {
security_token: Option<String>, security_token: Option<String>,
session_token: Option<String>, session_token: Option<String>,
db: &sled::Db, db: &sled::Db,
) -> Result<Self, ObjectError> { client: reqwest::Client,
) -> Result<ObjectStore, ObjectError> {
let settings_tree = db.open_tree("settings")?; let settings_tree = db.open_tree("settings")?;
let path_gen = init_generator(&settings_tree)?; let path_gen = init_generator(&settings_tree)?;
@ -173,6 +182,7 @@ impl ObjectStore {
session_token, session_token,
}, },
)?, )?,
client,
}) })
} }