mirror of
https://git.asonix.dog/asonix/pict-rs
synced 2024-11-10 06:25:00 +00:00
Make it compile
This commit is contained in:
parent
ff1771e016
commit
25209e29c0
10 changed files with 216 additions and 168 deletions
7
Cargo.lock
generated
7
Cargo.lock
generated
|
@ -22,7 +22,8 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "actix-form-data"
|
name = "actix-form-data"
|
||||||
version = "0.7.0-beta.0"
|
version = "0.7.0-beta.0"
|
||||||
source = "git+https://git.asonix.dog/asonix/actix-form-data?branch=v0.7.x#3525bcd09cd030df3f2ed7684f2aad1bcc42d68b"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "e721f3919cb43c566c0dbb6a9cb5ad5106ac42b6b3c0d21a7a3e762455de957a"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"actix-multipart",
|
"actix-multipart",
|
||||||
"actix-rt",
|
"actix-rt",
|
||||||
|
@ -1672,9 +1673,9 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "proc-macro2"
|
name = "proc-macro2"
|
||||||
version = "1.0.43"
|
version = "1.0.44"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "0a2ca2c61bc9f3d74d2886294ab7b9853abd9c1ad903a3ac7815c58989bb7bab"
|
checksum = "7bd7356a8122b6c4a24a82b278680c73357984ca2fc79a0f9fa6dea7dced7c58"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"unicode-ident",
|
"unicode-ident",
|
||||||
]
|
]
|
||||||
|
|
|
@ -19,7 +19,7 @@ io-uring = [
|
||||||
]
|
]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
actix-form-data = { version = "0.7.0-beta.0", git = "https://git.asonix.dog/asonix/actix-form-data", branch = "v0.7.x" }
|
actix-form-data = "0.7.0-beta.0"
|
||||||
actix-rt = { version = "2.7.0", default-features = false }
|
actix-rt = { version = "2.7.0", default-features = false }
|
||||||
actix-server = "2.0.0"
|
actix-server = "2.0.0"
|
||||||
actix-web = { version = "4.0.0", default-features = false }
|
actix-web = { version = "4.0.0", default-features = false }
|
||||||
|
|
|
@ -5,7 +5,6 @@ use crate::{
|
||||||
};
|
};
|
||||||
use actix_web::web::Bytes;
|
use actix_web::web::Bytes;
|
||||||
use futures_util::{Stream, TryStreamExt};
|
use futures_util::{Stream, TryStreamExt};
|
||||||
use tokio_util::io::StreamReader;
|
|
||||||
use tracing::{Instrument, Span};
|
use tracing::{Instrument, Span};
|
||||||
|
|
||||||
pub(crate) struct Backgrounded<R, S>
|
pub(crate) struct Backgrounded<R, S>
|
||||||
|
@ -38,7 +37,7 @@ where
|
||||||
|
|
||||||
pub(crate) async fn proxy<P>(repo: R, store: S, stream: P) -> Result<Self, Error>
|
pub(crate) async fn proxy<P>(repo: R, store: S, stream: P) -> Result<Self, Error>
|
||||||
where
|
where
|
||||||
P: Stream<Item = Result<Bytes, Error>>,
|
P: Stream<Item = Result<Bytes, Error>> + Unpin + 'static,
|
||||||
{
|
{
|
||||||
let mut this = Self {
|
let mut this = Self {
|
||||||
repo,
|
repo,
|
||||||
|
@ -53,14 +52,13 @@ where
|
||||||
|
|
||||||
async fn do_proxy<P>(&mut self, store: S, stream: P) -> Result<(), Error>
|
async fn do_proxy<P>(&mut self, store: S, stream: P) -> Result<(), Error>
|
||||||
where
|
where
|
||||||
P: Stream<Item = Result<Bytes, Error>>,
|
P: Stream<Item = Result<Bytes, Error>> + Unpin + 'static,
|
||||||
{
|
{
|
||||||
UploadRepo::create(&self.repo, self.upload_id.expect("Upload id exists")).await?;
|
UploadRepo::create(&self.repo, self.upload_id.expect("Upload id exists")).await?;
|
||||||
|
|
||||||
let stream = stream.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));
|
let stream = stream.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));
|
||||||
let mut reader = StreamReader::new(Box::pin(stream));
|
|
||||||
|
|
||||||
let identifier = store.save_async_read(&mut reader).await?;
|
let identifier = store.save_stream(stream).await?;
|
||||||
|
|
||||||
self.identifier = Some(identifier);
|
self.identifier = Some(identifier);
|
||||||
|
|
||||||
|
|
|
@ -57,14 +57,14 @@ async fn process<R: FullRepo, S: Store + 'static>(
|
||||||
identifier
|
identifier
|
||||||
} else {
|
} else {
|
||||||
let identifier = repo.identifier(hash.clone()).await?;
|
let identifier = repo.identifier(hash.clone()).await?;
|
||||||
let mut reader = crate::ffmpeg::thumbnail(
|
let reader = crate::ffmpeg::thumbnail(
|
||||||
store.clone(),
|
store.clone(),
|
||||||
identifier,
|
identifier,
|
||||||
InputFormat::Mp4,
|
InputFormat::Mp4,
|
||||||
ThumbnailFormat::Jpeg,
|
ThumbnailFormat::Jpeg,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
let motion_identifier = store.save_async_read(&mut reader).await?;
|
let motion_identifier = store.save_async_read(reader).await?;
|
||||||
|
|
||||||
repo.relate_motion_identifier(hash.clone(), &motion_identifier)
|
repo.relate_motion_identifier(hash.clone(), &motion_identifier)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
|
@ -55,7 +55,7 @@ where
|
||||||
pub(crate) async fn ingest<R, S>(
|
pub(crate) async fn ingest<R, S>(
|
||||||
repo: &R,
|
repo: &R,
|
||||||
store: &S,
|
store: &S,
|
||||||
stream: impl Stream<Item = Result<Bytes, Error>>,
|
stream: impl Stream<Item = Result<Bytes, Error>> + Unpin + 'static,
|
||||||
declared_alias: Option<Alias>,
|
declared_alias: Option<Alias>,
|
||||||
should_validate: bool,
|
should_validate: bool,
|
||||||
is_cached: bool,
|
is_cached: bool,
|
||||||
|
@ -77,9 +77,10 @@ where
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let mut hasher_reader = Hasher::new(validated_reader, Sha256::new());
|
let hasher_reader = Hasher::new(validated_reader, Sha256::new());
|
||||||
|
let hasher = hasher_reader.hasher();
|
||||||
|
|
||||||
let identifier = store.save_async_read(&mut hasher_reader).await?;
|
let identifier = store.save_async_read(hasher_reader).await?;
|
||||||
|
|
||||||
drop(permit);
|
drop(permit);
|
||||||
|
|
||||||
|
@ -90,7 +91,7 @@ where
|
||||||
identifier: Some(identifier.clone()),
|
identifier: Some(identifier.clone()),
|
||||||
};
|
};
|
||||||
|
|
||||||
let hash = hasher_reader.finalize_reset().await?;
|
let hash = hasher.borrow_mut().finalize_reset().to_vec();
|
||||||
|
|
||||||
session.hash = Some(hash.clone());
|
session.hash = Some(hash.clone());
|
||||||
|
|
||||||
|
|
|
@ -1,8 +1,8 @@
|
||||||
use crate::error::Error;
|
|
||||||
use actix_web::web;
|
|
||||||
use sha2::{digest::FixedOutputReset, Digest};
|
use sha2::{digest::FixedOutputReset, Digest};
|
||||||
use std::{
|
use std::{
|
||||||
|
cell::RefCell,
|
||||||
pin::Pin,
|
pin::Pin,
|
||||||
|
rc::Rc,
|
||||||
task::{Context, Poll},
|
task::{Context, Poll},
|
||||||
};
|
};
|
||||||
use tokio::io::{AsyncRead, ReadBuf};
|
use tokio::io::{AsyncRead, ReadBuf};
|
||||||
|
@ -12,7 +12,7 @@ pin_project_lite::pin_project! {
|
||||||
#[pin]
|
#[pin]
|
||||||
inner: I,
|
inner: I,
|
||||||
|
|
||||||
hasher: D,
|
hasher: Rc<RefCell<D>>,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -23,14 +23,12 @@ where
|
||||||
pub(super) fn new(reader: I, digest: D) -> Self {
|
pub(super) fn new(reader: I, digest: D) -> Self {
|
||||||
Hasher {
|
Hasher {
|
||||||
inner: reader,
|
inner: reader,
|
||||||
hasher: digest,
|
hasher: Rc::new(RefCell::new(digest)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) async fn finalize_reset(self) -> Result<Vec<u8>, Error> {
|
pub(super) fn hasher(&self) -> Rc<RefCell<D>> {
|
||||||
let mut hasher = self.hasher;
|
Rc::clone(&self.hasher)
|
||||||
let hash = web::block(move || hasher.finalize_reset().to_vec()).await?;
|
|
||||||
Ok(hash)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -53,7 +51,9 @@ where
|
||||||
let poll_res = reader.poll_read(cx, buf);
|
let poll_res = reader.poll_read(cx, buf);
|
||||||
let after_len = buf.filled().len();
|
let after_len = buf.filled().len();
|
||||||
if after_len > before_len {
|
if after_len > before_len {
|
||||||
hasher.update(&buf.filled()[before_len..after_len]);
|
hasher
|
||||||
|
.borrow_mut()
|
||||||
|
.update(&buf.filled()[before_len..after_len]);
|
||||||
}
|
}
|
||||||
poll_res
|
poll_res
|
||||||
}
|
}
|
||||||
|
|
104
src/main.rs
104
src/main.rs
|
@ -66,7 +66,11 @@ use self::{
|
||||||
UploadResult,
|
UploadResult,
|
||||||
},
|
},
|
||||||
serde_str::Serde,
|
serde_str::Serde,
|
||||||
store::{file_store::FileStore, object_store::ObjectStore, Identifier, Store},
|
store::{
|
||||||
|
file_store::FileStore,
|
||||||
|
object_store::{ObjectStore, ObjectStoreConfig},
|
||||||
|
Identifier, Store, StoreConfig,
|
||||||
|
},
|
||||||
stream::{StreamLimit, StreamTimeout},
|
stream::{StreamLimit, StreamTimeout},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -449,7 +453,7 @@ async fn download<R: FullRepo + 'static, S: Store + 'static>(
|
||||||
|
|
||||||
#[instrument(name = "Downloading file inline", skip(stream))]
|
#[instrument(name = "Downloading file inline", skip(stream))]
|
||||||
async fn do_download_inline<R: FullRepo + 'static, S: Store + 'static>(
|
async fn do_download_inline<R: FullRepo + 'static, S: Store + 'static>(
|
||||||
stream: impl Stream<Item = Result<web::Bytes, Error>>,
|
stream: impl Stream<Item = Result<web::Bytes, Error>> + Unpin + 'static,
|
||||||
repo: web::Data<R>,
|
repo: web::Data<R>,
|
||||||
store: web::Data<S>,
|
store: web::Data<S>,
|
||||||
is_cached: bool,
|
is_cached: bool,
|
||||||
|
@ -475,7 +479,7 @@ async fn do_download_inline<R: FullRepo + 'static, S: Store + 'static>(
|
||||||
|
|
||||||
#[instrument(name = "Downloading file in background", skip(stream))]
|
#[instrument(name = "Downloading file in background", skip(stream))]
|
||||||
async fn do_download_backgrounded<R: FullRepo + 'static, S: Store + 'static>(
|
async fn do_download_backgrounded<R: FullRepo + 'static, S: Store + 'static>(
|
||||||
stream: impl Stream<Item = Result<web::Bytes, Error>>,
|
stream: impl Stream<Item = Result<web::Bytes, Error>> + Unpin + 'static,
|
||||||
repo: web::Data<R>,
|
repo: web::Data<R>,
|
||||||
store: web::Data<S>,
|
store: web::Data<S>,
|
||||||
is_cached: bool,
|
is_cached: bool,
|
||||||
|
@ -971,15 +975,15 @@ fn next_worker_id() -> String {
|
||||||
format!("{}-{}", CONFIG.server.worker_id, next_id)
|
format!("{}-{}", CONFIG.server.worker_id, next_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn launch<R: FullRepo + 'static, S: Store + 'static>(
|
async fn launch<R: FullRepo + 'static, SC: StoreConfig + 'static>(
|
||||||
repo: R,
|
repo: R,
|
||||||
store: S::Config,
|
store_config: SC,
|
||||||
) -> color_eyre::Result<()> {
|
) -> color_eyre::Result<()> {
|
||||||
repo.requeue_in_progress(CONFIG.server.worker_id.as_bytes().to_vec())
|
repo.requeue_in_progress(CONFIG.server.worker_id.as_bytes().to_vec())
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
HttpServer::new(move || {
|
HttpServer::new(move || {
|
||||||
let store = S::init(store.clone());
|
let store = store_config.clone().build();
|
||||||
let repo = repo.clone();
|
let repo = repo.clone();
|
||||||
|
|
||||||
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
|
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
|
||||||
|
@ -1008,20 +1012,23 @@ async fn launch<R: FullRepo + 'static, S: Store + 'static>(
|
||||||
.service(
|
.service(
|
||||||
web::resource("")
|
web::resource("")
|
||||||
.guard(guard::Post())
|
.guard(guard::Post())
|
||||||
.route(web::post().to(upload::<R, S>)),
|
.route(web::post().to(upload::<R, SC::Store>)),
|
||||||
)
|
)
|
||||||
.service(
|
.service(
|
||||||
web::scope("/backgrounded")
|
web::scope("/backgrounded")
|
||||||
.service(
|
.service(
|
||||||
web::resource("")
|
web::resource("")
|
||||||
.guard(guard::Post())
|
.guard(guard::Post())
|
||||||
.route(web::post().to(upload_backgrounded::<R, S>)),
|
.route(web::post().to(upload_backgrounded::<R, SC::Store>)),
|
||||||
)
|
)
|
||||||
.service(
|
.service(
|
||||||
web::resource("/claim").route(web::get().to(claim_upload::<R, S>)),
|
web::resource("/claim")
|
||||||
|
.route(web::get().to(claim_upload::<R, SC::Store>)),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
.service(web::resource("/download").route(web::get().to(download::<R, S>)))
|
.service(
|
||||||
|
web::resource("/download").route(web::get().to(download::<R, SC::Store>)),
|
||||||
|
)
|
||||||
.service(
|
.service(
|
||||||
web::resource("/delete/{delete_token}/{filename}")
|
web::resource("/delete/{delete_token}/{filename}")
|
||||||
.route(web::delete().to(delete::<R>))
|
.route(web::delete().to(delete::<R>))
|
||||||
|
@ -1029,27 +1036,27 @@ async fn launch<R: FullRepo + 'static, S: Store + 'static>(
|
||||||
)
|
)
|
||||||
.service(
|
.service(
|
||||||
web::resource("/original/{filename}")
|
web::resource("/original/{filename}")
|
||||||
.route(web::get().to(serve::<R, S>))
|
.route(web::get().to(serve::<R, SC::Store>))
|
||||||
.route(web::head().to(serve_head::<R, S>)),
|
.route(web::head().to(serve_head::<R, SC::Store>)),
|
||||||
)
|
)
|
||||||
.service(
|
.service(
|
||||||
web::resource("/process.{ext}")
|
web::resource("/process.{ext}")
|
||||||
.route(web::get().to(process::<R, S>))
|
.route(web::get().to(process::<R, SC::Store>))
|
||||||
.route(web::head().to(process_head::<R, S>)),
|
.route(web::head().to(process_head::<R, SC::Store>)),
|
||||||
)
|
)
|
||||||
.service(
|
.service(
|
||||||
web::resource("/process_backgrounded.{ext}")
|
web::resource("/process_backgrounded.{ext}")
|
||||||
.route(web::get().to(process_backgrounded::<R, S>)),
|
.route(web::get().to(process_backgrounded::<R, SC::Store>)),
|
||||||
)
|
)
|
||||||
.service(
|
.service(
|
||||||
web::scope("/details")
|
web::scope("/details")
|
||||||
.service(
|
.service(
|
||||||
web::resource("/original/{filename}")
|
web::resource("/original/{filename}")
|
||||||
.route(web::get().to(details::<R, S>)),
|
.route(web::get().to(details::<R, SC::Store>)),
|
||||||
)
|
)
|
||||||
.service(
|
.service(
|
||||||
web::resource("/process.{ext}")
|
web::resource("/process.{ext}")
|
||||||
.route(web::get().to(process_details::<R, S>)),
|
.route(web::get().to(process_details::<R, SC::Store>)),
|
||||||
),
|
),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
@ -1058,7 +1065,7 @@ async fn launch<R: FullRepo + 'static, S: Store + 'static>(
|
||||||
.wrap(Internal(
|
.wrap(Internal(
|
||||||
CONFIG.server.api_key.as_ref().map(|s| s.to_owned()),
|
CONFIG.server.api_key.as_ref().map(|s| s.to_owned()),
|
||||||
))
|
))
|
||||||
.service(web::resource("/import").route(web::post().to(import::<R, S>)))
|
.service(web::resource("/import").route(web::post().to(import::<R, SC::Store>)))
|
||||||
.service(
|
.service(
|
||||||
web::resource("/variants").route(web::delete().to(clean_variants::<R>)),
|
web::resource("/variants").route(web::delete().to(clean_variants::<R>)),
|
||||||
)
|
)
|
||||||
|
@ -1075,14 +1082,13 @@ async fn launch<R: FullRepo + 'static, S: Store + 'static>(
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn migrate_inner<S1>(repo: &Repo, from: S1, to: &config::Store) -> color_eyre::Result<()>
|
async fn migrate_inner<S1>(repo: &Repo, from: S1, to: config::Store) -> color_eyre::Result<()>
|
||||||
where
|
where
|
||||||
S1: Store,
|
S1: Store,
|
||||||
{
|
{
|
||||||
match to {
|
match to {
|
||||||
config::Store::Filesystem(config::Filesystem { path }) => {
|
config::Store::Filesystem(config::Filesystem { path }) => {
|
||||||
let to = FileStore::build(path.clone(), repo.clone()).await?;
|
let to = FileStore::build(path.clone(), repo.clone()).await?.build();
|
||||||
let to = FileStore::init(to);
|
|
||||||
|
|
||||||
match repo {
|
match repo {
|
||||||
Repo::Sled(repo) => migrate_store(repo, from, to).await?,
|
Repo::Sled(repo) => migrate_store(repo, from, to).await?,
|
||||||
|
@ -1100,20 +1106,19 @@ where
|
||||||
let to = ObjectStore::build(
|
let to = ObjectStore::build(
|
||||||
endpoint.clone(),
|
endpoint.clone(),
|
||||||
bucket_name,
|
bucket_name,
|
||||||
if *use_path_style {
|
if use_path_style {
|
||||||
UrlStyle::Path
|
UrlStyle::Path
|
||||||
} else {
|
} else {
|
||||||
UrlStyle::VirtualHost
|
UrlStyle::VirtualHost
|
||||||
},
|
},
|
||||||
region.as_ref(),
|
region,
|
||||||
Some(access_key.clone()),
|
access_key,
|
||||||
Some(secret_key.clone()),
|
secret_key,
|
||||||
session_token.clone(),
|
session_token,
|
||||||
repo.clone(),
|
repo.clone(),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?
|
||||||
|
.build();
|
||||||
let to = ObjectStore::init(to);
|
|
||||||
|
|
||||||
match repo {
|
match repo {
|
||||||
Repo::Sled(repo) => migrate_store(repo, from, to).await?,
|
Repo::Sled(repo) => migrate_store(repo, from, to).await?,
|
||||||
|
@ -1136,9 +1141,8 @@ async fn main() -> color_eyre::Result<()> {
|
||||||
Operation::MigrateStore { from, to } => {
|
Operation::MigrateStore { from, to } => {
|
||||||
match from {
|
match from {
|
||||||
config::Store::Filesystem(config::Filesystem { path }) => {
|
config::Store::Filesystem(config::Filesystem { path }) => {
|
||||||
let from = FileStore::build(path.clone(), repo.clone()).await?;
|
let from = FileStore::build(path.clone(), repo.clone()).await?.build();
|
||||||
let from = FileStore::init(from);
|
migrate_inner(&repo, from, to).await?;
|
||||||
migrate_inner(&repo, from, &to).await?;
|
|
||||||
}
|
}
|
||||||
config::Store::ObjectStorage(config::ObjectStorage {
|
config::Store::ObjectStorage(config::ObjectStorage {
|
||||||
endpoint,
|
endpoint,
|
||||||
|
@ -1150,23 +1154,23 @@ async fn main() -> color_eyre::Result<()> {
|
||||||
session_token,
|
session_token,
|
||||||
}) => {
|
}) => {
|
||||||
let from = ObjectStore::build(
|
let from = ObjectStore::build(
|
||||||
endpoint.clone(),
|
endpoint,
|
||||||
&bucket_name,
|
bucket_name,
|
||||||
if *use_path_style {
|
if use_path_style {
|
||||||
UrlStyle::Path
|
UrlStyle::Path
|
||||||
} else {
|
} else {
|
||||||
UrlStyle::VirtualHost
|
UrlStyle::VirtualHost
|
||||||
},
|
},
|
||||||
Serde::into_inner(region),
|
region,
|
||||||
Some(access_key),
|
access_key,
|
||||||
Some(secret_key),
|
secret_key,
|
||||||
session_token,
|
session_token,
|
||||||
repo.clone(),
|
repo.clone(),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?
|
||||||
let from = ObjectStore::init(from);
|
.build();
|
||||||
|
|
||||||
migrate_inner(&repo, from, &to).await?;
|
migrate_inner(&repo, from, to).await?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1193,23 +1197,23 @@ async fn main() -> color_eyre::Result<()> {
|
||||||
session_token,
|
session_token,
|
||||||
}) => {
|
}) => {
|
||||||
let store = ObjectStore::build(
|
let store = ObjectStore::build(
|
||||||
endpoint.clone(),
|
endpoint,
|
||||||
&bucket_name,
|
bucket_name,
|
||||||
if *use_path_style {
|
if use_path_style {
|
||||||
UrlStyle::Path
|
UrlStyle::Path
|
||||||
} else {
|
} else {
|
||||||
UrlStyle::VirtualHost
|
UrlStyle::VirtualHost
|
||||||
},
|
},
|
||||||
Serde::into_inner(region),
|
region,
|
||||||
Some(access_key),
|
access_key,
|
||||||
Some(secret_key),
|
secret_key,
|
||||||
session_token,
|
session_token,
|
||||||
repo.clone(),
|
repo.clone(),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
match repo {
|
match repo {
|
||||||
Repo::Sled(sled_repo) => launch::<_, ObjectStore>(sled_repo, store).await,
|
Repo::Sled(sled_repo) => launch::<_, ObjectStoreConfig>(sled_repo, store).await,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1271,10 +1275,8 @@ where
|
||||||
S2: Store,
|
S2: Store,
|
||||||
{
|
{
|
||||||
let stream = from.to_stream(identifier, None, None).await?;
|
let stream = from.to_stream(identifier, None, None).await?;
|
||||||
futures_util::pin_mut!(stream);
|
|
||||||
let mut reader = tokio_util::io::StreamReader::new(stream);
|
|
||||||
|
|
||||||
let new_identifier = to.save_async_read(&mut reader).await?;
|
let new_identifier = to.save_stream(stream).await?;
|
||||||
|
|
||||||
Ok(new_identifier)
|
Ok(new_identifier)
|
||||||
}
|
}
|
||||||
|
|
47
src/store.rs
47
src/store.rs
|
@ -15,17 +15,24 @@ pub(crate) trait Identifier: Send + Sync + Clone + Debug {
|
||||||
Self: Sized;
|
Self: Sized;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) trait StoreConfig: Send + Sync + Clone {
|
||||||
|
type Store: Store;
|
||||||
|
|
||||||
|
fn build(self) -> Self::Store;
|
||||||
|
}
|
||||||
|
|
||||||
#[async_trait::async_trait(?Send)]
|
#[async_trait::async_trait(?Send)]
|
||||||
pub(crate) trait Store: Clone + Debug {
|
pub(crate) trait Store: Clone + Debug {
|
||||||
type Config: Send + Sync + Clone;
|
|
||||||
type Identifier: Identifier + 'static;
|
type Identifier: Identifier + 'static;
|
||||||
type Stream: Stream<Item = std::io::Result<Bytes>> + 'static;
|
type Stream: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static;
|
||||||
|
|
||||||
fn init(config: Self::Config) -> Self;
|
async fn save_async_read<Reader>(&self, reader: Reader) -> Result<Self::Identifier, Error>
|
||||||
|
|
||||||
async fn save_async_read<Reader>(&self, reader: &mut Reader) -> Result<Self::Identifier, Error>
|
|
||||||
where
|
where
|
||||||
Reader: AsyncRead + Unpin;
|
Reader: AsyncRead + Unpin + 'static;
|
||||||
|
|
||||||
|
async fn save_stream<S>(&self, stream: S) -> Result<Self::Identifier, Error>
|
||||||
|
where
|
||||||
|
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static;
|
||||||
|
|
||||||
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Error>;
|
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Error>;
|
||||||
|
|
||||||
|
@ -42,7 +49,7 @@ pub(crate) trait Store: Clone + Debug {
|
||||||
writer: &mut Writer,
|
writer: &mut Writer,
|
||||||
) -> Result<(), std::io::Error>
|
) -> Result<(), std::io::Error>
|
||||||
where
|
where
|
||||||
Writer: AsyncWrite + Send + Unpin;
|
Writer: AsyncWrite + Unpin;
|
||||||
|
|
||||||
async fn len(&self, identifier: &Self::Identifier) -> Result<u64, Error>;
|
async fn len(&self, identifier: &Self::Identifier) -> Result<u64, Error>;
|
||||||
|
|
||||||
|
@ -57,13 +64,20 @@ where
|
||||||
type Identifier = T::Identifier;
|
type Identifier = T::Identifier;
|
||||||
type Stream = T::Stream;
|
type Stream = T::Stream;
|
||||||
|
|
||||||
async fn save_async_read<Reader>(&self, reader: &mut Reader) -> Result<Self::Identifier, Error>
|
async fn save_async_read<Reader>(&self, reader: Reader) -> Result<Self::Identifier, Error>
|
||||||
where
|
where
|
||||||
Reader: AsyncRead + Unpin,
|
Reader: AsyncRead + Unpin + 'static,
|
||||||
{
|
{
|
||||||
T::save_async_read(self, reader).await
|
T::save_async_read(self, reader).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn save_stream<S>(&self, stream: S) -> Result<Self::Identifier, Error>
|
||||||
|
where
|
||||||
|
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
|
||||||
|
{
|
||||||
|
T::save_stream(self, stream).await
|
||||||
|
}
|
||||||
|
|
||||||
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Error> {
|
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Error> {
|
||||||
T::save_bytes(self, bytes).await
|
T::save_bytes(self, bytes).await
|
||||||
}
|
}
|
||||||
|
@ -83,7 +97,7 @@ where
|
||||||
writer: &mut Writer,
|
writer: &mut Writer,
|
||||||
) -> Result<(), std::io::Error>
|
) -> Result<(), std::io::Error>
|
||||||
where
|
where
|
||||||
Writer: AsyncWrite + Send + Unpin,
|
Writer: AsyncWrite + Unpin,
|
||||||
{
|
{
|
||||||
T::read_into(self, identifier, writer).await
|
T::read_into(self, identifier, writer).await
|
||||||
}
|
}
|
||||||
|
@ -105,13 +119,20 @@ where
|
||||||
type Identifier = T::Identifier;
|
type Identifier = T::Identifier;
|
||||||
type Stream = T::Stream;
|
type Stream = T::Stream;
|
||||||
|
|
||||||
async fn save_async_read<Reader>(&self, reader: &mut Reader) -> Result<Self::Identifier, Error>
|
async fn save_async_read<Reader>(&self, reader: Reader) -> Result<Self::Identifier, Error>
|
||||||
where
|
where
|
||||||
Reader: AsyncRead + Unpin,
|
Reader: AsyncRead + Unpin + 'static,
|
||||||
{
|
{
|
||||||
T::save_async_read(self, reader).await
|
T::save_async_read(self, reader).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn save_stream<S>(&self, stream: S) -> Result<Self::Identifier, Error>
|
||||||
|
where
|
||||||
|
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
|
||||||
|
{
|
||||||
|
T::save_stream(self, stream).await
|
||||||
|
}
|
||||||
|
|
||||||
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Error> {
|
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Error> {
|
||||||
T::save_bytes(self, bytes).await
|
T::save_bytes(self, bytes).await
|
||||||
}
|
}
|
||||||
|
@ -131,7 +152,7 @@ where
|
||||||
writer: &mut Writer,
|
writer: &mut Writer,
|
||||||
) -> Result<(), std::io::Error>
|
) -> Result<(), std::io::Error>
|
||||||
where
|
where
|
||||||
Writer: AsyncWrite + Send + Unpin,
|
Writer: AsyncWrite + Unpin,
|
||||||
{
|
{
|
||||||
T::read_into(self, identifier, writer).await
|
T::read_into(self, identifier, writer).await
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,7 +2,7 @@ use crate::{
|
||||||
error::Error,
|
error::Error,
|
||||||
file::File,
|
file::File,
|
||||||
repo::{Repo, SettingsRepo},
|
repo::{Repo, SettingsRepo},
|
||||||
store::Store,
|
store::{Store, StoreConfig},
|
||||||
};
|
};
|
||||||
use actix_web::web::Bytes;
|
use actix_web::web::Bytes;
|
||||||
use futures_util::stream::Stream;
|
use futures_util::stream::Stream;
|
||||||
|
@ -12,6 +12,7 @@ use std::{
|
||||||
};
|
};
|
||||||
use storage_path_generator::Generator;
|
use storage_path_generator::Generator;
|
||||||
use tokio::io::{AsyncRead, AsyncWrite};
|
use tokio::io::{AsyncRead, AsyncWrite};
|
||||||
|
use tokio_util::io::StreamReader;
|
||||||
use tracing::{debug, error, instrument, Instrument};
|
use tracing::{debug, error, instrument, Instrument};
|
||||||
|
|
||||||
mod file_id;
|
mod file_id;
|
||||||
|
@ -47,24 +48,27 @@ pub(crate) struct FileStore {
|
||||||
repo: Repo,
|
repo: Repo,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl StoreConfig for FileStore {
|
||||||
|
type Store = FileStore;
|
||||||
|
|
||||||
|
fn build(self) -> Self::Store {
|
||||||
|
self
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[async_trait::async_trait(?Send)]
|
#[async_trait::async_trait(?Send)]
|
||||||
impl Store for FileStore {
|
impl Store for FileStore {
|
||||||
type Config = Self;
|
|
||||||
type Identifier = FileId;
|
type Identifier = FileId;
|
||||||
type Stream = Pin<Box<dyn Stream<Item = std::io::Result<Bytes>>>>;
|
type Stream = Pin<Box<dyn Stream<Item = std::io::Result<Bytes>>>>;
|
||||||
|
|
||||||
fn init(config: Self::Config) -> Self {
|
|
||||||
config
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tracing::instrument(skip(reader))]
|
#[tracing::instrument(skip(reader))]
|
||||||
async fn save_async_read<Reader>(&self, reader: &mut Reader) -> Result<Self::Identifier, Error>
|
async fn save_async_read<Reader>(&self, mut reader: Reader) -> Result<Self::Identifier, Error>
|
||||||
where
|
where
|
||||||
Reader: AsyncRead + Unpin,
|
Reader: AsyncRead + Unpin + 'static,
|
||||||
{
|
{
|
||||||
let path = self.next_file().await?;
|
let path = self.next_file().await?;
|
||||||
|
|
||||||
if let Err(e) = self.safe_save_reader(&path, reader).await {
|
if let Err(e) = self.safe_save_reader(&path, &mut reader).await {
|
||||||
self.safe_remove_file(&path).await?;
|
self.safe_remove_file(&path).await?;
|
||||||
return Err(e.into());
|
return Err(e.into());
|
||||||
}
|
}
|
||||||
|
@ -72,6 +76,13 @@ impl Store for FileStore {
|
||||||
Ok(self.file_id_from_path(path)?)
|
Ok(self.file_id_from_path(path)?)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn save_stream<S>(&self, stream: S) -> Result<Self::Identifier, Error>
|
||||||
|
where
|
||||||
|
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
|
||||||
|
{
|
||||||
|
self.save_async_read(StreamReader::new(stream)).await
|
||||||
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip(bytes))]
|
#[tracing::instrument(skip(bytes))]
|
||||||
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Error> {
|
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Error> {
|
||||||
let path = self.next_file().await?;
|
let path = self.next_file().await?;
|
||||||
|
@ -114,7 +125,7 @@ impl Store for FileStore {
|
||||||
writer: &mut Writer,
|
writer: &mut Writer,
|
||||||
) -> Result<(), std::io::Error>
|
) -> Result<(), std::io::Error>
|
||||||
where
|
where
|
||||||
Writer: AsyncWrite + Send + Unpin,
|
Writer: AsyncWrite + Unpin,
|
||||||
{
|
{
|
||||||
let path = self.path_from_file_id(identifier);
|
let path = self.path_from_file_id(identifier);
|
||||||
|
|
||||||
|
@ -260,30 +271,6 @@ impl FileStore {
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
// try moving a file
|
|
||||||
#[instrument(name = "Moving file", fields(from = tracing::field::debug(&from.as_ref()), to = tracing::field::debug(&to.as_ref())))]
|
|
||||||
pub(crate) async fn safe_move_file<P: AsRef<Path>, Q: AsRef<Path>>(
|
|
||||||
&self,
|
|
||||||
from: P,
|
|
||||||
to: Q,
|
|
||||||
) -> Result<(), FileError> {
|
|
||||||
safe_create_parent(&to).await?;
|
|
||||||
|
|
||||||
debug!("Checking if {:?} already exists", to.as_ref());
|
|
||||||
if let Err(e) = tokio::fs::metadata(&to).await {
|
|
||||||
if e.kind() != std::io::ErrorKind::NotFound {
|
|
||||||
return Err(e.into());
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return Err(FileError::FileExists);
|
|
||||||
}
|
|
||||||
|
|
||||||
debug!("Moving {:?} to {:?}", from.as_ref(), to.as_ref());
|
|
||||||
tokio::fs::copy(&from, &to).await?;
|
|
||||||
self.safe_remove_file(from).await?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn safe_create_parent<P: AsRef<Path>>(path: P) -> Result<(), FileError> {
|
pub(crate) async fn safe_create_parent<P: AsRef<Path>>(path: P) -> Result<(), FileError> {
|
||||||
|
|
|
@ -1,18 +1,19 @@
|
||||||
use crate::{
|
use crate::{
|
||||||
error::Error,
|
error::Error,
|
||||||
repo::{Repo, SettingsRepo},
|
repo::{Repo, SettingsRepo},
|
||||||
store::Store,
|
store::{Store, StoreConfig},
|
||||||
};
|
};
|
||||||
use actix_web::{
|
use actix_web::{
|
||||||
|
error::PayloadError,
|
||||||
http::{
|
http::{
|
||||||
header::{ByteRangeSpec, Range, CONTENT_LENGTH},
|
header::{ByteRangeSpec, Range, CONTENT_LENGTH},
|
||||||
StatusCode,
|
StatusCode,
|
||||||
},
|
},
|
||||||
web::Bytes,
|
web::Bytes,
|
||||||
};
|
};
|
||||||
use awc::{Client, ClientRequest};
|
use awc::{error::SendRequestError, Client, ClientRequest};
|
||||||
use futures_util::{Stream, TryStreamExt};
|
use futures_util::{Stream, StreamExt, TryStreamExt};
|
||||||
use rusty_s3::{actions::S3Action, Bucket, Credentials, UrlStyle};
|
use rusty_s3::{actions::S3Action, Bucket, BucketError, Credentials, UrlStyle};
|
||||||
use std::{pin::Pin, string::FromUtf8Error, time::Duration};
|
use std::{pin::Pin, string::FromUtf8Error, time::Duration};
|
||||||
use storage_path_generator::{Generator, Path};
|
use storage_path_generator::{Generator, Path};
|
||||||
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
|
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
|
||||||
|
@ -32,6 +33,12 @@ pub(crate) enum ObjectError {
|
||||||
#[error("Failed to generate path")]
|
#[error("Failed to generate path")]
|
||||||
PathGenerator(#[from] storage_path_generator::PathError),
|
PathGenerator(#[from] storage_path_generator::PathError),
|
||||||
|
|
||||||
|
#[error("Failed to generate request")]
|
||||||
|
S3(#[from] BucketError),
|
||||||
|
|
||||||
|
#[error("Error making request")]
|
||||||
|
SendRequest(String),
|
||||||
|
|
||||||
#[error("Failed to parse string")]
|
#[error("Failed to parse string")]
|
||||||
Utf8(#[from] FromUtf8Error),
|
Utf8(#[from] FromUtf8Error),
|
||||||
|
|
||||||
|
@ -42,6 +49,12 @@ pub(crate) enum ObjectError {
|
||||||
Status(StatusCode),
|
Status(StatusCode),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl From<SendRequestError> for ObjectError {
|
||||||
|
fn from(e: SendRequestError) -> Self {
|
||||||
|
Self::SendRequest(e.to_string())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub(crate) struct ObjectStore {
|
pub(crate) struct ObjectStore {
|
||||||
path_gen: Generator,
|
path_gen: Generator,
|
||||||
|
@ -59,30 +72,48 @@ pub(crate) struct ObjectStoreConfig {
|
||||||
credentials: Credentials,
|
credentials: Credentials,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait::async_trait(?Send)]
|
impl StoreConfig for ObjectStoreConfig {
|
||||||
impl Store for ObjectStore {
|
type Store = ObjectStore;
|
||||||
type Config = ObjectStoreConfig;
|
|
||||||
type Identifier = ObjectId;
|
|
||||||
type Stream = Pin<Box<dyn Stream<Item = std::io::Result<Bytes>>>>;
|
|
||||||
|
|
||||||
fn init(config: Self::Config) -> Self {
|
fn build(self) -> Self::Store {
|
||||||
ObjectStore {
|
ObjectStore {
|
||||||
path_gen: config.path_gen,
|
path_gen: self.path_gen,
|
||||||
repo: config.repo,
|
repo: self.repo,
|
||||||
bucket: config.bucket,
|
bucket: self.bucket,
|
||||||
credentials: config.credentials,
|
credentials: self.credentials,
|
||||||
client: crate::build_client(),
|
client: crate::build_client(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn payload_to_io_error(e: PayloadError) -> std::io::Error {
|
||||||
|
match e {
|
||||||
|
PayloadError::Io(io) => io,
|
||||||
|
otherwise => std::io::Error::new(std::io::ErrorKind::Other, otherwise.to_string()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait(?Send)]
|
||||||
|
impl Store for ObjectStore {
|
||||||
|
type Identifier = ObjectId;
|
||||||
|
type Stream = Pin<Box<dyn Stream<Item = std::io::Result<Bytes>>>>;
|
||||||
|
|
||||||
#[tracing::instrument(skip(reader))]
|
#[tracing::instrument(skip(reader))]
|
||||||
async fn save_async_read<Reader>(&self, reader: &mut Reader) -> Result<Self::Identifier, Error>
|
async fn save_async_read<Reader>(&self, reader: Reader) -> Result<Self::Identifier, Error>
|
||||||
where
|
where
|
||||||
Reader: AsyncRead + Unpin,
|
Reader: AsyncRead + Unpin + 'static,
|
||||||
|
{
|
||||||
|
self.save_stream(ReaderStream::new(reader)).await
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(skip(stream))]
|
||||||
|
async fn save_stream<S>(&self, stream: S) -> Result<Self::Identifier, Error>
|
||||||
|
where
|
||||||
|
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
|
||||||
{
|
{
|
||||||
let (req, object_id) = self.put_object_request().await?;
|
let (req, object_id) = self.put_object_request().await?;
|
||||||
|
|
||||||
let response = req.send_stream(ReaderStream::new(reader)).await?;
|
let response = req.send_stream(stream).await.map_err(ObjectError::from)?;
|
||||||
|
|
||||||
if response.status().is_success() {
|
if response.status().is_success() {
|
||||||
return Ok(object_id);
|
return Ok(object_id);
|
||||||
|
@ -95,7 +126,7 @@ impl Store for ObjectStore {
|
||||||
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Error> {
|
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Error> {
|
||||||
let (req, object_id) = self.put_object_request().await?;
|
let (req, object_id) = self.put_object_request().await?;
|
||||||
|
|
||||||
let response = req.send_body(bytes).await?;
|
let response = req.send_body(bytes).await.map_err(ObjectError::from)?;
|
||||||
|
|
||||||
if response.status().is_success() {
|
if response.status().is_success() {
|
||||||
return Ok(object_id);
|
return Ok(object_id);
|
||||||
|
@ -114,10 +145,11 @@ impl Store for ObjectStore {
|
||||||
let response = self
|
let response = self
|
||||||
.get_object_request(identifier, from_start, len)
|
.get_object_request(identifier, from_start, len)
|
||||||
.send()
|
.send()
|
||||||
.await?;
|
.await
|
||||||
|
.map_err(ObjectError::from)?;
|
||||||
|
|
||||||
if response.status().is_success() {
|
if response.status().is_success() {
|
||||||
return Ok(Box::pin(response));
|
return Ok(Box::pin(response.map_err(payload_to_io_error)));
|
||||||
}
|
}
|
||||||
|
|
||||||
Err(ObjectError::Status(response.status()).into())
|
Err(ObjectError::Status(response.status()).into())
|
||||||
|
@ -130,20 +162,24 @@ impl Store for ObjectStore {
|
||||||
writer: &mut Writer,
|
writer: &mut Writer,
|
||||||
) -> Result<(), std::io::Error>
|
) -> Result<(), std::io::Error>
|
||||||
where
|
where
|
||||||
Writer: AsyncWrite + Send + Unpin,
|
Writer: AsyncWrite + Unpin,
|
||||||
{
|
{
|
||||||
let response = self
|
let mut response = self
|
||||||
.get_object_request(identifier, None, None)
|
.get_object_request(identifier, None, None)
|
||||||
.send()
|
.send()
|
||||||
.await?;
|
.await
|
||||||
|
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, ObjectError::from(e)))?;
|
||||||
|
|
||||||
if !response.status().is_success() {
|
if !response.status().is_success() {
|
||||||
return Err(ObjectError::Status(response.status()).into());
|
return Err(std::io::Error::new(
|
||||||
|
std::io::ErrorKind::Other,
|
||||||
|
ObjectError::Status(response.status()),
|
||||||
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
while let Some(res) = response.next().await {
|
while let Some(res) = response.next().await {
|
||||||
let bytes = res?;
|
let mut bytes = res.map_err(payload_to_io_error)?;
|
||||||
writer.write_all_buf(bytes).await?;
|
writer.write_all_buf(&mut bytes).await?;
|
||||||
}
|
}
|
||||||
writer.flush().await?;
|
writer.flush().await?;
|
||||||
|
|
||||||
|
@ -152,7 +188,11 @@ impl Store for ObjectStore {
|
||||||
|
|
||||||
#[tracing::instrument]
|
#[tracing::instrument]
|
||||||
async fn len(&self, identifier: &Self::Identifier) -> Result<u64, Error> {
|
async fn len(&self, identifier: &Self::Identifier) -> Result<u64, Error> {
|
||||||
let response = self.head_object_request(identifier).send().await?;
|
let response = self
|
||||||
|
.head_object_request(identifier)
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.map_err(ObjectError::from)?;
|
||||||
|
|
||||||
if !response.status().is_success() {
|
if !response.status().is_success() {
|
||||||
return Err(ObjectError::Status(response.status()).into());
|
return Err(ObjectError::Status(response.status()).into());
|
||||||
|
@ -163,7 +203,7 @@ impl Store for ObjectStore {
|
||||||
.get(CONTENT_LENGTH)
|
.get(CONTENT_LENGTH)
|
||||||
.ok_or(ObjectError::Length)?
|
.ok_or(ObjectError::Length)?
|
||||||
.to_str()
|
.to_str()
|
||||||
.ok_or(ObjectError::Length)
|
.map_err(|_| ObjectError::Length)?
|
||||||
.parse::<u64>()
|
.parse::<u64>()
|
||||||
.map_err(|_| ObjectError::Length)?;
|
.map_err(|_| ObjectError::Length)?;
|
||||||
|
|
||||||
|
@ -186,11 +226,11 @@ impl ObjectStore {
|
||||||
#[allow(clippy::too_many_arguments)]
|
#[allow(clippy::too_many_arguments)]
|
||||||
pub(crate) async fn build(
|
pub(crate) async fn build(
|
||||||
endpoint: Url,
|
endpoint: Url,
|
||||||
bucket_name: &str,
|
bucket_name: String,
|
||||||
url_style: UrlStyle,
|
url_style: UrlStyle,
|
||||||
region: &str,
|
region: String,
|
||||||
access_key: Option<String>,
|
access_key: String,
|
||||||
secret_key: Option<String>,
|
secret_key: String,
|
||||||
session_token: Option<String>,
|
session_token: Option<String>,
|
||||||
repo: Repo,
|
repo: Repo,
|
||||||
) -> Result<ObjectStoreConfig, Error> {
|
) -> Result<ObjectStoreConfig, Error> {
|
||||||
|
@ -201,7 +241,11 @@ impl ObjectStore {
|
||||||
repo,
|
repo,
|
||||||
bucket: Bucket::new(endpoint, url_style, bucket_name, region)
|
bucket: Bucket::new(endpoint, url_style, bucket_name, region)
|
||||||
.map_err(ObjectError::from)?,
|
.map_err(ObjectError::from)?,
|
||||||
credentials: Credentials::new_with_token(access_key, secret_key, session_token),
|
credentials: if let Some(token) = session_token {
|
||||||
|
Credentials::new_with_token(access_key, secret_key, token)
|
||||||
|
} else {
|
||||||
|
Credentials::new(access_key, secret_key)
|
||||||
|
},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -213,7 +257,7 @@ impl ObjectStore {
|
||||||
Ok((self.build_request(action), ObjectId::from_string(path)))
|
Ok((self.build_request(action), ObjectId::from_string(path)))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn build_request<'a, A: S3Action<'a>>(&'a self, action: A) -> ClientRequest {
|
fn build_request<'a, A: S3Action<'a>>(&'a self, mut action: A) -> ClientRequest {
|
||||||
let method = match A::METHOD {
|
let method = match A::METHOD {
|
||||||
rusty_s3::Method::Head => awc::http::Method::HEAD,
|
rusty_s3::Method::Head => awc::http::Method::HEAD,
|
||||||
rusty_s3::Method::Get => awc::http::Method::GET,
|
rusty_s3::Method::Get => awc::http::Method::GET,
|
||||||
|
@ -224,11 +268,11 @@ impl ObjectStore {
|
||||||
|
|
||||||
let url = action.sign(Duration::from_secs(5));
|
let url = action.sign(Duration::from_secs(5));
|
||||||
|
|
||||||
let req = self.client.request(method, url);
|
let req = self.client.request(method, url.as_str());
|
||||||
|
|
||||||
action
|
action
|
||||||
.headers_mut()
|
.headers_mut()
|
||||||
.drain()
|
.iter()
|
||||||
.fold(req, |req, tup| req.insert_header(tup))
|
.fold(req, |req, tup| req.insert_header(tup))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -247,17 +291,11 @@ impl ObjectStore {
|
||||||
let start = from_start.unwrap_or(0);
|
let start = from_start.unwrap_or(0);
|
||||||
let end = len.map(|len| start + len - 1);
|
let end = len.map(|len| start + len - 1);
|
||||||
|
|
||||||
let range = match (start, end) {
|
req.insert_header(Range::Bytes(vec![if let Some(end) = end {
|
||||||
(Some(start), Some(end)) => Some(ByteRangeSpec::FromTo(start, end)),
|
ByteRangeSpec::FromTo(start, end)
|
||||||
(Some(start), None) => Some(ByteRangeSpec::From(start)),
|
|
||||||
_ => None,
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Some(range) = range {
|
|
||||||
req.insert_header(Range::Bytes(vec![range]))
|
|
||||||
} else {
|
} else {
|
||||||
req
|
ByteRangeSpec::From(start)
|
||||||
}
|
}]))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn head_object_request(&self, identifier: &ObjectId) -> ClientRequest {
|
fn head_object_request(&self, identifier: &ObjectId) -> ClientRequest {
|
||||||
|
|
Loading…
Reference in a new issue