mirror of
https://git.asonix.dog/asonix/pict-rs
synced 2025-01-24 10:25:50 +00:00
Enable setting content type when uploading objects
This commit is contained in:
parent
5b3f8f4d15
commit
e23b33e245
9 changed files with 176 additions and 41 deletions
|
@ -5,6 +5,7 @@ use crate::{
|
|||
};
|
||||
use actix_web::web::Bytes;
|
||||
use futures_util::{Stream, TryStreamExt};
|
||||
use mime::APPLICATION_OCTET_STREAM;
|
||||
use tracing::{Instrument, Span};
|
||||
|
||||
pub(crate) struct Backgrounded<R, S>
|
||||
|
@ -58,7 +59,8 @@ where
|
|||
|
||||
let stream = stream.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));
|
||||
|
||||
let identifier = store.save_stream(stream).await?;
|
||||
// use octet-stream, we don't know the upload's real type yet
|
||||
let identifier = store.save_stream(stream, APPLICATION_OCTET_STREAM).await?;
|
||||
|
||||
self.identifier = Some(identifier);
|
||||
|
||||
|
|
|
@ -353,6 +353,13 @@ impl ThumbnailFormat {
|
|||
// Self::Webp => "webp",
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn media_type(self) -> mime::Mime {
|
||||
match self {
|
||||
Self::Jpeg => mime::IMAGE_JPEG,
|
||||
// Self::Webp => image_webp(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl OutputFormat {
|
||||
|
|
|
@ -68,14 +68,18 @@ async fn process<R: FullRepo, S: Store + 'static>(
|
|||
return Err(UploadError::MissingIdentifier.into());
|
||||
};
|
||||
|
||||
let thumbnail_format = thumbnail_format.unwrap_or(ThumbnailFormat::Jpeg);
|
||||
|
||||
let reader = crate::ffmpeg::thumbnail(
|
||||
store.clone(),
|
||||
identifier,
|
||||
input_format.unwrap_or(VideoFormat::Mp4),
|
||||
thumbnail_format.unwrap_or(ThumbnailFormat::Jpeg),
|
||||
thumbnail_format,
|
||||
)
|
||||
.await?;
|
||||
let motion_identifier = store.save_async_read(reader).await?;
|
||||
let motion_identifier = store
|
||||
.save_async_read(reader, thumbnail_format.media_type())
|
||||
.await?;
|
||||
|
||||
repo.relate_motion_identifier(hash.clone(), &motion_identifier)
|
||||
.await?;
|
||||
|
@ -97,7 +101,9 @@ async fn process<R: FullRepo, S: Store + 'static>(
|
|||
|
||||
let details = Details::from_bytes(bytes.clone(), format.as_hint()).await?;
|
||||
|
||||
let identifier = store.save_bytes(bytes.clone()).await?;
|
||||
let identifier = store
|
||||
.save_bytes(bytes.clone(), details.content_type())
|
||||
.await?;
|
||||
repo.relate_details(&identifier, &details).await?;
|
||||
repo.relate_variant_identifier(
|
||||
hash,
|
||||
|
|
|
@ -79,7 +79,9 @@ where
|
|||
let hasher_reader = Hasher::new(processed_reader, Sha256::new());
|
||||
let hasher = hasher_reader.hasher();
|
||||
|
||||
let identifier = store.save_async_read(hasher_reader).await?;
|
||||
let identifier = store
|
||||
.save_async_read(hasher_reader, input_type.content_type())
|
||||
.await?;
|
||||
|
||||
drop(permit);
|
||||
|
||||
|
|
54
src/lib.rs
54
src/lib.rs
|
@ -1629,7 +1629,7 @@ where
|
|||
.await?
|
||||
{
|
||||
if repo.get(STORE_MIGRATION_MOTION).await?.is_none() {
|
||||
match migrate_file(&from, &to, &identifier, skip_missing_files).await {
|
||||
match migrate_file(repo, &from, &to, &identifier, skip_missing_files).await {
|
||||
Ok(new_identifier) => {
|
||||
migrate_details(repo, identifier, &new_identifier).await?;
|
||||
repo.relate_motion_identifier(
|
||||
|
@ -1640,6 +1640,13 @@ where
|
|||
repo.set(STORE_MIGRATION_MOTION, b"1".to_vec().into())
|
||||
.await?;
|
||||
}
|
||||
Err(MigrateError::Details(e)) => {
|
||||
tracing::warn!(
|
||||
"Error fetching details for motion file for hash {}",
|
||||
hex::encode(&hash)
|
||||
);
|
||||
return Err(e.into());
|
||||
}
|
||||
Err(MigrateError::From(e)) if e.is_not_found() && skip_missing_files => {
|
||||
tracing::warn!("Skipping motion file for hash {}", hex::encode(&hash));
|
||||
}
|
||||
|
@ -1665,7 +1672,7 @@ where
|
|||
continue;
|
||||
}
|
||||
|
||||
match migrate_file(&from, &to, &identifier, skip_missing_files).await {
|
||||
match migrate_file(repo, &from, &to, &identifier, skip_missing_files).await {
|
||||
Ok(new_identifier) => {
|
||||
migrate_details(repo, identifier, &new_identifier).await?;
|
||||
repo.remove_variant(hash.as_ref().to_vec().into(), variant.clone())
|
||||
|
@ -1680,6 +1687,13 @@ where
|
|||
repo.set(STORE_MIGRATION_VARIANT, new_identifier.to_bytes()?.into())
|
||||
.await?;
|
||||
}
|
||||
Err(MigrateError::Details(e)) => {
|
||||
tracing::warn!(
|
||||
"Error fetching details for motion file for hash {}",
|
||||
hex::encode(&hash)
|
||||
);
|
||||
return Err(e.into());
|
||||
}
|
||||
Err(MigrateError::From(e)) if e.is_not_found() && skip_missing_files => {
|
||||
tracing::warn!(
|
||||
"Skipping variant {} for hash {}",
|
||||
|
@ -1698,12 +1712,19 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
match migrate_file(&from, &to, &original_identifier, skip_missing_files).await {
|
||||
match migrate_file(repo, &from, &to, &original_identifier, skip_missing_files).await {
|
||||
Ok(new_identifier) => {
|
||||
migrate_details(repo, original_identifier, &new_identifier).await?;
|
||||
repo.relate_identifier(hash.as_ref().to_vec().into(), &new_identifier)
|
||||
.await?;
|
||||
}
|
||||
Err(MigrateError::Details(e)) => {
|
||||
tracing::warn!(
|
||||
"Error fetching details for motion file for hash {}",
|
||||
hex::encode(&hash)
|
||||
);
|
||||
return Err(e.into());
|
||||
}
|
||||
Err(MigrateError::From(e)) if e.is_not_found() && skip_missing_files => {
|
||||
tracing::warn!("Skipping original file for hash {}", hex::encode(&hash));
|
||||
}
|
||||
|
@ -1746,20 +1767,22 @@ where
|
|||
Ok(())
|
||||
}
|
||||
|
||||
async fn migrate_file<S1, S2>(
|
||||
async fn migrate_file<R, S1, S2>(
|
||||
repo: &R,
|
||||
from: &S1,
|
||||
to: &S2,
|
||||
identifier: &S1::Identifier,
|
||||
skip_missing_files: bool,
|
||||
) -> Result<S2::Identifier, MigrateError>
|
||||
where
|
||||
R: IdentifierRepo,
|
||||
S1: Store,
|
||||
S2: Store,
|
||||
{
|
||||
let mut failure_count = 0;
|
||||
|
||||
loop {
|
||||
match do_migrate_file(from, to, identifier).await {
|
||||
match do_migrate_file(repo, from, to, identifier).await {
|
||||
Ok(identifier) => return Ok(identifier),
|
||||
Err(MigrateError::From(e)) if e.is_not_found() && skip_missing_files => {
|
||||
return Err(MigrateError::From(e));
|
||||
|
@ -1783,15 +1806,18 @@ where
|
|||
#[derive(Debug)]
|
||||
enum MigrateError {
|
||||
From(crate::store::StoreError),
|
||||
Details(crate::store::StoreError),
|
||||
To(crate::store::StoreError),
|
||||
}
|
||||
|
||||
async fn do_migrate_file<S1, S2>(
|
||||
async fn do_migrate_file<R, S1, S2>(
|
||||
repo: &R,
|
||||
from: &S1,
|
||||
to: &S2,
|
||||
identifier: &S1::Identifier,
|
||||
) -> Result<S2::Identifier, MigrateError>
|
||||
where
|
||||
R: IdentifierRepo,
|
||||
S1: Store,
|
||||
S2: Store,
|
||||
{
|
||||
|
@ -1800,7 +1826,21 @@ where
|
|||
.await
|
||||
.map_err(MigrateError::From)?;
|
||||
|
||||
let new_identifier = to.save_stream(stream).await.map_err(MigrateError::To)?;
|
||||
let details_opt = repo
|
||||
.details(identifier)
|
||||
.await
|
||||
.map_err(MigrateError::Details)?;
|
||||
|
||||
let content_type = if let Some(details) = details_opt {
|
||||
details.content_type()
|
||||
} else {
|
||||
mime::APPLICATION_OCTET_STREAM
|
||||
};
|
||||
|
||||
let new_identifier = to
|
||||
.save_stream(stream, content_type)
|
||||
.await
|
||||
.map_err(MigrateError::To)?;
|
||||
|
||||
Ok(new_identifier)
|
||||
}
|
||||
|
|
|
@ -179,6 +179,19 @@ impl ValidInputType {
|
|||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn content_type(self) -> mime::Mime {
|
||||
match self {
|
||||
Self::Avif => image_avif(),
|
||||
Self::Gif => mime::IMAGE_GIF,
|
||||
Self::Jpeg => mime::IMAGE_JPEG,
|
||||
Self::Jxl => image_jxl(),
|
||||
Self::Mp4 => video_mp4(),
|
||||
Self::Png => mime::IMAGE_PNG,
|
||||
Self::Webm => video_webm(),
|
||||
Self::Webp => image_webp(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
|
|
66
src/store.rs
66
src/store.rs
|
@ -72,15 +72,27 @@ pub(crate) trait Store: Clone + Debug {
|
|||
|
||||
async fn health_check(&self) -> Result<(), StoreError>;
|
||||
|
||||
async fn save_async_read<Reader>(&self, reader: Reader) -> Result<Self::Identifier, StoreError>
|
||||
async fn save_async_read<Reader>(
|
||||
&self,
|
||||
reader: Reader,
|
||||
content_type: mime::Mime,
|
||||
) -> Result<Self::Identifier, StoreError>
|
||||
where
|
||||
Reader: AsyncRead + Unpin + 'static;
|
||||
|
||||
async fn save_stream<S>(&self, stream: S) -> Result<Self::Identifier, StoreError>
|
||||
async fn save_stream<S>(
|
||||
&self,
|
||||
stream: S,
|
||||
content_type: mime::Mime,
|
||||
) -> Result<Self::Identifier, StoreError>
|
||||
where
|
||||
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static;
|
||||
|
||||
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, StoreError>;
|
||||
async fn save_bytes(
|
||||
&self,
|
||||
bytes: Bytes,
|
||||
content_type: mime::Mime,
|
||||
) -> Result<Self::Identifier, StoreError>;
|
||||
|
||||
async fn to_stream(
|
||||
&self,
|
||||
|
@ -114,22 +126,34 @@ where
|
|||
T::health_check(self).await
|
||||
}
|
||||
|
||||
async fn save_async_read<Reader>(&self, reader: Reader) -> Result<Self::Identifier, StoreError>
|
||||
async fn save_async_read<Reader>(
|
||||
&self,
|
||||
reader: Reader,
|
||||
content_type: mime::Mime,
|
||||
) -> Result<Self::Identifier, StoreError>
|
||||
where
|
||||
Reader: AsyncRead + Unpin + 'static,
|
||||
{
|
||||
T::save_async_read(self, reader).await
|
||||
T::save_async_read(self, reader, content_type).await
|
||||
}
|
||||
|
||||
async fn save_stream<S>(&self, stream: S) -> Result<Self::Identifier, StoreError>
|
||||
async fn save_stream<S>(
|
||||
&self,
|
||||
stream: S,
|
||||
content_type: mime::Mime,
|
||||
) -> Result<Self::Identifier, StoreError>
|
||||
where
|
||||
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
|
||||
{
|
||||
T::save_stream(self, stream).await
|
||||
T::save_stream(self, stream, content_type).await
|
||||
}
|
||||
|
||||
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, StoreError> {
|
||||
T::save_bytes(self, bytes).await
|
||||
async fn save_bytes(
|
||||
&self,
|
||||
bytes: Bytes,
|
||||
content_type: mime::Mime,
|
||||
) -> Result<Self::Identifier, StoreError> {
|
||||
T::save_bytes(self, bytes, content_type).await
|
||||
}
|
||||
|
||||
async fn to_stream(
|
||||
|
@ -173,22 +197,34 @@ where
|
|||
T::health_check(self).await
|
||||
}
|
||||
|
||||
async fn save_async_read<Reader>(&self, reader: Reader) -> Result<Self::Identifier, StoreError>
|
||||
async fn save_async_read<Reader>(
|
||||
&self,
|
||||
reader: Reader,
|
||||
content_type: mime::Mime,
|
||||
) -> Result<Self::Identifier, StoreError>
|
||||
where
|
||||
Reader: AsyncRead + Unpin + 'static,
|
||||
{
|
||||
T::save_async_read(self, reader).await
|
||||
T::save_async_read(self, reader, content_type).await
|
||||
}
|
||||
|
||||
async fn save_stream<S>(&self, stream: S) -> Result<Self::Identifier, StoreError>
|
||||
async fn save_stream<S>(
|
||||
&self,
|
||||
stream: S,
|
||||
content_type: mime::Mime,
|
||||
) -> Result<Self::Identifier, StoreError>
|
||||
where
|
||||
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
|
||||
{
|
||||
T::save_stream(self, stream).await
|
||||
T::save_stream(self, stream, content_type).await
|
||||
}
|
||||
|
||||
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, StoreError> {
|
||||
T::save_bytes(self, bytes).await
|
||||
async fn save_bytes(
|
||||
&self,
|
||||
bytes: Bytes,
|
||||
content_type: mime::Mime,
|
||||
) -> Result<Self::Identifier, StoreError> {
|
||||
T::save_bytes(self, bytes, content_type).await
|
||||
}
|
||||
|
||||
async fn to_stream(
|
||||
|
|
|
@ -66,6 +66,7 @@ impl Store for FileStore {
|
|||
async fn save_async_read<Reader>(
|
||||
&self,
|
||||
mut reader: Reader,
|
||||
_content_type: mime::Mime,
|
||||
) -> Result<Self::Identifier, StoreError>
|
||||
where
|
||||
Reader: AsyncRead + Unpin + 'static,
|
||||
|
@ -80,15 +81,24 @@ impl Store for FileStore {
|
|||
Ok(self.file_id_from_path(path)?)
|
||||
}
|
||||
|
||||
async fn save_stream<S>(&self, stream: S) -> Result<Self::Identifier, StoreError>
|
||||
async fn save_stream<S>(
|
||||
&self,
|
||||
stream: S,
|
||||
content_type: mime::Mime,
|
||||
) -> Result<Self::Identifier, StoreError>
|
||||
where
|
||||
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
|
||||
{
|
||||
self.save_async_read(StreamReader::new(stream)).await
|
||||
self.save_async_read(StreamReader::new(stream), content_type)
|
||||
.await
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(bytes))]
|
||||
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, StoreError> {
|
||||
async fn save_bytes(
|
||||
&self,
|
||||
bytes: Bytes,
|
||||
_content_type: mime::Mime,
|
||||
) -> Result<Self::Identifier, StoreError> {
|
||||
let path = self.next_file().await?;
|
||||
|
||||
if let Err(e) = self.safe_save_bytes(&path, bytes).await {
|
||||
|
|
|
@ -190,15 +190,24 @@ impl Store for ObjectStore {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
async fn save_async_read<Reader>(&self, reader: Reader) -> Result<Self::Identifier, StoreError>
|
||||
async fn save_async_read<Reader>(
|
||||
&self,
|
||||
reader: Reader,
|
||||
content_type: mime::Mime,
|
||||
) -> Result<Self::Identifier, StoreError>
|
||||
where
|
||||
Reader: AsyncRead + Unpin + 'static,
|
||||
{
|
||||
self.save_stream(ReaderStream::new(reader)).await
|
||||
self.save_stream(ReaderStream::new(reader), content_type)
|
||||
.await
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
async fn save_stream<S>(&self, mut stream: S) -> Result<Self::Identifier, StoreError>
|
||||
async fn save_stream<S>(
|
||||
&self,
|
||||
mut stream: S,
|
||||
content_type: mime::Mime,
|
||||
) -> Result<Self::Identifier, StoreError>
|
||||
where
|
||||
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
|
||||
{
|
||||
|
@ -206,7 +215,7 @@ impl Store for ObjectStore {
|
|||
|
||||
if first_chunk.len() < CHUNK_SIZE {
|
||||
drop(stream);
|
||||
let (req, object_id) = self.put_object_request().await?;
|
||||
let (req, object_id) = self.put_object_request(content_type).await?;
|
||||
let response = req
|
||||
.send_body(first_chunk)
|
||||
.await
|
||||
|
@ -221,7 +230,7 @@ impl Store for ObjectStore {
|
|||
|
||||
let mut first_chunk = Some(first_chunk);
|
||||
|
||||
let (req, object_id) = self.create_multipart_request().await?;
|
||||
let (req, object_id) = self.create_multipart_request(content_type).await?;
|
||||
let mut response = req.send().await.map_err(ObjectError::from)?;
|
||||
|
||||
if !response.status().is_success() {
|
||||
|
@ -329,8 +338,12 @@ impl Store for ObjectStore {
|
|||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, StoreError> {
|
||||
let (req, object_id) = self.put_object_request().await?;
|
||||
async fn save_bytes(
|
||||
&self,
|
||||
bytes: Bytes,
|
||||
content_type: mime::Mime,
|
||||
) -> Result<Self::Identifier, StoreError> {
|
||||
let (req, object_id) = self.put_object_request(content_type).await?;
|
||||
|
||||
let response = req.send_body(bytes).await.map_err(ObjectError::from)?;
|
||||
|
||||
|
@ -470,19 +483,25 @@ impl ObjectStore {
|
|||
Ok(self.build_request(action))
|
||||
}
|
||||
|
||||
async fn put_object_request(&self) -> Result<(ClientRequest, ObjectId), StoreError> {
|
||||
async fn put_object_request(
|
||||
&self,
|
||||
content_type: mime::Mime,
|
||||
) -> Result<(ClientRequest, ObjectId), StoreError> {
|
||||
let path = self.next_file().await?;
|
||||
|
||||
let mut action = self.bucket.put_object(Some(&self.credentials), &path);
|
||||
|
||||
action
|
||||
.headers_mut()
|
||||
.insert("content-type", "application/octet-stream");
|
||||
.insert("content-type", content_type.as_ref());
|
||||
|
||||
Ok((self.build_request(action), ObjectId::from_string(path)))
|
||||
}
|
||||
|
||||
async fn create_multipart_request(&self) -> Result<(ClientRequest, ObjectId), StoreError> {
|
||||
async fn create_multipart_request(
|
||||
&self,
|
||||
content_type: mime::Mime,
|
||||
) -> Result<(ClientRequest, ObjectId), StoreError> {
|
||||
let path = self.next_file().await?;
|
||||
|
||||
let mut action = self
|
||||
|
@ -491,7 +510,7 @@ impl ObjectStore {
|
|||
|
||||
action
|
||||
.headers_mut()
|
||||
.insert("content-type", "application/octet-stream");
|
||||
.insert("content-type", content_type.as_ref());
|
||||
|
||||
Ok((self.build_request(action), ObjectId::from_string(path)))
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue