Use rust-s3 fork with asyncwrite support

This commit is contained in:
Aode (lion) 2021-10-31 12:35:11 -05:00
parent 69e82f3f3f
commit cf50da30ec
5 changed files with 12 additions and 17 deletions

3
Cargo.lock generated
View File

@ -1754,8 +1754,7 @@ dependencies = [
[[package]] [[package]]
name = "rust-s3" name = "rust-s3"
version = "0.27.0" version = "0.27.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "git+https://github.com/asonix/rust-s3?branch=asonix/use-async-write#f61e422d77484288cf32e40896c8b361eb60afac"
checksum = "b2f26775d15f43dc848ef0ec65f83de8775549e486c7a3a576652049a7122d32"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait", "async-trait",

View File

@ -33,7 +33,7 @@ opentelemetry = { version = "0.16", features = ["rt-tokio"] }
opentelemetry-otlp = "0.9" opentelemetry-otlp = "0.9"
pin-project-lite = "0.2.7" pin-project-lite = "0.2.7"
reqwest = { version = "0.11.5", default-features = false, features = ["stream"], optional = true} reqwest = { version = "0.11.5", default-features = false, features = ["stream"], optional = true}
rust-s3 = { version = "0.27.0", default-features = false, features = ["tokio-rustls-tls"], optional = true } rust-s3 = { version = "0.27.0", default-features = false, features = ["tokio-rustls-tls"], optional = true, git = "https://github.com/asonix/rust-s3", branch = "asonix/use-async-write" }
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
sha2 = "0.9.0" sha2 = "0.9.0"

View File

@ -46,7 +46,7 @@ pub(crate) trait Store: Send + Sync + Clone + Debug + 'static {
writer: &mut Writer, writer: &mut Writer,
) -> Result<(), std::io::Error> ) -> Result<(), std::io::Error>
where where
Writer: AsyncWrite + Unpin; Writer: AsyncWrite + Send + Unpin;
async fn len(&self, identifier: &Self::Identifier) -> Result<u64, Self::Error>; async fn len(&self, identifier: &Self::Identifier) -> Result<u64, Self::Error>;

View File

@ -108,7 +108,7 @@ impl Store for FileStore {
writer: &mut Writer, writer: &mut Writer,
) -> Result<(), std::io::Error> ) -> Result<(), std::io::Error>
where where
Writer: AsyncWrite + Unpin, Writer: AsyncWrite + Send + Unpin,
{ {
let path = self.path_from_file_id(identifier); let path = self.path_from_file_id(identifier);

View File

@ -1,6 +1,6 @@
use crate::store::Store; use crate::store::Store;
use actix_web::web::Bytes; use actix_web::web::Bytes;
use futures_util::stream::{Stream, StreamExt}; use futures_util::stream::Stream;
use s3::{ use s3::{
command::Command, creds::Credentials, request::Reqwest, request_trait::Request, Bucket, Region, command::Command, creds::Credentials, request::Reqwest, request_trait::Request, Bucket, Region,
}; };
@ -10,7 +10,7 @@ use std::{
task::{Context, Poll}, task::{Context, Poll},
}; };
use storage_path_generator::{Generator, Path}; use storage_path_generator::{Generator, Path};
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; use tokio::io::{AsyncRead, AsyncWrite};
use uuid::Uuid; use uuid::Uuid;
mod object_id; mod object_id;
@ -109,18 +109,14 @@ impl Store for ObjectStore {
writer: &mut Writer, writer: &mut Writer,
) -> Result<(), std::io::Error> ) -> Result<(), std::io::Error>
where where
Writer: AsyncWrite + Unpin, Writer: AsyncWrite + Send + Unpin,
{ {
let mut stream = self let path = identifier.as_str();
.to_stream(identifier, None, None)
self.bucket
.get_object_stream(path, writer)
.await .await
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, Self::Error::from(e)))?;
while let Some(res) = stream.next().await {
let mut bytes = res?;
writer.write_all_buf(&mut bytes).await?;
}
Ok(()) Ok(())
} }