From 25209e29c035cec50ca3b0fac56be07ead8be2fb Mon Sep 17 00:00:00 2001
From: asonix
Date: Sat, 24 Sep 2022 17:18:53 -0500
Subject: [PATCH] Make it compile

---
 Cargo.lock                |   7 +-
 Cargo.toml                |   2 +-
 src/backgrounded.rs       |   8 +--
 src/generate.rs           |   4 +-
 src/ingest.rs             |   9 +--
 src/ingest/hasher.rs      |  18 +++---
 src/main.rs               | 104 +++++++++++++++---------------
 src/store.rs              |  47 ++++++++++----
 src/store/file_store.rs   |  55 ++++++----------
 src/store/object_store.rs | 130 ++++++++++++++++++++++++--------------
 10 files changed, 216 insertions(+), 168 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 41c6cfe..e07d1d5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -22,7 +22,8 @@ dependencies = [
 [[package]]
 name = "actix-form-data"
 version = "0.7.0-beta.0"
-source = "git+https://git.asonix.dog/asonix/actix-form-data?branch=v0.7.x#3525bcd09cd030df3f2ed7684f2aad1bcc42d68b"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e721f3919cb43c566c0dbb6a9cb5ad5106ac42b6b3c0d21a7a3e762455de957a"
 dependencies = [
  "actix-multipart",
  "actix-rt",
@@ -1672,9 +1673,9 @@ dependencies = [
 
 [[package]]
 name = "proc-macro2"
-version = "1.0.43"
+version = "1.0.44"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0a2ca2c61bc9f3d74d2886294ab7b9853abd9c1ad903a3ac7815c58989bb7bab"
+checksum = "7bd7356a8122b6c4a24a82b278680c73357984ca2fc79a0f9fa6dea7dced7c58"
 dependencies = [
  "unicode-ident",
 ]

diff --git a/Cargo.toml b/Cargo.toml
index 86e4ad5..cc58ce3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -19,7 +19,7 @@ io-uring = [
 ]
 
 [dependencies]
-actix-form-data = { version = "0.7.0-beta.0", git = "https://git.asonix.dog/asonix/actix-form-data", branch = "v0.7.x" }
+actix-form-data = "0.7.0-beta.0"
 actix-rt = { version = "2.7.0", default-features = false }
 actix-server = "2.0.0"
 actix-web = { version = "4.0.0", default-features = false }

diff --git a/src/backgrounded.rs b/src/backgrounded.rs
index 73eb5e1..4049fbe 100644
--- a/src/backgrounded.rs
+++ b/src/backgrounded.rs
@@ -5,7 +5,6 @@ use crate::{
 };
 use actix_web::web::Bytes;
 use futures_util::{Stream, TryStreamExt};
-use tokio_util::io::StreamReader;
 use tracing::{Instrument, Span};
 
 pub(crate) struct Backgrounded<R, S>
 where
@@ -38,7 +37,7 @@ where
     pub(crate) async fn proxy<P>(repo: R, store: S, stream: P) -> Result<Self, Error>
     where
-        P: Stream<Item = Result<Bytes, Error>>,
+        P: Stream<Item = Result<Bytes, Error>> + Unpin + 'static,
     {
         let mut this = Self {
             repo,
@@ -53,14 +52,13 @@ where
     async fn do_proxy<P>(&mut self, store: S, stream: P) -> Result<(), Error>
     where
-        P: Stream<Item = Result<Bytes, Error>>,
+        P: Stream<Item = Result<Bytes, Error>> + Unpin + 'static,
     {
         UploadRepo::create(&self.repo, self.upload_id.expect("Upload id exists")).await?;
 
         let stream = stream.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));
-        let mut reader = StreamReader::new(Box::pin(stream));
 
-        let identifier = store.save_async_read(&mut reader).await?;
+        let identifier = store.save_stream(stream).await?;
 
         self.identifier = Some(identifier);
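Note: the tightened bound (Unpin + 'static on the incoming byte stream) is what lets proxy hand the stream to the store by value instead of pinning it behind a StreamReader first. Any owned, already-Unpin stream qualifies; a minimal sketch of a conforming caller (the one-chunk body is illustrative, not part of this patch):

    use actix_web::web::Bytes;
    use futures_util::stream;

    // stream::iter yields items from an owned iterator and is Unpin,
    // so it satisfies Stream<Item = Result<Bytes, Error>> + Unpin + 'static.
    let body = stream::iter(vec![Ok::<_, Error>(Bytes::from_static(b"hello"))]);
    let bg = Backgrounded::proxy(repo, store, body).await?;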
diff --git a/src/generate.rs b/src/generate.rs
index 596a5a0..ae7ecb9 100644
--- a/src/generate.rs
+++ b/src/generate.rs
@@ -57,14 +57,14 @@ async fn process<R: FullRepo, S: Store + 'static>(
         identifier
     } else {
         let identifier = repo.identifier(hash.clone()).await?;
-        let mut reader = crate::ffmpeg::thumbnail(
+        let reader = crate::ffmpeg::thumbnail(
             store.clone(),
             identifier,
             InputFormat::Mp4,
             ThumbnailFormat::Jpeg,
         )
         .await?;
-        let motion_identifier = store.save_async_read(&mut reader).await?;
+        let motion_identifier = store.save_async_read(reader).await?;
 
         repo.relate_motion_identifier(hash.clone(), &motion_identifier)
             .await?;

diff --git a/src/ingest.rs b/src/ingest.rs
index f4c49c7..e3a3fc6 100644
--- a/src/ingest.rs
+++ b/src/ingest.rs
@@ -55,7 +55,7 @@ pub(crate) async fn ingest<R, S>(
     repo: &R,
     store: &S,
-    stream: impl Stream<Item = Result<Bytes, Error>>,
+    stream: impl Stream<Item = Result<Bytes, Error>> + Unpin + 'static,
     declared_alias: Option<Alias>,
     should_validate: bool,
     is_cached: bool,
@@ -77,9 +77,10 @@ where
     )
     .await?;
 
-    let mut hasher_reader = Hasher::new(validated_reader, Sha256::new());
+    let hasher_reader = Hasher::new(validated_reader, Sha256::new());
+    let hasher = hasher_reader.hasher();
 
-    let identifier = store.save_async_read(&mut hasher_reader).await?;
+    let identifier = store.save_async_read(hasher_reader).await?;
 
     drop(permit);
 
@@ -90,7 +91,7 @@ where
         identifier: Some(identifier.clone()),
     };
 
-    let hash = hasher_reader.finalize_reset().await?;
+    let hash = hasher.borrow_mut().finalize_reset().to_vec();
 
     session.hash = Some(hash.clone());
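Note: ingest now shares a single digest between the reader and the caller. The Rc<RefCell<_>> handle is cloned out of the Hasher before the store consumes the reader, and finalize_reset runs only once save_async_read has driven it to EOF. The same pattern in isolation (a sketch, with tokio::io::copy standing in for the store):

    use sha2::{Digest, Sha256};

    let mut reader = Hasher::new(validated_reader, Sha256::new());
    let hasher = reader.hasher(); // shared Rc<RefCell<Sha256>> handle

    // Drive the reader to EOF; poll_read feeds every byte into the digest.
    tokio::io::copy(&mut reader, &mut tokio::io::sink()).await?;

    let hash = hasher.borrow_mut().finalize_reset().to_vec();

Rc rather than Arc is enough here because the ?Send async_trait impls keep these futures on one thread; the handle never crosses threads.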
diff --git a/src/ingest/hasher.rs b/src/ingest/hasher.rs
index 71708ef..a3ce995 100644
--- a/src/ingest/hasher.rs
+++ b/src/ingest/hasher.rs
@@ -1,8 +1,8 @@
-use crate::error::Error;
-use actix_web::web;
 use sha2::{digest::FixedOutputReset, Digest};
 use std::{
+    cell::RefCell,
     pin::Pin,
+    rc::Rc,
     task::{Context, Poll},
 };
 use tokio::io::{AsyncRead, ReadBuf};
@@ -12,7 +12,7 @@ pin_project_lite::pin_project! {
     pub(crate) struct Hasher<I, D> {
         #[pin]
         inner: I,
 
-        hasher: D,
+        hasher: Rc<RefCell<D>>,
     }
 }
@@ -23,14 +23,12 @@ where
     pub(super) fn new(reader: I, digest: D) -> Self {
         Hasher {
             inner: reader,
-            hasher: digest,
+            hasher: Rc::new(RefCell::new(digest)),
         }
     }
 
-    pub(super) async fn finalize_reset(self) -> Result<Vec<u8>, Error> {
-        let mut hasher = self.hasher;
-        let hash = web::block(move || hasher.finalize_reset().to_vec()).await?;
-        Ok(hash)
+    pub(super) fn hasher(&self) -> Rc<RefCell<D>> {
+        Rc::clone(&self.hasher)
     }
 }
@@ -53,7 +51,9 @@ where
         let poll_res = reader.poll_read(cx, buf);
         let after_len = buf.filled().len();
         if after_len > before_len {
-            hasher.update(&buf.filled()[before_len..after_len]);
+            hasher
+                .borrow_mut()
+                .update(&buf.filled()[before_len..after_len]);
         }
         poll_res
     }
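Note: poll_read updates the digest with only the newly filled region of the buffer (before_len..after_len), so each byte is hashed exactly once even across partial reads. A test-style sketch to check the wrapper is transparent (the in-memory Cursor input is an assumption, not part of the patch):

    use sha2::{Digest, Sha256};
    use tokio::io::AsyncReadExt;

    let data = b"some bytes".to_vec();
    let mut reader = Hasher::new(std::io::Cursor::new(data.clone()), Sha256::new());
    let hasher = reader.hasher();

    let mut out = Vec::new();
    reader.read_to_end(&mut out).await.unwrap();

    assert_eq!(out, data); // bytes pass through unchanged
    assert_eq!(
        hasher.borrow_mut().finalize_reset().to_vec(),
        Sha256::digest(&data).to_vec(), // same digest as hashing directly
    );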
web::resource("/original/{filename}") - .route(web::get().to(details::)), + .route(web::get().to(details::)), ) .service( web::resource("/process.{ext}") - .route(web::get().to(process_details::)), + .route(web::get().to(process_details::)), ), ), ) @@ -1058,7 +1065,7 @@ async fn launch( .wrap(Internal( CONFIG.server.api_key.as_ref().map(|s| s.to_owned()), )) - .service(web::resource("/import").route(web::post().to(import::))) + .service(web::resource("/import").route(web::post().to(import::))) .service( web::resource("/variants").route(web::delete().to(clean_variants::)), ) @@ -1075,14 +1082,13 @@ async fn launch( Ok(()) } -async fn migrate_inner(repo: &Repo, from: S1, to: &config::Store) -> color_eyre::Result<()> +async fn migrate_inner(repo: &Repo, from: S1, to: config::Store) -> color_eyre::Result<()> where S1: Store, { match to { config::Store::Filesystem(config::Filesystem { path }) => { - let to = FileStore::build(path.clone(), repo.clone()).await?; - let to = FileStore::init(to); + let to = FileStore::build(path.clone(), repo.clone()).await?.build(); match repo { Repo::Sled(repo) => migrate_store(repo, from, to).await?, @@ -1100,20 +1106,19 @@ where let to = ObjectStore::build( endpoint.clone(), bucket_name, - if *use_path_style { + if use_path_style { UrlStyle::Path } else { UrlStyle::VirtualHost }, - region.as_ref(), - Some(access_key.clone()), - Some(secret_key.clone()), - session_token.clone(), + region, + access_key, + secret_key, + session_token, repo.clone(), ) - .await?; - - let to = ObjectStore::init(to); + .await? + .build(); match repo { Repo::Sled(repo) => migrate_store(repo, from, to).await?, @@ -1136,9 +1141,8 @@ async fn main() -> color_eyre::Result<()> { Operation::MigrateStore { from, to } => { match from { config::Store::Filesystem(config::Filesystem { path }) => { - let from = FileStore::build(path.clone(), repo.clone()).await?; - let from = FileStore::init(from); - migrate_inner(&repo, from, &to).await?; + let from = FileStore::build(path.clone(), repo.clone()).await?.build(); + migrate_inner(&repo, from, to).await?; } config::Store::ObjectStorage(config::ObjectStorage { endpoint, @@ -1150,23 +1154,23 @@ async fn main() -> color_eyre::Result<()> { session_token, }) => { let from = ObjectStore::build( - endpoint.clone(), - &bucket_name, - if *use_path_style { + endpoint, + bucket_name, + if use_path_style { UrlStyle::Path } else { UrlStyle::VirtualHost }, - Serde::into_inner(region), - Some(access_key), - Some(secret_key), + region, + access_key, + secret_key, session_token, repo.clone(), ) - .await?; - let from = ObjectStore::init(from); + .await? 
diff --git a/src/store.rs b/src/store.rs
index bfa4c66..def70ce 100644
--- a/src/store.rs
+++ b/src/store.rs
@@ -15,17 +15,24 @@ pub(crate) trait Identifier: Send + Sync + Clone + Debug {
         Self: Sized;
 }
 
+pub(crate) trait StoreConfig: Send + Sync + Clone {
+    type Store: Store;
+
+    fn build(self) -> Self::Store;
+}
+
 #[async_trait::async_trait(?Send)]
 pub(crate) trait Store: Clone + Debug {
-    type Config: Send + Sync + Clone;
     type Identifier: Identifier + 'static;
-    type Stream: Stream<Item = std::io::Result<Bytes>> + 'static;
+    type Stream: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static;
 
-    fn init(config: Self::Config) -> Self;
-
-    async fn save_async_read<Reader>(&self, reader: &mut Reader) -> Result<Self::Identifier, Error>
+    async fn save_async_read<Reader>(&self, reader: Reader) -> Result<Self::Identifier, Error>
     where
-        Reader: AsyncRead + Unpin;
+        Reader: AsyncRead + Unpin + 'static;
+
+    async fn save_stream<S>(&self, stream: S) -> Result<Self::Identifier, Error>
+    where
+        S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static;
 
     async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Error>;
@@ -42,7 +49,7 @@ pub(crate) trait Store: Clone + Debug {
         writer: &mut Writer,
     ) -> Result<(), std::io::Error>
     where
-        Writer: AsyncWrite + Send + Unpin;
+        Writer: AsyncWrite + Unpin;
 
     async fn len(&self, identifier: &Self::Identifier) -> Result<u64, Error>;
@@ -57,13 +64,20 @@ where
     type Identifier = T::Identifier;
     type Stream = T::Stream;
 
-    async fn save_async_read<Reader>(&self, reader: &mut Reader) -> Result<Self::Identifier, Error>
+    async fn save_async_read<Reader>(&self, reader: Reader) -> Result<Self::Identifier, Error>
     where
-        Reader: AsyncRead + Unpin,
+        Reader: AsyncRead + Unpin + 'static,
     {
         T::save_async_read(self, reader).await
     }
 
+    async fn save_stream<S>(&self, stream: S) -> Result<Self::Identifier, Error>
+    where
+        S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
+    {
+        T::save_stream(self, stream).await
+    }
+
     async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Error> {
         T::save_bytes(self, bytes).await
     }
@@ -83,7 +97,7 @@ where
         writer: &mut Writer,
     ) -> Result<(), std::io::Error>
     where
-        Writer: AsyncWrite + Send + Unpin,
+        Writer: AsyncWrite + Unpin,
     {
         T::read_into(self, identifier, writer).await
     }
@@ -105,13 +119,20 @@ where
     type Identifier = T::Identifier;
     type Stream = T::Stream;
 
-    async fn save_async_read<Reader>(&self, reader: &mut Reader) -> Result<Self::Identifier, Error>
+    async fn save_async_read<Reader>(&self, reader: Reader) -> Result<Self::Identifier, Error>
     where
-        Reader: AsyncRead + Unpin,
+        Reader: AsyncRead + Unpin + 'static,
     {
         T::save_async_read(self, reader).await
     }
 
+    async fn save_stream<S>(&self, stream: S) -> Result<Self::Identifier, Error>
+    where
+        S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
+    {
+        T::save_stream(self, stream).await
+    }
+
     async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Error> {
         T::save_bytes(self, bytes).await
     }
@@ -131,7 +152,7 @@ where
         writer: &mut Writer,
     ) -> Result<(), std::io::Error>
     where
-        Writer: AsyncWrite + Send + Unpin,
+        Writer: AsyncWrite + Unpin,
     {
         T::read_into(self, identifier, writer).await
     }
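Note: the StoreConfig/Store split keeps everything Send + Sync + Clone in the config and defers per-thread state to build(). A toy implementation to illustrate the contract (MemStore and MemStoreConfig are invented names, not part of pict-rs):

    #[derive(Clone)]
    struct MemStoreConfig; // cheap, Send + Sync, safe to move across threads

    #[derive(Clone, Debug)]
    struct MemStore; // would also implement the Store trait (elided here)

    impl StoreConfig for MemStoreConfig {
        type Store = MemStore;

        fn build(self) -> Self::Store {
            // Construct anything !Send (HTTP clients, pools) here, on the
            // thread that will actually use the store.
            MemStore
        }
    }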
diff --git a/src/store/file_store.rs b/src/store/file_store.rs
index 6d11ff6..fde1b81 100644
--- a/src/store/file_store.rs
+++ b/src/store/file_store.rs
@@ -2,7 +2,7 @@ use crate::{
     error::Error,
     file::File,
     repo::{Repo, SettingsRepo},
-    store::Store,
+    store::{Store, StoreConfig},
 };
 use actix_web::web::Bytes;
 use futures_util::stream::Stream;
@@ -12,6 +12,7 @@ use std::{
 };
 use storage_path_generator::Generator;
 use tokio::io::{AsyncRead, AsyncWrite};
+use tokio_util::io::StreamReader;
 use tracing::{debug, error, instrument, Instrument};
 
 mod file_id;
@@ -47,24 +48,27 @@ pub(crate) struct FileStore {
     repo: Repo,
 }
 
+impl StoreConfig for FileStore {
+    type Store = FileStore;
+
+    fn build(self) -> Self::Store {
+        self
+    }
+}
+
 #[async_trait::async_trait(?Send)]
 impl Store for FileStore {
-    type Config = Self;
     type Identifier = FileId;
     type Stream = Pin<Box<dyn Stream<Item = std::io::Result<Bytes>>>>;
 
-    fn init(config: Self::Config) -> Self {
-        config
-    }
-
     #[tracing::instrument(skip(reader))]
-    async fn save_async_read<Reader>(&self, reader: &mut Reader) -> Result<Self::Identifier, Error>
+    async fn save_async_read<Reader>(&self, mut reader: Reader) -> Result<Self::Identifier, Error>
     where
-        Reader: AsyncRead + Unpin,
+        Reader: AsyncRead + Unpin + 'static,
     {
         let path = self.next_file().await?;
 
-        if let Err(e) = self.safe_save_reader(&path, reader).await {
+        if let Err(e) = self.safe_save_reader(&path, &mut reader).await {
             self.safe_remove_file(&path).await?;
             return Err(e.into());
         }
@@ -72,6 +76,13 @@ impl Store for FileStore {
         Ok(self.file_id_from_path(path)?)
     }
 
+    async fn save_stream<S>(&self, stream: S) -> Result<Self::Identifier, Error>
+    where
+        S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
+    {
+        self.save_async_read(StreamReader::new(stream)).await
+    }
+
     #[tracing::instrument(skip(bytes))]
     async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Error> {
         let path = self.next_file().await?;
@@ -114,7 +125,7 @@ impl Store for FileStore {
         writer: &mut Writer,
     ) -> Result<(), std::io::Error>
     where
-        Writer: AsyncWrite + Send + Unpin,
+        Writer: AsyncWrite + Unpin,
     {
         let path = self.path_from_file_id(identifier);
@@ -260,30 +271,6 @@ impl FileStore {
 
         Ok(())
     }
-
-    // try moving a file
-    #[instrument(name = "Moving file", fields(from = tracing::field::debug(&from.as_ref()), to = tracing::field::debug(&to.as_ref())))]
-    pub(crate) async fn safe_move_file<P: AsRef<Path>, Q: AsRef<Path>>(
-        &self,
-        from: P,
-        to: Q,
-    ) -> Result<(), FileError> {
-        safe_create_parent(&to).await?;
-
-        debug!("Checking if {:?} already exists", to.as_ref());
-        if let Err(e) = tokio::fs::metadata(&to).await {
-            if e.kind() != std::io::ErrorKind::NotFound {
-                return Err(e.into());
-            }
-        } else {
-            return Err(FileError::FileExists);
-        }
-
-        debug!("Moving {:?} to {:?}", from.as_ref(), to.as_ref());
-        tokio::fs::copy(&from, &to).await?;
-        self.safe_remove_file(from).await?;
-        Ok(())
-    }
 }
 
 pub(crate) async fn safe_create_parent<P: AsRef<Path>>(path: P) -> Result<(), FileError> {
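Note: the two backends end up as mirror images around tokio-util's adaptors. FileStore::save_stream wraps the stream in a StreamReader and reuses its AsyncRead path, while ObjectStore::save_async_read (below) wraps the reader in a ReaderStream and reuses its stream path. The round trip, for reference:

    use tokio_util::io::{ReaderStream, StreamReader};

    let stream = ReaderStream::new(reader); // AsyncRead -> Stream of io::Result<Bytes>
    let reader = StreamReader::new(stream); // Stream of io::Result<Bytes> -> AsyncRead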
diff --git a/src/store/object_store.rs b/src/store/object_store.rs
index a7e7888..a124808 100644
--- a/src/store/object_store.rs
+++ b/src/store/object_store.rs
@@ -1,18 +1,19 @@
 use crate::{
     error::Error,
     repo::{Repo, SettingsRepo},
-    store::Store,
+    store::{Store, StoreConfig},
 };
 use actix_web::{
+    error::PayloadError,
     http::{
         header::{ByteRangeSpec, Range, CONTENT_LENGTH},
         StatusCode,
     },
     web::Bytes,
 };
-use awc::{Client, ClientRequest};
-use futures_util::{Stream, TryStreamExt};
-use rusty_s3::{actions::S3Action, Bucket, Credentials, UrlStyle};
+use awc::{error::SendRequestError, Client, ClientRequest};
+use futures_util::{Stream, StreamExt, TryStreamExt};
+use rusty_s3::{actions::S3Action, Bucket, BucketError, Credentials, UrlStyle};
 use std::{pin::Pin, string::FromUtf8Error, time::Duration};
 use storage_path_generator::{Generator, Path};
 use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
@@ -32,6 +33,12 @@ pub(crate) enum ObjectError {
     #[error("Failed to generate path")]
     PathGenerator(#[from] storage_path_generator::PathError),
 
+    #[error("Failed to generate request")]
+    S3(#[from] BucketError),
+
+    #[error("Error making request")]
+    SendRequest(String),
+
     #[error("Failed to parse string")]
     Utf8(#[from] FromUtf8Error),
 
@@ -42,6 +49,12 @@ pub(crate) enum ObjectError {
     Status(StatusCode),
 }
 
+impl From<SendRequestError> for ObjectError {
+    fn from(e: SendRequestError) -> Self {
+        Self::SendRequest(e.to_string())
+    }
+}
+
 #[derive(Clone)]
 pub(crate) struct ObjectStore {
     path_gen: Generator,
@@ -59,30 +72,48 @@ pub(crate) struct ObjectStoreConfig {
     credentials: Credentials,
 }
 
-#[async_trait::async_trait(?Send)]
-impl Store for ObjectStore {
-    type Config = ObjectStoreConfig;
-    type Identifier = ObjectId;
-    type Stream = Pin<Box<dyn Stream<Item = std::io::Result<Bytes>>>>;
+impl StoreConfig for ObjectStoreConfig {
+    type Store = ObjectStore;
 
-    fn init(config: Self::Config) -> Self {
+    fn build(self) -> Self::Store {
         ObjectStore {
-            path_gen: config.path_gen,
-            repo: config.repo,
-            bucket: config.bucket,
-            credentials: config.credentials,
+            path_gen: self.path_gen,
+            repo: self.repo,
+            bucket: self.bucket,
+            credentials: self.credentials,
             client: crate::build_client(),
         }
     }
+}
+
+fn payload_to_io_error(e: PayloadError) -> std::io::Error {
+    match e {
+        PayloadError::Io(io) => io,
+        otherwise => std::io::Error::new(std::io::ErrorKind::Other, otherwise.to_string()),
+    }
+}
+
+#[async_trait::async_trait(?Send)]
+impl Store for ObjectStore {
+    type Identifier = ObjectId;
+    type Stream = Pin<Box<dyn Stream<Item = std::io::Result<Bytes>>>>;
 
     #[tracing::instrument(skip(reader))]
-    async fn save_async_read<Reader>(&self, reader: &mut Reader) -> Result<Self::Identifier, Error>
+    async fn save_async_read<Reader>(&self, reader: Reader) -> Result<Self::Identifier, Error>
     where
-        Reader: AsyncRead + Unpin,
+        Reader: AsyncRead + Unpin + 'static,
+    {
+        self.save_stream(ReaderStream::new(reader)).await
+    }
+
+    #[tracing::instrument(skip(stream))]
+    async fn save_stream<S>(&self, stream: S) -> Result<Self::Identifier, Error>
+    where
+        S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
     {
         let (req, object_id) = self.put_object_request().await?;
 
-        let response = req.send_stream(ReaderStream::new(reader)).await?;
+        let response = req.send_stream(stream).await.map_err(ObjectError::from)?;
 
         if response.status().is_success() {
             return Ok(object_id);
@@ -95,7 +126,7 @@ impl Store for ObjectStore {
     async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Error> {
         let (req, object_id) = self.put_object_request().await?;
 
-        let response = req.send_body(bytes).await?;
+        let response = req.send_body(bytes).await.map_err(ObjectError::from)?;
 
         if response.status().is_success() {
             return Ok(object_id);
@@ -114,10 +145,11 @@ impl Store for ObjectStore {
         let response = self
             .get_object_request(identifier, from_start, len)
             .send()
-            .await?;
+            .await
+            .map_err(ObjectError::from)?;
 
         if response.status().is_success() {
-            return Ok(Box::pin(response));
+            return Ok(Box::pin(response.map_err(payload_to_io_error)));
         }
 
         Err(ObjectError::Status(response.status()).into())
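Note: SendRequestError is stored as a String rather than via #[from], most likely because awc's error types do not satisfy the thread-safety bounds the surrounding error type needs; stringifying at the boundary sidesteps that. And with PayloadError mapped through payload_to_io_error, the boxed stream from to_stream yields plain io::Result<Bytes> items, so callers can wrap it straight back into an AsyncRead (sketch; store and identifier assumed in scope):

    use tokio_util::io::StreamReader;

    // Stream the first 1024 bytes of the object, then read it like a file.
    let stream = store.to_stream(&identifier, Some(0), Some(1024)).await?;
    let mut reader = StreamReader::new(stream);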
@@ -130,20 +162,24 @@ impl Store for ObjectStore {
         writer: &mut Writer,
     ) -> Result<(), std::io::Error>
     where
-        Writer: AsyncWrite + Send + Unpin,
+        Writer: AsyncWrite + Unpin,
     {
-        let response = self
+        let mut response = self
             .get_object_request(identifier, None, None)
             .send()
-            .await?;
+            .await
+            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, ObjectError::from(e)))?;
 
         if !response.status().is_success() {
-            return Err(ObjectError::Status(response.status()).into());
+            return Err(std::io::Error::new(
+                std::io::ErrorKind::Other,
+                ObjectError::Status(response.status()),
+            ));
         }
 
         while let Some(res) = response.next().await {
-            let bytes = res?;
-            writer.write_all_buf(bytes).await?;
+            let mut bytes = res.map_err(payload_to_io_error)?;
+            writer.write_all_buf(&mut bytes).await?;
         }
 
         writer.flush().await?;
@@ -152,7 +188,11 @@ impl Store for ObjectStore {
 
     #[tracing::instrument]
     async fn len(&self, identifier: &Self::Identifier) -> Result<u64, Error> {
-        let response = self.head_object_request(identifier).send().await?;
+        let response = self
+            .head_object_request(identifier)
+            .send()
+            .await
+            .map_err(ObjectError::from)?;
 
         if !response.status().is_success() {
             return Err(ObjectError::Status(response.status()).into());
@@ -163,7 +203,7 @@ impl Store for ObjectStore {
             .get(CONTENT_LENGTH)
             .ok_or(ObjectError::Length)?
             .to_str()
-            .ok_or(ObjectError::Length)
+            .map_err(|_| ObjectError::Length)?
             .parse::<u64>()
             .map_err(|_| ObjectError::Length)?;
@@ -186,11 +226,11 @@ impl ObjectStore {
     #[allow(clippy::too_many_arguments)]
     pub(crate) async fn build(
         endpoint: Url,
-        bucket_name: &str,
+        bucket_name: String,
         url_style: UrlStyle,
-        region: &str,
-        access_key: Option<String>,
-        secret_key: Option<String>,
+        region: String,
+        access_key: String,
+        secret_key: String,
         session_token: Option<String>,
         repo: Repo,
     ) -> Result<ObjectStoreConfig, Error> {
@@ -201,7 +241,11 @@ impl ObjectStore {
             repo,
             bucket: Bucket::new(endpoint, url_style, bucket_name, region)
                 .map_err(ObjectError::from)?,
-            credentials: Credentials::new_with_token(access_key, secret_key, session_token),
+            credentials: if let Some(token) = session_token {
+                Credentials::new_with_token(access_key, secret_key, token)
+            } else {
+                Credentials::new(access_key, secret_key)
+            },
         })
     }
@@ -213,7 +257,7 @@ impl ObjectStore {
         Ok((self.build_request(action), ObjectId::from_string(path)))
     }
 
-    fn build_request<'a, A: S3Action<'a>>(&'a self, action: A) -> ClientRequest {
+    fn build_request<'a, A: S3Action<'a>>(&'a self, mut action: A) -> ClientRequest {
         let method = match A::METHOD {
             rusty_s3::Method::Head => awc::http::Method::HEAD,
             rusty_s3::Method::Get => awc::http::Method::GET,
@@ -224,11 +268,11 @@ impl ObjectStore {
 
         let url = action.sign(Duration::from_secs(5));
 
-        let req = self.client.request(method, url);
+        let req = self.client.request(method, url.as_str());
 
         action
             .headers_mut()
-            .drain()
+            .iter()
             .fold(req, |req, tup| req.insert_header(tup))
     }
@@ -247,17 +291,11 @@ impl ObjectStore {
         let start = from_start.unwrap_or(0);
         let end = len.map(|len| start + len - 1);
 
-        let range = match (start, end) {
-            (Some(start), Some(end)) => Some(ByteRangeSpec::FromTo(start, end)),
-            (Some(start), None) => Some(ByteRangeSpec::From(start)),
-            _ => None,
-        };
-
-        if let Some(range) = range {
-            req.insert_header(Range::Bytes(vec![range]))
-        } else {
-            req
-        }
+        req.insert_header(Range::Bytes(vec![if let Some(end) = end {
+            ByteRangeSpec::FromTo(start, end)
+        } else {
+            ByteRangeSpec::From(start)
+        }]))
     }
 
     fn head_object_request(&self, identifier: &ObjectId) -> ClientRequest {