From cf50da30ec395482932b0c872a6319309f66905a Mon Sep 17 00:00:00 2001 From: "Aode (lion)" Date: Sun, 31 Oct 2021 12:35:11 -0500 Subject: [PATCH] Use rust-s3 fork with asyncwrite support --- Cargo.lock | 3 +-- Cargo.toml | 2 +- src/store.rs | 2 +- src/store/file_store.rs | 2 +- src/store/object_store.rs | 20 ++++++++------------ 5 files changed, 12 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3ac19b9..3878916 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1754,8 +1754,7 @@ dependencies = [ [[package]] name = "rust-s3" version = "0.27.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2f26775d15f43dc848ef0ec65f83de8775549e486c7a3a576652049a7122d32" +source = "git+https://github.com/asonix/rust-s3?branch=asonix/use-async-write#f61e422d77484288cf32e40896c8b361eb60afac" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index a1c8384..f48f307 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,7 +33,7 @@ opentelemetry = { version = "0.16", features = ["rt-tokio"] } opentelemetry-otlp = "0.9" pin-project-lite = "0.2.7" reqwest = { version = "0.11.5", default-features = false, features = ["stream"], optional = true} -rust-s3 = { version = "0.27.0", default-features = false, features = ["tokio-rustls-tls"], optional = true } +rust-s3 = { version = "0.27.0", default-features = false, features = ["tokio-rustls-tls"], optional = true, git = "https://github.com/asonix/rust-s3", branch = "asonix/use-async-write" } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" sha2 = "0.9.0" diff --git a/src/store.rs b/src/store.rs index 8b20a0f..9bb2afe 100644 --- a/src/store.rs +++ b/src/store.rs @@ -46,7 +46,7 @@ pub(crate) trait Store: Send + Sync + Clone + Debug + 'static { writer: &mut Writer, ) -> Result<(), std::io::Error> where - Writer: AsyncWrite + Unpin; + Writer: AsyncWrite + Send + Unpin; async fn len(&self, identifier: &Self::Identifier) -> Result; diff --git a/src/store/file_store.rs b/src/store/file_store.rs index c0937cc..6519c21 100644 --- a/src/store/file_store.rs +++ b/src/store/file_store.rs @@ -108,7 +108,7 @@ impl Store for FileStore { writer: &mut Writer, ) -> Result<(), std::io::Error> where - Writer: AsyncWrite + Unpin, + Writer: AsyncWrite + Send + Unpin, { let path = self.path_from_file_id(identifier); diff --git a/src/store/object_store.rs b/src/store/object_store.rs index a47a2b6..a352669 100644 --- a/src/store/object_store.rs +++ b/src/store/object_store.rs @@ -1,6 +1,6 @@ use crate::store::Store; use actix_web::web::Bytes; -use futures_util::stream::{Stream, StreamExt}; +use futures_util::stream::Stream; use s3::{ command::Command, creds::Credentials, request::Reqwest, request_trait::Request, Bucket, Region, }; @@ -10,7 +10,7 @@ use std::{ task::{Context, Poll}, }; use storage_path_generator::{Generator, Path}; -use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; +use tokio::io::{AsyncRead, AsyncWrite}; use uuid::Uuid; mod object_id; @@ -109,18 +109,14 @@ impl Store for ObjectStore { writer: &mut Writer, ) -> Result<(), std::io::Error> where - Writer: AsyncWrite + Unpin, + Writer: AsyncWrite + Send + Unpin, { - let mut stream = self - .to_stream(identifier, None, None) + let path = identifier.as_str(); + + self.bucket + .get_object_stream(path, writer) .await - .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; - - while let Some(res) = stream.next().await { - let mut bytes = res?; - - writer.write_all_buf(&mut bytes).await?; - } + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, Self::Error::from(e)))?; Ok(()) }