mirror of
https://git.asonix.dog/asonix/pict-rs
synced 2024-12-22 19:31:35 +00:00
Add public_url to Store for optional redirects to public object store urls
Add details generation to file migration to set the content-type for uploads
This commit is contained in:
parent
8bdd64b284
commit
786f583d98
11 changed files with 242 additions and 50 deletions
|
@ -5,6 +5,7 @@ use crate::{
|
||||||
};
|
};
|
||||||
use actix_web::web::Bytes;
|
use actix_web::web::Bytes;
|
||||||
use futures_util::{Stream, TryStreamExt};
|
use futures_util::{Stream, TryStreamExt};
|
||||||
|
use mime::APPLICATION_OCTET_STREAM;
|
||||||
use tracing::{Instrument, Span};
|
use tracing::{Instrument, Span};
|
||||||
|
|
||||||
pub(crate) struct Backgrounded<R, S>
|
pub(crate) struct Backgrounded<R, S>
|
||||||
|
@ -58,7 +59,8 @@ where
|
||||||
|
|
||||||
let stream = stream.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));
|
let stream = stream.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));
|
||||||
|
|
||||||
let identifier = store.save_stream(stream).await?;
|
// use octet-stream, we don't know the upload's real type yet
|
||||||
|
let identifier = store.save_stream(stream, APPLICATION_OCTET_STREAM).await?;
|
||||||
|
|
||||||
self.identifier = Some(identifier);
|
self.identifier = Some(identifier);
|
||||||
|
|
||||||
|
|
|
@ -74,6 +74,9 @@ pub(crate) struct ObjectStorage {
|
||||||
///
|
///
|
||||||
/// This defaults to 30 seconds
|
/// This defaults to 30 seconds
|
||||||
pub(crate) client_timeout: u64,
|
pub(crate) client_timeout: u64,
|
||||||
|
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub(crate) public_endpoint: Option<Url>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
||||||
|
|
|
@ -91,6 +91,11 @@ pub(crate) struct ObjectStorage {
|
||||||
#[arg(long)]
|
#[arg(long)]
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
pub(crate) client_timeout: Option<u64>,
|
pub(crate) client_timeout: Option<u64>,
|
||||||
|
|
||||||
|
/// Base endpoint at which object storage images are publicly accessible
|
||||||
|
#[arg(long)]
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub(crate) public_endpoint: Option<Url>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
||||||
|
|
|
@ -64,7 +64,7 @@ impl Details {
|
||||||
InternalFormat::maybe_from_media_type(&self.content_type, self.frames.is_some())
|
InternalFormat::maybe_from_media_type(&self.content_type, self.frames.is_some())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn content_type(&self) -> mime::Mime {
|
pub(crate) fn media_type(&self) -> mime::Mime {
|
||||||
(*self.content_type).clone()
|
(*self.content_type).clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -86,6 +86,13 @@ impl ThumbnailFormat {
|
||||||
// Self::Webp => "webp",
|
// Self::Webp => "webp",
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn media_type(self) -> mime::Mime {
|
||||||
|
match self {
|
||||||
|
Self::Jpeg => mime::IMAGE_JPEG,
|
||||||
|
// Self::Webp => crate::formats::mimes::image_webp(),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip(store))]
|
#[tracing::instrument(skip(store))]
|
||||||
|
|
|
@ -68,14 +68,19 @@ async fn process<R: FullRepo, S: Store + 'static>(
|
||||||
return Err(UploadError::MissingIdentifier.into());
|
return Err(UploadError::MissingIdentifier.into());
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let thumbnail_format = thumbnail_format.unwrap_or(ThumbnailFormat::Jpeg);
|
||||||
|
|
||||||
let reader = crate::ffmpeg::thumbnail(
|
let reader = crate::ffmpeg::thumbnail(
|
||||||
store.clone(),
|
store.clone(),
|
||||||
identifier,
|
identifier,
|
||||||
input_format.unwrap_or(InternalVideoFormat::Mp4),
|
input_format.unwrap_or(InternalVideoFormat::Mp4),
|
||||||
thumbnail_format.unwrap_or(ThumbnailFormat::Jpeg),
|
thumbnail_format,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
let motion_identifier = store.save_async_read(reader).await?;
|
|
||||||
|
let motion_identifier = store
|
||||||
|
.save_async_read(reader, thumbnail_format.media_type())
|
||||||
|
.await?;
|
||||||
|
|
||||||
repo.relate_motion_identifier(hash.clone(), &motion_identifier)
|
repo.relate_motion_identifier(hash.clone(), &motion_identifier)
|
||||||
.await?;
|
.await?;
|
||||||
|
@ -122,7 +127,9 @@ async fn process<R: FullRepo, S: Store + 'static>(
|
||||||
|
|
||||||
let details = Details::from_bytes(bytes.clone()).await?;
|
let details = Details::from_bytes(bytes.clone()).await?;
|
||||||
|
|
||||||
let identifier = store.save_bytes(bytes.clone()).await?;
|
let identifier = store
|
||||||
|
.save_bytes(bytes.clone(), details.media_type())
|
||||||
|
.await?;
|
||||||
repo.relate_details(&identifier, &details).await?;
|
repo.relate_details(&identifier, &details).await?;
|
||||||
repo.relate_variant_identifier(
|
repo.relate_variant_identifier(
|
||||||
hash,
|
hash,
|
||||||
|
|
|
@ -89,7 +89,9 @@ where
|
||||||
let hasher_reader = Hasher::new(processed_reader, Sha256::new());
|
let hasher_reader = Hasher::new(processed_reader, Sha256::new());
|
||||||
let hasher = hasher_reader.hasher();
|
let hasher = hasher_reader.hasher();
|
||||||
|
|
||||||
let identifier = store.save_async_read(hasher_reader).await?;
|
let identifier = store
|
||||||
|
.save_async_read(hasher_reader, input_type.media_type())
|
||||||
|
.await?;
|
||||||
|
|
||||||
drop(permit);
|
drop(permit);
|
||||||
|
|
||||||
|
|
115
src/lib.rs
115
src/lib.rs
|
@ -672,6 +672,12 @@ async fn process<R: FullRepo, S: Store + 'static>(
|
||||||
new_details
|
new_details
|
||||||
};
|
};
|
||||||
|
|
||||||
|
if let Some(public_url) = store.public_url(&identifier) {
|
||||||
|
return Ok(HttpResponse::SeeOther()
|
||||||
|
.insert_header((actix_web::http::header::LOCATION, public_url.as_str()))
|
||||||
|
.finish());
|
||||||
|
}
|
||||||
|
|
||||||
return ranged_file_resp(&store, identifier, range, details, not_found).await;
|
return ranged_file_resp(&store, identifier, range, details, not_found).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -721,7 +727,7 @@ async fn process<R: FullRepo, S: Store + 'static>(
|
||||||
Ok(srv_response(
|
Ok(srv_response(
|
||||||
builder,
|
builder,
|
||||||
stream,
|
stream,
|
||||||
details.content_type(),
|
details.media_type(),
|
||||||
7 * DAYS,
|
7 * DAYS,
|
||||||
details.system_time(),
|
details.system_time(),
|
||||||
))
|
))
|
||||||
|
@ -768,6 +774,12 @@ async fn process_head<R: FullRepo, S: Store + 'static>(
|
||||||
new_details
|
new_details
|
||||||
};
|
};
|
||||||
|
|
||||||
|
if let Some(public_url) = store.public_url(&identifier) {
|
||||||
|
return Ok(HttpResponse::SeeOther()
|
||||||
|
.insert_header((actix_web::http::header::LOCATION, public_url.as_str()))
|
||||||
|
.finish());
|
||||||
|
}
|
||||||
|
|
||||||
return ranged_file_head_resp(&store, identifier, range, details).await;
|
return ranged_file_head_resp(&store, identifier, range, details).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -847,6 +859,12 @@ async fn serve<R: FullRepo, S: Store + 'static>(
|
||||||
|
|
||||||
let details = ensure_details(&repo, &store, &alias).await?;
|
let details = ensure_details(&repo, &store, &alias).await?;
|
||||||
|
|
||||||
|
if let Some(public_url) = store.public_url(&identifier) {
|
||||||
|
return Ok(HttpResponse::SeeOther()
|
||||||
|
.insert_header((actix_web::http::header::LOCATION, public_url.as_str()))
|
||||||
|
.finish());
|
||||||
|
}
|
||||||
|
|
||||||
ranged_file_resp(&store, identifier, range, details, not_found).await
|
ranged_file_resp(&store, identifier, range, details, not_found).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -866,6 +884,12 @@ async fn serve_head<R: FullRepo, S: Store + 'static>(
|
||||||
|
|
||||||
let details = ensure_details(&repo, &store, &alias).await?;
|
let details = ensure_details(&repo, &store, &alias).await?;
|
||||||
|
|
||||||
|
if let Some(public_url) = store.public_url(&identifier) {
|
||||||
|
return Ok(HttpResponse::SeeOther()
|
||||||
|
.insert_header((actix_web::http::header::LOCATION, public_url.as_str()))
|
||||||
|
.finish());
|
||||||
|
}
|
||||||
|
|
||||||
ranged_file_head_resp(&store, identifier, range, details).await
|
ranged_file_head_resp(&store, identifier, range, details).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -897,7 +921,7 @@ async fn ranged_file_head_resp<S: Store + 'static>(
|
||||||
|
|
||||||
Ok(srv_head(
|
Ok(srv_head(
|
||||||
builder,
|
builder,
|
||||||
details.content_type(),
|
details.media_type(),
|
||||||
7 * DAYS,
|
7 * DAYS,
|
||||||
details.system_time(),
|
details.system_time(),
|
||||||
)
|
)
|
||||||
|
@ -953,7 +977,7 @@ async fn ranged_file_resp<S: Store + 'static>(
|
||||||
Ok(srv_response(
|
Ok(srv_response(
|
||||||
builder,
|
builder,
|
||||||
stream,
|
stream,
|
||||||
details.content_type(),
|
details.media_type(),
|
||||||
7 * DAYS,
|
7 * DAYS,
|
||||||
details.system_time(),
|
details.system_time(),
|
||||||
))
|
))
|
||||||
|
@ -1280,7 +1304,7 @@ async fn migrate_inner<S1>(
|
||||||
skip_missing_files: bool,
|
skip_missing_files: bool,
|
||||||
) -> color_eyre::Result<()>
|
) -> color_eyre::Result<()>
|
||||||
where
|
where
|
||||||
S1: Store,
|
S1: Store + 'static,
|
||||||
{
|
{
|
||||||
match to {
|
match to {
|
||||||
config::primitives::Store::Filesystem(config::Filesystem { path }) => {
|
config::primitives::Store::Filesystem(config::Filesystem { path }) => {
|
||||||
|
@ -1300,6 +1324,7 @@ where
|
||||||
session_token,
|
session_token,
|
||||||
signature_duration,
|
signature_duration,
|
||||||
client_timeout,
|
client_timeout,
|
||||||
|
public_endpoint,
|
||||||
}) => {
|
}) => {
|
||||||
let to = ObjectStore::build(
|
let to = ObjectStore::build(
|
||||||
endpoint.clone(),
|
endpoint.clone(),
|
||||||
|
@ -1315,6 +1340,7 @@ where
|
||||||
session_token,
|
session_token,
|
||||||
signature_duration.unwrap_or(15),
|
signature_duration.unwrap_or(15),
|
||||||
client_timeout.unwrap_or(30),
|
client_timeout.unwrap_or(30),
|
||||||
|
public_endpoint,
|
||||||
repo.clone(),
|
repo.clone(),
|
||||||
)
|
)
|
||||||
.await?
|
.await?
|
||||||
|
@ -1427,6 +1453,7 @@ pub async fn run() -> color_eyre::Result<()> {
|
||||||
session_token,
|
session_token,
|
||||||
signature_duration,
|
signature_duration,
|
||||||
client_timeout,
|
client_timeout,
|
||||||
|
public_endpoint,
|
||||||
}) => {
|
}) => {
|
||||||
let from = ObjectStore::build(
|
let from = ObjectStore::build(
|
||||||
endpoint,
|
endpoint,
|
||||||
|
@ -1442,6 +1469,7 @@ pub async fn run() -> color_eyre::Result<()> {
|
||||||
session_token,
|
session_token,
|
||||||
signature_duration.unwrap_or(15),
|
signature_duration.unwrap_or(15),
|
||||||
client_timeout.unwrap_or(30),
|
client_timeout.unwrap_or(30),
|
||||||
|
public_endpoint,
|
||||||
repo.clone(),
|
repo.clone(),
|
||||||
)
|
)
|
||||||
.await?
|
.await?
|
||||||
|
@ -1480,6 +1508,7 @@ pub async fn run() -> color_eyre::Result<()> {
|
||||||
session_token,
|
session_token,
|
||||||
signature_duration,
|
signature_duration,
|
||||||
client_timeout,
|
client_timeout,
|
||||||
|
public_endpoint,
|
||||||
}) => {
|
}) => {
|
||||||
let store = ObjectStore::build(
|
let store = ObjectStore::build(
|
||||||
endpoint,
|
endpoint,
|
||||||
|
@ -1495,6 +1524,7 @@ pub async fn run() -> color_eyre::Result<()> {
|
||||||
session_token,
|
session_token,
|
||||||
signature_duration,
|
signature_duration,
|
||||||
client_timeout,
|
client_timeout,
|
||||||
|
public_endpoint,
|
||||||
repo.clone(),
|
repo.clone(),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
@ -1527,7 +1557,7 @@ async fn migrate_store<R, S1, S2>(
|
||||||
skip_missing_files: bool,
|
skip_missing_files: bool,
|
||||||
) -> Result<(), Error>
|
) -> Result<(), Error>
|
||||||
where
|
where
|
||||||
S1: Store + Clone,
|
S1: Store + Clone + 'static,
|
||||||
S2: Store + Clone,
|
S2: Store + Clone,
|
||||||
R: IdentifierRepo + HashRepo + SettingsRepo + QueueRepo,
|
R: IdentifierRepo + HashRepo + SettingsRepo + QueueRepo,
|
||||||
{
|
{
|
||||||
|
@ -1570,7 +1600,7 @@ async fn do_migrate_store<R, S1, S2>(
|
||||||
skip_missing_files: bool,
|
skip_missing_files: bool,
|
||||||
) -> Result<(), Error>
|
) -> Result<(), Error>
|
||||||
where
|
where
|
||||||
S1: Store,
|
S1: Store + 'static,
|
||||||
S2: Store,
|
S2: Store,
|
||||||
R: IdentifierRepo + HashRepo + SettingsRepo + QueueRepo,
|
R: IdentifierRepo + HashRepo + SettingsRepo + QueueRepo,
|
||||||
{
|
{
|
||||||
|
@ -1639,7 +1669,7 @@ where
|
||||||
.await?
|
.await?
|
||||||
{
|
{
|
||||||
if repo.get(STORE_MIGRATION_MOTION).await?.is_none() {
|
if repo.get(STORE_MIGRATION_MOTION).await?.is_none() {
|
||||||
match migrate_file(&from, &to, &identifier, skip_missing_files).await {
|
match migrate_file(repo, &from, &to, &identifier, skip_missing_files).await {
|
||||||
Ok(new_identifier) => {
|
Ok(new_identifier) => {
|
||||||
migrate_details(repo, identifier, &new_identifier).await?;
|
migrate_details(repo, identifier, &new_identifier).await?;
|
||||||
repo.relate_motion_identifier(
|
repo.relate_motion_identifier(
|
||||||
|
@ -1653,6 +1683,13 @@ where
|
||||||
Err(MigrateError::From(e)) if e.is_not_found() && skip_missing_files => {
|
Err(MigrateError::From(e)) if e.is_not_found() && skip_missing_files => {
|
||||||
tracing::warn!("Skipping motion file for hash {}", hex::encode(&hash));
|
tracing::warn!("Skipping motion file for hash {}", hex::encode(&hash));
|
||||||
}
|
}
|
||||||
|
Err(MigrateError::Details(e)) => {
|
||||||
|
tracing::warn!(
|
||||||
|
"Error generating details for motion file for hash {}",
|
||||||
|
hex::encode(&hash)
|
||||||
|
);
|
||||||
|
return Err(e.into());
|
||||||
|
}
|
||||||
Err(MigrateError::From(e)) => {
|
Err(MigrateError::From(e)) => {
|
||||||
tracing::warn!("Error migrating motion file from old store");
|
tracing::warn!("Error migrating motion file from old store");
|
||||||
return Err(e.into());
|
return Err(e.into());
|
||||||
|
@ -1675,7 +1712,7 @@ where
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
match migrate_file(&from, &to, &identifier, skip_missing_files).await {
|
match migrate_file(repo, &from, &to, &identifier, skip_missing_files).await {
|
||||||
Ok(new_identifier) => {
|
Ok(new_identifier) => {
|
||||||
migrate_details(repo, identifier, &new_identifier).await?;
|
migrate_details(repo, identifier, &new_identifier).await?;
|
||||||
repo.remove_variant(hash.as_ref().to_vec().into(), variant.clone())
|
repo.remove_variant(hash.as_ref().to_vec().into(), variant.clone())
|
||||||
|
@ -1697,6 +1734,13 @@ where
|
||||||
hex::encode(&hash)
|
hex::encode(&hash)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
Err(MigrateError::Details(e)) => {
|
||||||
|
tracing::warn!(
|
||||||
|
"Error generating details for variant file for hash {}",
|
||||||
|
hex::encode(&hash)
|
||||||
|
);
|
||||||
|
return Err(e.into());
|
||||||
|
}
|
||||||
Err(MigrateError::From(e)) => {
|
Err(MigrateError::From(e)) => {
|
||||||
tracing::warn!("Error migrating variant file from old store");
|
tracing::warn!("Error migrating variant file from old store");
|
||||||
return Err(e.into());
|
return Err(e.into());
|
||||||
|
@ -1708,7 +1752,7 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
match migrate_file(&from, &to, &original_identifier, skip_missing_files).await {
|
match migrate_file(repo, &from, &to, &original_identifier, skip_missing_files).await {
|
||||||
Ok(new_identifier) => {
|
Ok(new_identifier) => {
|
||||||
migrate_details(repo, original_identifier, &new_identifier).await?;
|
migrate_details(repo, original_identifier, &new_identifier).await?;
|
||||||
repo.relate_identifier(hash.as_ref().to_vec().into(), &new_identifier)
|
repo.relate_identifier(hash.as_ref().to_vec().into(), &new_identifier)
|
||||||
|
@ -1717,6 +1761,13 @@ where
|
||||||
Err(MigrateError::From(e)) if e.is_not_found() && skip_missing_files => {
|
Err(MigrateError::From(e)) if e.is_not_found() && skip_missing_files => {
|
||||||
tracing::warn!("Skipping original file for hash {}", hex::encode(&hash));
|
tracing::warn!("Skipping original file for hash {}", hex::encode(&hash));
|
||||||
}
|
}
|
||||||
|
Err(MigrateError::Details(e)) => {
|
||||||
|
tracing::warn!(
|
||||||
|
"Error generating details for original file for hash {}",
|
||||||
|
hex::encode(&hash)
|
||||||
|
);
|
||||||
|
return Err(e.into());
|
||||||
|
}
|
||||||
Err(MigrateError::From(e)) => {
|
Err(MigrateError::From(e)) => {
|
||||||
tracing::warn!("Error migrating original file from old store");
|
tracing::warn!("Error migrating original file from old store");
|
||||||
return Err(e.into());
|
return Err(e.into());
|
||||||
|
@ -1756,20 +1807,22 @@ where
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn migrate_file<S1, S2>(
|
async fn migrate_file<R, S1, S2>(
|
||||||
|
repo: &R,
|
||||||
from: &S1,
|
from: &S1,
|
||||||
to: &S2,
|
to: &S2,
|
||||||
identifier: &S1::Identifier,
|
identifier: &S1::Identifier,
|
||||||
skip_missing_files: bool,
|
skip_missing_files: bool,
|
||||||
) -> Result<S2::Identifier, MigrateError>
|
) -> Result<S2::Identifier, MigrateError>
|
||||||
where
|
where
|
||||||
S1: Store,
|
R: IdentifierRepo,
|
||||||
|
S1: Store + 'static,
|
||||||
S2: Store,
|
S2: Store,
|
||||||
{
|
{
|
||||||
let mut failure_count = 0;
|
let mut failure_count = 0;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
match do_migrate_file(from, to, identifier).await {
|
match do_migrate_file(repo, from, to, identifier).await {
|
||||||
Ok(identifier) => return Ok(identifier),
|
Ok(identifier) => return Ok(identifier),
|
||||||
Err(MigrateError::From(e)) if e.is_not_found() && skip_missing_files => {
|
Err(MigrateError::From(e)) if e.is_not_found() && skip_missing_files => {
|
||||||
return Err(MigrateError::From(e));
|
return Err(MigrateError::From(e));
|
||||||
|
@ -1793,16 +1846,19 @@ where
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
enum MigrateError {
|
enum MigrateError {
|
||||||
From(crate::store::StoreError),
|
From(crate::store::StoreError),
|
||||||
|
Details(crate::error::Error),
|
||||||
To(crate::store::StoreError),
|
To(crate::store::StoreError),
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn do_migrate_file<S1, S2>(
|
async fn do_migrate_file<R, S1, S2>(
|
||||||
|
repo: &R,
|
||||||
from: &S1,
|
from: &S1,
|
||||||
to: &S2,
|
to: &S2,
|
||||||
identifier: &S1::Identifier,
|
identifier: &S1::Identifier,
|
||||||
) -> Result<S2::Identifier, MigrateError>
|
) -> Result<S2::Identifier, MigrateError>
|
||||||
where
|
where
|
||||||
S1: Store,
|
R: IdentifierRepo,
|
||||||
|
S1: Store + 'static,
|
||||||
S2: Store,
|
S2: Store,
|
||||||
{
|
{
|
||||||
let stream = from
|
let stream = from
|
||||||
|
@ -1810,7 +1866,36 @@ where
|
||||||
.await
|
.await
|
||||||
.map_err(MigrateError::From)?;
|
.map_err(MigrateError::From)?;
|
||||||
|
|
||||||
let new_identifier = to.save_stream(stream).await.map_err(MigrateError::To)?;
|
let details_opt = repo
|
||||||
|
.details(identifier)
|
||||||
|
.await
|
||||||
|
.map_err(Error::from)
|
||||||
|
.map_err(MigrateError::Details)?
|
||||||
|
.and_then(|details| {
|
||||||
|
if details.internal_format().is_some() {
|
||||||
|
Some(details)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let details = if let Some(details) = details_opt {
|
||||||
|
details
|
||||||
|
} else {
|
||||||
|
let new_details = Details::from_store(from, identifier)
|
||||||
|
.await
|
||||||
|
.map_err(MigrateError::Details)?;
|
||||||
|
repo.relate_details(identifier, &new_details)
|
||||||
|
.await
|
||||||
|
.map_err(Error::from)
|
||||||
|
.map_err(MigrateError::Details)?;
|
||||||
|
new_details
|
||||||
|
};
|
||||||
|
|
||||||
|
let new_identifier = to
|
||||||
|
.save_stream(stream, details.media_type())
|
||||||
|
.await
|
||||||
|
.map_err(MigrateError::To)?;
|
||||||
|
|
||||||
Ok(new_identifier)
|
Ok(new_identifier)
|
||||||
}
|
}
|
||||||
|
|
70
src/store.rs
70
src/store.rs
|
@ -72,15 +72,31 @@ pub(crate) trait Store: Clone + Debug {
|
||||||
|
|
||||||
async fn health_check(&self) -> Result<(), StoreError>;
|
async fn health_check(&self) -> Result<(), StoreError>;
|
||||||
|
|
||||||
async fn save_async_read<Reader>(&self, reader: Reader) -> Result<Self::Identifier, StoreError>
|
async fn save_async_read<Reader>(
|
||||||
|
&self,
|
||||||
|
reader: Reader,
|
||||||
|
content_type: mime::Mime,
|
||||||
|
) -> Result<Self::Identifier, StoreError>
|
||||||
where
|
where
|
||||||
Reader: AsyncRead + Unpin + 'static;
|
Reader: AsyncRead + Unpin + 'static;
|
||||||
|
|
||||||
async fn save_stream<S>(&self, stream: S) -> Result<Self::Identifier, StoreError>
|
async fn save_stream<S>(
|
||||||
|
&self,
|
||||||
|
stream: S,
|
||||||
|
content_type: mime::Mime,
|
||||||
|
) -> Result<Self::Identifier, StoreError>
|
||||||
where
|
where
|
||||||
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static;
|
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static;
|
||||||
|
|
||||||
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, StoreError>;
|
async fn save_bytes(
|
||||||
|
&self,
|
||||||
|
bytes: Bytes,
|
||||||
|
content_type: mime::Mime,
|
||||||
|
) -> Result<Self::Identifier, StoreError>;
|
||||||
|
|
||||||
|
fn public_url(&self, _: &Self::Identifier) -> Option<url::Url> {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
async fn to_stream(
|
async fn to_stream(
|
||||||
&self,
|
&self,
|
||||||
|
@ -114,22 +130,34 @@ where
|
||||||
T::health_check(self).await
|
T::health_check(self).await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn save_async_read<Reader>(&self, reader: Reader) -> Result<Self::Identifier, StoreError>
|
async fn save_async_read<Reader>(
|
||||||
|
&self,
|
||||||
|
reader: Reader,
|
||||||
|
content_type: mime::Mime,
|
||||||
|
) -> Result<Self::Identifier, StoreError>
|
||||||
where
|
where
|
||||||
Reader: AsyncRead + Unpin + 'static,
|
Reader: AsyncRead + Unpin + 'static,
|
||||||
{
|
{
|
||||||
T::save_async_read(self, reader).await
|
T::save_async_read(self, reader, content_type).await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn save_stream<S>(&self, stream: S) -> Result<Self::Identifier, StoreError>
|
async fn save_stream<S>(
|
||||||
|
&self,
|
||||||
|
stream: S,
|
||||||
|
content_type: mime::Mime,
|
||||||
|
) -> Result<Self::Identifier, StoreError>
|
||||||
where
|
where
|
||||||
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
|
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
|
||||||
{
|
{
|
||||||
T::save_stream(self, stream).await
|
T::save_stream(self, stream, content_type).await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, StoreError> {
|
async fn save_bytes(
|
||||||
T::save_bytes(self, bytes).await
|
&self,
|
||||||
|
bytes: Bytes,
|
||||||
|
content_type: mime::Mime,
|
||||||
|
) -> Result<Self::Identifier, StoreError> {
|
||||||
|
T::save_bytes(self, bytes, content_type).await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn to_stream(
|
async fn to_stream(
|
||||||
|
@ -173,22 +201,34 @@ where
|
||||||
T::health_check(self).await
|
T::health_check(self).await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn save_async_read<Reader>(&self, reader: Reader) -> Result<Self::Identifier, StoreError>
|
async fn save_async_read<Reader>(
|
||||||
|
&self,
|
||||||
|
reader: Reader,
|
||||||
|
content_type: mime::Mime,
|
||||||
|
) -> Result<Self::Identifier, StoreError>
|
||||||
where
|
where
|
||||||
Reader: AsyncRead + Unpin + 'static,
|
Reader: AsyncRead + Unpin + 'static,
|
||||||
{
|
{
|
||||||
T::save_async_read(self, reader).await
|
T::save_async_read(self, reader, content_type).await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn save_stream<S>(&self, stream: S) -> Result<Self::Identifier, StoreError>
|
async fn save_stream<S>(
|
||||||
|
&self,
|
||||||
|
stream: S,
|
||||||
|
content_type: mime::Mime,
|
||||||
|
) -> Result<Self::Identifier, StoreError>
|
||||||
where
|
where
|
||||||
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
|
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
|
||||||
{
|
{
|
||||||
T::save_stream(self, stream).await
|
T::save_stream(self, stream, content_type).await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, StoreError> {
|
async fn save_bytes(
|
||||||
T::save_bytes(self, bytes).await
|
&self,
|
||||||
|
bytes: Bytes,
|
||||||
|
content_type: mime::Mime,
|
||||||
|
) -> Result<Self::Identifier, StoreError> {
|
||||||
|
T::save_bytes(self, bytes, content_type).await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn to_stream(
|
async fn to_stream(
|
||||||
|
|
|
@ -66,6 +66,7 @@ impl Store for FileStore {
|
||||||
async fn save_async_read<Reader>(
|
async fn save_async_read<Reader>(
|
||||||
&self,
|
&self,
|
||||||
mut reader: Reader,
|
mut reader: Reader,
|
||||||
|
_content_type: mime::Mime,
|
||||||
) -> Result<Self::Identifier, StoreError>
|
) -> Result<Self::Identifier, StoreError>
|
||||||
where
|
where
|
||||||
Reader: AsyncRead + Unpin + 'static,
|
Reader: AsyncRead + Unpin + 'static,
|
||||||
|
@ -80,15 +81,24 @@ impl Store for FileStore {
|
||||||
Ok(self.file_id_from_path(path)?)
|
Ok(self.file_id_from_path(path)?)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn save_stream<S>(&self, stream: S) -> Result<Self::Identifier, StoreError>
|
async fn save_stream<S>(
|
||||||
|
&self,
|
||||||
|
stream: S,
|
||||||
|
content_type: mime::Mime,
|
||||||
|
) -> Result<Self::Identifier, StoreError>
|
||||||
where
|
where
|
||||||
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
|
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
|
||||||
{
|
{
|
||||||
self.save_async_read(StreamReader::new(stream)).await
|
self.save_async_read(StreamReader::new(stream), content_type)
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip(bytes))]
|
#[tracing::instrument(skip(bytes))]
|
||||||
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, StoreError> {
|
async fn save_bytes(
|
||||||
|
&self,
|
||||||
|
bytes: Bytes,
|
||||||
|
_content_type: mime::Mime,
|
||||||
|
) -> Result<Self::Identifier, StoreError> {
|
||||||
let path = self.next_file().await?;
|
let path = self.next_file().await?;
|
||||||
|
|
||||||
if let Err(e) = self.safe_save_bytes(&path, bytes).await {
|
if let Err(e) = self.safe_save_bytes(&path, bytes).await {
|
||||||
|
|
|
@ -98,6 +98,7 @@ pub(crate) struct ObjectStore {
|
||||||
client: Client,
|
client: Client,
|
||||||
signature_expiration: Duration,
|
signature_expiration: Duration,
|
||||||
client_timeout: Duration,
|
client_timeout: Duration,
|
||||||
|
public_endpoint: Option<Url>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
|
@ -108,6 +109,7 @@ pub(crate) struct ObjectStoreConfig {
|
||||||
credentials: Credentials,
|
credentials: Credentials,
|
||||||
signature_expiration: u64,
|
signature_expiration: u64,
|
||||||
client_timeout: u64,
|
client_timeout: u64,
|
||||||
|
public_endpoint: Option<Url>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(serde::Deserialize, Debug)]
|
#[derive(serde::Deserialize, Debug)]
|
||||||
|
@ -130,6 +132,7 @@ impl ObjectStoreConfig {
|
||||||
client,
|
client,
|
||||||
signature_expiration: Duration::from_secs(self.signature_expiration),
|
signature_expiration: Duration::from_secs(self.signature_expiration),
|
||||||
client_timeout: Duration::from_secs(self.client_timeout),
|
client_timeout: Duration::from_secs(self.client_timeout),
|
||||||
|
public_endpoint: self.public_endpoint,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -190,15 +193,24 @@ impl Store for ObjectStore {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn save_async_read<Reader>(&self, reader: Reader) -> Result<Self::Identifier, StoreError>
|
async fn save_async_read<Reader>(
|
||||||
|
&self,
|
||||||
|
reader: Reader,
|
||||||
|
content_type: mime::Mime,
|
||||||
|
) -> Result<Self::Identifier, StoreError>
|
||||||
where
|
where
|
||||||
Reader: AsyncRead + Unpin + 'static,
|
Reader: AsyncRead + Unpin + 'static,
|
||||||
{
|
{
|
||||||
self.save_stream(ReaderStream::new(reader)).await
|
self.save_stream(ReaderStream::new(reader), content_type)
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip_all)]
|
#[tracing::instrument(skip_all)]
|
||||||
async fn save_stream<S>(&self, mut stream: S) -> Result<Self::Identifier, StoreError>
|
async fn save_stream<S>(
|
||||||
|
&self,
|
||||||
|
mut stream: S,
|
||||||
|
content_type: mime::Mime,
|
||||||
|
) -> Result<Self::Identifier, StoreError>
|
||||||
where
|
where
|
||||||
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
|
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
|
||||||
{
|
{
|
||||||
|
@ -206,7 +218,7 @@ impl Store for ObjectStore {
|
||||||
|
|
||||||
if first_chunk.len() < CHUNK_SIZE {
|
if first_chunk.len() < CHUNK_SIZE {
|
||||||
drop(stream);
|
drop(stream);
|
||||||
let (req, object_id) = self.put_object_request().await?;
|
let (req, object_id) = self.put_object_request(content_type).await?;
|
||||||
let response = req
|
let response = req
|
||||||
.send_body(first_chunk)
|
.send_body(first_chunk)
|
||||||
.await
|
.await
|
||||||
|
@ -221,7 +233,7 @@ impl Store for ObjectStore {
|
||||||
|
|
||||||
let mut first_chunk = Some(first_chunk);
|
let mut first_chunk = Some(first_chunk);
|
||||||
|
|
||||||
let (req, object_id) = self.create_multipart_request().await?;
|
let (req, object_id) = self.create_multipart_request(content_type).await?;
|
||||||
let mut response = req.send().await.map_err(ObjectError::from)?;
|
let mut response = req.send().await.map_err(ObjectError::from)?;
|
||||||
|
|
||||||
if !response.status().is_success() {
|
if !response.status().is_success() {
|
||||||
|
@ -329,8 +341,12 @@ impl Store for ObjectStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip_all)]
|
#[tracing::instrument(skip_all)]
|
||||||
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, StoreError> {
|
async fn save_bytes(
|
||||||
let (req, object_id) = self.put_object_request().await?;
|
&self,
|
||||||
|
bytes: Bytes,
|
||||||
|
content_type: mime::Mime,
|
||||||
|
) -> Result<Self::Identifier, StoreError> {
|
||||||
|
let (req, object_id) = self.put_object_request(content_type).await?;
|
||||||
|
|
||||||
let response = req.send_body(bytes).await.map_err(ObjectError::from)?;
|
let response = req.send_body(bytes).await.map_err(ObjectError::from)?;
|
||||||
|
|
||||||
|
@ -341,6 +357,13 @@ impl Store for ObjectStore {
|
||||||
Ok(object_id)
|
Ok(object_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn public_url(&self, identifier: &Self::Identifier) -> Option<url::Url> {
|
||||||
|
self.public_endpoint.clone().map(|mut endpoint| {
|
||||||
|
endpoint.set_path(identifier.as_str());
|
||||||
|
endpoint
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip(self))]
|
#[tracing::instrument(skip(self))]
|
||||||
async fn to_stream(
|
async fn to_stream(
|
||||||
&self,
|
&self,
|
||||||
|
@ -445,6 +468,7 @@ impl ObjectStore {
|
||||||
session_token: Option<String>,
|
session_token: Option<String>,
|
||||||
signature_expiration: u64,
|
signature_expiration: u64,
|
||||||
client_timeout: u64,
|
client_timeout: u64,
|
||||||
|
public_endpoint: Option<Url>,
|
||||||
repo: Repo,
|
repo: Repo,
|
||||||
) -> Result<ObjectStoreConfig, StoreError> {
|
) -> Result<ObjectStoreConfig, StoreError> {
|
||||||
let path_gen = init_generator(&repo).await?;
|
let path_gen = init_generator(&repo).await?;
|
||||||
|
@ -461,6 +485,7 @@ impl ObjectStore {
|
||||||
},
|
},
|
||||||
signature_expiration,
|
signature_expiration,
|
||||||
client_timeout,
|
client_timeout,
|
||||||
|
public_endpoint,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -470,19 +495,25 @@ impl ObjectStore {
|
||||||
Ok(self.build_request(action))
|
Ok(self.build_request(action))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn put_object_request(&self) -> Result<(ClientRequest, ObjectId), StoreError> {
|
async fn put_object_request(
|
||||||
|
&self,
|
||||||
|
content_type: mime::Mime,
|
||||||
|
) -> Result<(ClientRequest, ObjectId), StoreError> {
|
||||||
let path = self.next_file().await?;
|
let path = self.next_file().await?;
|
||||||
|
|
||||||
let mut action = self.bucket.put_object(Some(&self.credentials), &path);
|
let mut action = self.bucket.put_object(Some(&self.credentials), &path);
|
||||||
|
|
||||||
action
|
action
|
||||||
.headers_mut()
|
.headers_mut()
|
||||||
.insert("content-type", "application/octet-stream");
|
.insert("content-type", content_type.as_ref());
|
||||||
|
|
||||||
Ok((self.build_request(action), ObjectId::from_string(path)))
|
Ok((self.build_request(action), ObjectId::from_string(path)))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn create_multipart_request(&self) -> Result<(ClientRequest, ObjectId), StoreError> {
|
async fn create_multipart_request(
|
||||||
|
&self,
|
||||||
|
content_type: mime::Mime,
|
||||||
|
) -> Result<(ClientRequest, ObjectId), StoreError> {
|
||||||
let path = self.next_file().await?;
|
let path = self.next_file().await?;
|
||||||
|
|
||||||
let mut action = self
|
let mut action = self
|
||||||
|
@ -491,7 +522,7 @@ impl ObjectStore {
|
||||||
|
|
||||||
action
|
action
|
||||||
.headers_mut()
|
.headers_mut()
|
||||||
.insert("content-type", "application/octet-stream");
|
.insert("content-type", content_type.as_ref());
|
||||||
|
|
||||||
Ok((self.build_request(action), ObjectId::from_string(path)))
|
Ok((self.build_request(action), ObjectId::from_string(path)))
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue