mirror of
https://git.asonix.dog/asonix/pict-rs
synced 2024-12-22 19:31:35 +00:00
Merge pull request 'asonix/use-bytes-stream' (#52) from asonix/use-bytes-stream into main
Reviewed-on: https://git.asonix.dog/asonix/pict-rs/pulls/52
This commit is contained in:
commit
ca13b7b30b
25 changed files with 450 additions and 399 deletions
|
@ -29,14 +29,14 @@ services:
|
||||||
# - PICTRS_PROXY_UPSTREAM=http://pictrs:8080
|
# - PICTRS_PROXY_UPSTREAM=http://pictrs:8080
|
||||||
# - PICTRS_PROXY_OPENTELEMETRY_URL=http://jaeger:4317
|
# - PICTRS_PROXY_OPENTELEMETRY_URL=http://jaeger:4317
|
||||||
|
|
||||||
minio:
|
# minio:
|
||||||
image: quay.io/minio/minio
|
# image: quay.io/minio/minio
|
||||||
command: server /mnt --console-address ":9001"
|
# command: server /mnt --console-address ":9001"
|
||||||
ports:
|
# ports:
|
||||||
- "9000:9000"
|
# - "9000:9000"
|
||||||
- "9001:9001"
|
# - "9001:9001"
|
||||||
volumes:
|
# volumes:
|
||||||
- ./storage/minio:/mnt
|
# - ./storage/minio:/mnt
|
||||||
|
|
||||||
garage:
|
garage:
|
||||||
image: dxflrs/garage:v0.9.0
|
image: dxflrs/garage:v0.9.0
|
||||||
|
|
|
@ -54,9 +54,9 @@ impl Backgrounded {
|
||||||
{
|
{
|
||||||
self.upload_id = Some(self.repo.create_upload().await?);
|
self.upload_id = Some(self.repo.create_upload().await?);
|
||||||
|
|
||||||
let stream = Box::pin(crate::stream::map_err(stream, |e| {
|
let stream = crate::stream::map_err(stream, |e| {
|
||||||
std::io::Error::new(std::io::ErrorKind::Other, e)
|
std::io::Error::new(std::io::ErrorKind::Other, e)
|
||||||
}));
|
});
|
||||||
|
|
||||||
// use octet-stream, we don't know the upload's real type yet
|
// use octet-stream, we don't know the upload's real type yet
|
||||||
let identifier = store.save_stream(stream, APPLICATION_OCTET_STREAM).await?;
|
let identifier = store.save_stream(stream, APPLICATION_OCTET_STREAM).await?;
|
||||||
|
|
|
@ -1,7 +1,4 @@
|
||||||
use actix_web::{
|
use actix_web::web::Bytes;
|
||||||
body::MessageBody,
|
|
||||||
web::{Bytes, BytesMut},
|
|
||||||
};
|
|
||||||
use futures_core::Stream;
|
use futures_core::Stream;
|
||||||
use std::{
|
use std::{
|
||||||
collections::{vec_deque::IntoIter, VecDeque},
|
collections::{vec_deque::IntoIter, VecDeque},
|
||||||
|
@ -9,6 +6,9 @@ use std::{
|
||||||
pin::Pin,
|
pin::Pin,
|
||||||
task::{Context, Poll},
|
task::{Context, Poll},
|
||||||
};
|
};
|
||||||
|
use streem::IntoStreamer;
|
||||||
|
use tokio::io::AsyncRead;
|
||||||
|
use tokio_util::bytes::Buf;
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub(crate) struct BytesStream {
|
pub(crate) struct BytesStream {
|
||||||
|
@ -24,6 +24,33 @@ impl BytesStream {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(skip(stream))]
|
||||||
|
pub(crate) async fn try_from_stream<S, E>(stream: S) -> Result<Self, E>
|
||||||
|
where
|
||||||
|
S: Stream<Item = Result<Bytes, E>>,
|
||||||
|
{
|
||||||
|
let stream = std::pin::pin!(stream);
|
||||||
|
let mut stream = stream.into_streamer();
|
||||||
|
let mut bs = Self::new();
|
||||||
|
|
||||||
|
while let Some(bytes) = stream.try_next().await? {
|
||||||
|
tracing::trace!("try_from_stream: looping");
|
||||||
|
bs.add_bytes(bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
tracing::debug!(
|
||||||
|
"BytesStream with {} chunks, avg length {}",
|
||||||
|
bs.chunks_len(),
|
||||||
|
bs.len() / bs.chunks_len()
|
||||||
|
);
|
||||||
|
|
||||||
|
Ok(bs)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn chunks_len(&self) -> usize {
|
||||||
|
self.inner.len()
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) fn add_bytes(&mut self, bytes: Bytes) {
|
pub(crate) fn add_bytes(&mut self, bytes: Bytes) {
|
||||||
self.total_len += bytes.len();
|
self.total_len += bytes.len();
|
||||||
self.inner.push_back(bytes);
|
self.inner.push_back(bytes);
|
||||||
|
@ -33,15 +60,25 @@ impl BytesStream {
|
||||||
self.total_len
|
self.total_len
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn into_bytes(self) -> Bytes {
|
pub(crate) fn is_empty(&self) -> bool {
|
||||||
let mut buf = BytesMut::with_capacity(self.total_len);
|
self.total_len == 0
|
||||||
|
|
||||||
for bytes in self.inner {
|
|
||||||
buf.extend_from_slice(&bytes);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
buf.freeze()
|
pub(crate) fn into_reader(self) -> BytesReader {
|
||||||
|
BytesReader { inner: self.inner }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn into_io_stream(self) -> IoStream {
|
||||||
|
IoStream { inner: self.inner }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) struct IoStream {
|
||||||
|
inner: VecDeque<Bytes>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) struct BytesReader {
|
||||||
|
inner: VecDeque<Bytes>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl IntoIterator for BytesStream {
|
impl IntoIterator for BytesStream {
|
||||||
|
@ -53,36 +90,56 @@ impl IntoIterator for BytesStream {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MessageBody for BytesStream {
|
|
||||||
type Error = std::io::Error;
|
|
||||||
|
|
||||||
fn size(&self) -> actix_web::body::BodySize {
|
|
||||||
if let Ok(len) = self.len().try_into() {
|
|
||||||
actix_web::body::BodySize::Sized(len)
|
|
||||||
} else {
|
|
||||||
actix_web::body::BodySize::None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn poll_next(
|
|
||||||
self: Pin<&mut Self>,
|
|
||||||
_: &mut Context<'_>,
|
|
||||||
) -> Poll<Option<Result<Bytes, Self::Error>>> {
|
|
||||||
Poll::Ready(self.get_mut().inner.pop_front().map(Ok))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn try_into_bytes(self) -> Result<Bytes, Self>
|
|
||||||
where
|
|
||||||
Self: Sized,
|
|
||||||
{
|
|
||||||
Ok(self.into_bytes())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Stream for BytesStream {
|
impl Stream for BytesStream {
|
||||||
type Item = Result<Bytes, Infallible>;
|
type Item = Result<Bytes, Infallible>;
|
||||||
|
|
||||||
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||||
Poll::Ready(self.get_mut().inner.pop_front().map(Ok))
|
Poll::Ready(self.get_mut().inner.pop_front().map(Ok))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn size_hint(&self) -> (usize, Option<usize>) {
|
||||||
|
(self.inner.len(), Some(self.inner.len()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Stream for IoStream {
|
||||||
|
type Item = std::io::Result<Bytes>;
|
||||||
|
|
||||||
|
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||||
|
Poll::Ready(self.get_mut().inner.pop_front().map(Ok))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn size_hint(&self) -> (usize, Option<usize>) {
|
||||||
|
(self.inner.len(), Some(self.inner.len()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AsyncRead for BytesReader {
|
||||||
|
fn poll_read(
|
||||||
|
mut self: Pin<&mut Self>,
|
||||||
|
_: &mut Context<'_>,
|
||||||
|
buf: &mut tokio::io::ReadBuf<'_>,
|
||||||
|
) -> Poll<std::io::Result<()>> {
|
||||||
|
while buf.remaining() > 0 {
|
||||||
|
tracing::trace!("bytes reader: looping");
|
||||||
|
|
||||||
|
if let Some(bytes) = self.inner.front_mut() {
|
||||||
|
if bytes.is_empty() {
|
||||||
|
self.inner.pop_front();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let upper_bound = buf.remaining().min(bytes.len());
|
||||||
|
|
||||||
|
let slice = &bytes[..upper_bound];
|
||||||
|
|
||||||
|
buf.put_slice(slice);
|
||||||
|
bytes.advance(upper_bound);
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,7 +3,7 @@ use crate::{
|
||||||
error::{Error, UploadError},
|
error::{Error, UploadError},
|
||||||
repo::Hash,
|
repo::Hash,
|
||||||
};
|
};
|
||||||
use actix_web::web;
|
|
||||||
use dashmap::{mapref::entry::Entry, DashMap};
|
use dashmap::{mapref::entry::Entry, DashMap};
|
||||||
use flume::{r#async::RecvFut, Receiver, Sender};
|
use flume::{r#async::RecvFut, Receiver, Sender};
|
||||||
use std::{
|
use std::{
|
||||||
|
@ -15,7 +15,7 @@ use std::{
|
||||||
};
|
};
|
||||||
use tracing::Span;
|
use tracing::Span;
|
||||||
|
|
||||||
type OutcomeReceiver = Receiver<(Details, web::Bytes)>;
|
type OutcomeReceiver = Receiver<(Details, Arc<str>)>;
|
||||||
|
|
||||||
type ProcessMapKey = (Hash, PathBuf);
|
type ProcessMapKey = (Hash, PathBuf);
|
||||||
|
|
||||||
|
@ -36,9 +36,9 @@ impl ProcessMap {
|
||||||
hash: Hash,
|
hash: Hash,
|
||||||
path: PathBuf,
|
path: PathBuf,
|
||||||
fut: Fut,
|
fut: Fut,
|
||||||
) -> Result<(Details, web::Bytes), Error>
|
) -> Result<(Details, Arc<str>), Error>
|
||||||
where
|
where
|
||||||
Fut: Future<Output = Result<(Details, web::Bytes), Error>>,
|
Fut: Future<Output = Result<(Details, Arc<str>), Error>>,
|
||||||
{
|
{
|
||||||
let key = (hash.clone(), path.clone());
|
let key = (hash.clone(), path.clone());
|
||||||
|
|
||||||
|
@ -100,10 +100,10 @@ struct CancelToken {
|
||||||
|
|
||||||
enum CancelState {
|
enum CancelState {
|
||||||
Sender {
|
Sender {
|
||||||
sender: Sender<(Details, web::Bytes)>,
|
sender: Sender<(Details, Arc<str>)>,
|
||||||
},
|
},
|
||||||
Receiver {
|
Receiver {
|
||||||
receiver: RecvFut<'static, (Details, web::Bytes)>,
|
receiver: RecvFut<'static, (Details, Arc<str>)>,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -124,9 +124,9 @@ pin_project_lite::pin_project! {
|
||||||
|
|
||||||
impl<F> Future for CancelSafeProcessor<F>
|
impl<F> Future for CancelSafeProcessor<F>
|
||||||
where
|
where
|
||||||
F: Future<Output = Result<(Details, web::Bytes), Error>>,
|
F: Future<Output = Result<(Details, Arc<str>), Error>>,
|
||||||
{
|
{
|
||||||
type Output = Result<(Details, web::Bytes), Error>;
|
type Output = Result<(Details, Arc<str>), Error>;
|
||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
let this = self.as_mut().project();
|
let this = self.as_mut().project();
|
||||||
|
|
|
@ -1,11 +1,12 @@
|
||||||
use crate::{
|
use crate::{
|
||||||
|
bytes_stream::BytesStream,
|
||||||
discover::Discovery,
|
discover::Discovery,
|
||||||
error::Error,
|
error::Error,
|
||||||
formats::{InternalFormat, InternalVideoFormat},
|
formats::{InternalFormat, InternalVideoFormat},
|
||||||
serde_str::Serde,
|
serde_str::Serde,
|
||||||
state::State,
|
state::State,
|
||||||
};
|
};
|
||||||
use actix_web::web;
|
|
||||||
use time::{format_description::well_known::Rfc3339, OffsetDateTime};
|
use time::{format_description::well_known::Rfc3339, OffsetDateTime};
|
||||||
|
|
||||||
#[derive(Copy, Clone, Debug, serde::Deserialize, serde::Serialize)]
|
#[derive(Copy, Clone, Debug, serde::Deserialize, serde::Serialize)]
|
||||||
|
@ -80,13 +81,16 @@ impl Details {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(level = "debug", skip_all)]
|
#[tracing::instrument(level = "debug", skip_all)]
|
||||||
pub(crate) async fn from_bytes<S>(state: &State<S>, input: web::Bytes) -> Result<Self, Error> {
|
pub(crate) async fn from_bytes_stream<S>(
|
||||||
|
state: &State<S>,
|
||||||
|
input: BytesStream,
|
||||||
|
) -> Result<Self, Error> {
|
||||||
let Discovery {
|
let Discovery {
|
||||||
input,
|
input,
|
||||||
width,
|
width,
|
||||||
height,
|
height,
|
||||||
frames,
|
frames,
|
||||||
} = crate::discover::discover_bytes(state, input).await?;
|
} = crate::discover::discover_bytes_stream(state, input).await?;
|
||||||
|
|
||||||
Ok(Details::from_parts(
|
Ok(Details::from_parts(
|
||||||
input.internal_format(),
|
input.internal_format(),
|
||||||
|
|
|
@ -2,9 +2,9 @@ mod exiftool;
|
||||||
mod ffmpeg;
|
mod ffmpeg;
|
||||||
mod magick;
|
mod magick;
|
||||||
|
|
||||||
use actix_web::web::Bytes;
|
|
||||||
|
|
||||||
use crate::{formats::InputFile, state::State};
|
|
||||||
|
use crate::{bytes_stream::BytesStream, formats::InputFile, state::State};
|
||||||
|
|
||||||
#[derive(Debug, PartialEq, Eq)]
|
#[derive(Debug, PartialEq, Eq)]
|
||||||
pub(crate) struct Discovery {
|
pub(crate) struct Discovery {
|
||||||
|
@ -27,13 +27,13 @@ pub(crate) enum DiscoverError {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(level = "trace", skip_all)]
|
#[tracing::instrument(level = "trace", skip_all)]
|
||||||
pub(crate) async fn discover_bytes<S>(
|
pub(crate) async fn discover_bytes_stream<S>(
|
||||||
state: &State<S>,
|
state: &State<S>,
|
||||||
bytes: Bytes,
|
bytes: BytesStream,
|
||||||
) -> Result<Discovery, crate::error::Error> {
|
) -> Result<Discovery, crate::error::Error> {
|
||||||
let discovery = ffmpeg::discover_bytes(state, bytes.clone()).await?;
|
let discovery = ffmpeg::discover_bytes_stream(state, bytes.clone()).await?;
|
||||||
|
|
||||||
let discovery = magick::confirm_bytes(state, discovery, bytes.clone()).await?;
|
let discovery = magick::confirm_bytes_stream(state, discovery, bytes.clone()).await?;
|
||||||
|
|
||||||
let discovery =
|
let discovery =
|
||||||
exiftool::check_reorient(discovery, bytes, state.config.media.process_timeout).await?;
|
exiftool::check_reorient(discovery, bytes, state.config.media.process_timeout).await?;
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
use actix_web::web::Bytes;
|
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
bytes_stream::BytesStream,
|
||||||
exiftool::ExifError,
|
exiftool::ExifError,
|
||||||
formats::{ImageInput, InputFile},
|
formats::{ImageInput, InputFile},
|
||||||
process::Process,
|
process::Process,
|
||||||
|
@ -16,7 +17,7 @@ pub(super) async fn check_reorient(
|
||||||
height,
|
height,
|
||||||
frames,
|
frames,
|
||||||
}: Discovery,
|
}: Discovery,
|
||||||
bytes: Bytes,
|
bytes: BytesStream,
|
||||||
timeout: u64,
|
timeout: u64,
|
||||||
) -> Result<Discovery, ExifError> {
|
) -> Result<Discovery, ExifError> {
|
||||||
let input = match input {
|
let input = match input {
|
||||||
|
@ -40,9 +41,9 @@ pub(super) async fn check_reorient(
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(level = "trace", skip_all)]
|
#[tracing::instrument(level = "trace", skip_all)]
|
||||||
async fn needs_reorienting(input: Bytes, timeout: u64) -> Result<bool, ExifError> {
|
async fn needs_reorienting(input: BytesStream, timeout: u64) -> Result<bool, ExifError> {
|
||||||
let buf = Process::run("exiftool", &["-n", "-Orientation", "-"], &[], timeout)?
|
let buf = Process::run("exiftool", &["-n", "-Orientation", "-"], &[], timeout)?
|
||||||
.bytes_read(input)
|
.bytes_stream_read(input)
|
||||||
.into_string()
|
.into_string()
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
|
|
@ -4,6 +4,7 @@ mod tests;
|
||||||
use std::{collections::HashSet, sync::OnceLock};
|
use std::{collections::HashSet, sync::OnceLock};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
bytes_stream::BytesStream,
|
||||||
ffmpeg::FfMpegError,
|
ffmpeg::FfMpegError,
|
||||||
formats::{
|
formats::{
|
||||||
AlphaCodec, AnimationFormat, ImageFormat, ImageInput, InputFile, InputVideoFormat,
|
AlphaCodec, AnimationFormat, ImageFormat, ImageInput, InputFile, InputVideoFormat,
|
||||||
|
@ -12,7 +13,7 @@ use crate::{
|
||||||
process::Process,
|
process::Process,
|
||||||
state::State,
|
state::State,
|
||||||
};
|
};
|
||||||
use actix_web::web::Bytes;
|
|
||||||
|
|
||||||
use super::Discovery;
|
use super::Discovery;
|
||||||
|
|
||||||
|
@ -158,15 +159,15 @@ struct Flags {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip_all)]
|
#[tracing::instrument(skip_all)]
|
||||||
pub(super) async fn discover_bytes<S>(
|
pub(super) async fn discover_bytes_stream<S>(
|
||||||
state: &State<S>,
|
state: &State<S>,
|
||||||
bytes: Bytes,
|
bytes: BytesStream,
|
||||||
) -> Result<Option<Discovery>, FfMpegError> {
|
) -> Result<Option<Discovery>, FfMpegError> {
|
||||||
discover_file(state, move |mut file| {
|
discover_file(state, move |mut file| {
|
||||||
let bytes = bytes.clone();
|
let bytes = bytes.clone();
|
||||||
|
|
||||||
async move {
|
async move {
|
||||||
file.write_from_bytes(bytes)
|
file.write_from_stream(bytes.into_io_stream())
|
||||||
.await
|
.await
|
||||||
.map_err(FfMpegError::Write)?;
|
.map_err(FfMpegError::Write)?;
|
||||||
Ok(file)
|
Ok(file)
|
||||||
|
|
|
@ -1,9 +1,10 @@
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests;
|
mod tests;
|
||||||
|
|
||||||
use actix_web::web::Bytes;
|
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
bytes_stream::BytesStream,
|
||||||
discover::DiscoverError,
|
discover::DiscoverError,
|
||||||
formats::{AnimationFormat, ImageFormat, ImageInput, InputFile},
|
formats::{AnimationFormat, ImageFormat, ImageInput, InputFile},
|
||||||
magick::{MagickError, MAGICK_CONFIGURE_PATH, MAGICK_TEMPORARY_PATH},
|
magick::{MagickError, MAGICK_CONFIGURE_PATH, MAGICK_TEMPORARY_PATH},
|
||||||
|
@ -31,10 +32,10 @@ struct Geometry {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip_all)]
|
#[tracing::instrument(skip_all)]
|
||||||
pub(super) async fn confirm_bytes<S>(
|
pub(super) async fn confirm_bytes_stream<S>(
|
||||||
state: &State<S>,
|
state: &State<S>,
|
||||||
discovery: Option<Discovery>,
|
discovery: Option<Discovery>,
|
||||||
bytes: Bytes,
|
bytes: BytesStream,
|
||||||
) -> Result<Discovery, MagickError> {
|
) -> Result<Discovery, MagickError> {
|
||||||
match discovery {
|
match discovery {
|
||||||
Some(Discovery {
|
Some(Discovery {
|
||||||
|
@ -50,7 +51,7 @@ pub(super) async fn confirm_bytes<S>(
|
||||||
}
|
}
|
||||||
|
|
||||||
discover_file(state, move |mut file| async move {
|
discover_file(state, move |mut file| async move {
|
||||||
file.write_from_bytes(bytes)
|
file.write_from_stream(bytes.into_io_stream())
|
||||||
.await
|
.await
|
||||||
.map_err(MagickError::Write)?;
|
.map_err(MagickError::Write)?;
|
||||||
|
|
||||||
|
|
|
@ -1,8 +1,9 @@
|
||||||
use crate::{
|
use crate::{
|
||||||
|
bytes_stream::BytesStream,
|
||||||
error_code::ErrorCode,
|
error_code::ErrorCode,
|
||||||
process::{Process, ProcessError, ProcessRead},
|
process::{Process, ProcessError, ProcessRead},
|
||||||
};
|
};
|
||||||
use actix_web::web::Bytes;
|
|
||||||
|
|
||||||
#[derive(Debug, thiserror::Error)]
|
#[derive(Debug, thiserror::Error)]
|
||||||
pub(crate) enum ExifError {
|
pub(crate) enum ExifError {
|
||||||
|
@ -39,9 +40,9 @@ impl ExifError {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(level = "trace", skip(input))]
|
#[tracing::instrument(level = "trace", skip(input))]
|
||||||
pub(crate) async fn needs_reorienting(timeout: u64, input: Bytes) -> Result<bool, ExifError> {
|
pub(crate) async fn needs_reorienting(timeout: u64, input: BytesStream) -> Result<bool, ExifError> {
|
||||||
let buf = Process::run("exiftool", &["-n", "-Orientation", "-"], &[], timeout)?
|
let buf = Process::run("exiftool", &["-n", "-Orientation", "-"], &[], timeout)?
|
||||||
.bytes_read(input)
|
.bytes_stream_read(input)
|
||||||
.into_string()
|
.into_string()
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
@ -51,9 +52,9 @@ pub(crate) async fn needs_reorienting(timeout: u64, input: Bytes) -> Result<bool
|
||||||
#[tracing::instrument(level = "trace", skip(input))]
|
#[tracing::instrument(level = "trace", skip(input))]
|
||||||
pub(crate) fn clear_metadata_bytes_read(
|
pub(crate) fn clear_metadata_bytes_read(
|
||||||
timeout: u64,
|
timeout: u64,
|
||||||
input: Bytes,
|
input: BytesStream,
|
||||||
) -> Result<ProcessRead, ExifError> {
|
) -> Result<ProcessRead, ExifError> {
|
||||||
let process = Process::run("exiftool", &["-all=", "-", "-out", "-"], &[], timeout)?;
|
let process = Process::run("exiftool", &["-all=", "-", "-out", "-"], &[], timeout)?;
|
||||||
|
|
||||||
Ok(process.bytes_read(input))
|
Ok(process.bytes_stream_read(input))
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,7 +11,7 @@ use crate::{
|
||||||
state::State,
|
state::State,
|
||||||
store::Store,
|
store::Store,
|
||||||
};
|
};
|
||||||
use actix_web::web::Bytes;
|
|
||||||
use std::{
|
use std::{
|
||||||
path::PathBuf,
|
path::PathBuf,
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
|
@ -57,7 +57,7 @@ pub(crate) async fn generate<S: Store + 'static>(
|
||||||
thumbnail_args: Vec<String>,
|
thumbnail_args: Vec<String>,
|
||||||
original_details: &Details,
|
original_details: &Details,
|
||||||
hash: Hash,
|
hash: Hash,
|
||||||
) -> Result<(Details, Bytes), Error> {
|
) -> Result<(Details, Arc<str>), Error> {
|
||||||
if state.config.server.danger_dummy_mode {
|
if state.config.server.danger_dummy_mode {
|
||||||
let identifier = state
|
let identifier = state
|
||||||
.repo
|
.repo
|
||||||
|
@ -65,13 +65,7 @@ pub(crate) async fn generate<S: Store + 'static>(
|
||||||
.await?
|
.await?
|
||||||
.ok_or(UploadError::MissingIdentifier)?;
|
.ok_or(UploadError::MissingIdentifier)?;
|
||||||
|
|
||||||
let bytes = state
|
Ok((original_details.clone(), identifier))
|
||||||
.store
|
|
||||||
.to_bytes(&identifier, None, None)
|
|
||||||
.await?
|
|
||||||
.into_bytes();
|
|
||||||
|
|
||||||
Ok((original_details.clone(), bytes))
|
|
||||||
} else {
|
} else {
|
||||||
let process_fut = process(
|
let process_fut = process(
|
||||||
state,
|
state,
|
||||||
|
@ -82,14 +76,14 @@ pub(crate) async fn generate<S: Store + 'static>(
|
||||||
hash.clone(),
|
hash.clone(),
|
||||||
);
|
);
|
||||||
|
|
||||||
let (details, bytes) = process_map
|
let (details, identifier) = process_map
|
||||||
.process(hash, thumbnail_path, process_fut)
|
.process(hash, thumbnail_path, process_fut)
|
||||||
.with_timeout(Duration::from_secs(state.config.media.process_timeout * 4))
|
.with_timeout(Duration::from_secs(state.config.media.process_timeout * 4))
|
||||||
.with_metrics(crate::init_metrics::GENERATE_PROCESS)
|
.with_metrics(crate::init_metrics::GENERATE_PROCESS)
|
||||||
.await
|
.await
|
||||||
.map_err(|_| UploadError::ProcessTimeout)??;
|
.map_err(|_| UploadError::ProcessTimeout)??;
|
||||||
|
|
||||||
Ok((details, bytes))
|
Ok((details, identifier))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -101,7 +95,7 @@ async fn process<S: Store + 'static>(
|
||||||
thumbnail_args: Vec<String>,
|
thumbnail_args: Vec<String>,
|
||||||
original_details: &Details,
|
original_details: &Details,
|
||||||
hash: Hash,
|
hash: Hash,
|
||||||
) -> Result<(Details, Bytes), Error> {
|
) -> Result<(Details, Arc<str>), Error> {
|
||||||
let guard = MetricsGuard::guard();
|
let guard = MetricsGuard::guard();
|
||||||
let permit = crate::process_semaphore().acquire().await?;
|
let permit = crate::process_semaphore().acquire().await?;
|
||||||
|
|
||||||
|
@ -123,7 +117,7 @@ async fn process<S: Store + 'static>(
|
||||||
|
|
||||||
let stream = state.store.to_stream(&identifier, None, None).await?;
|
let stream = state.store.to_stream(&identifier, None, None).await?;
|
||||||
|
|
||||||
let vec = crate::magick::process_image_stream_read(
|
let bytes = crate::magick::process_image_stream_read(
|
||||||
state,
|
state,
|
||||||
stream,
|
stream,
|
||||||
thumbnail_args,
|
thumbnail_args,
|
||||||
|
@ -132,19 +126,19 @@ async fn process<S: Store + 'static>(
|
||||||
quality,
|
quality,
|
||||||
)
|
)
|
||||||
.await?
|
.await?
|
||||||
.into_vec()
|
.into_bytes_stream()
|
||||||
.instrument(tracing::info_span!("Reading processed image to vec"))
|
.instrument(tracing::info_span!(
|
||||||
|
"Reading processed image to BytesStream"
|
||||||
|
))
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let bytes = Bytes::from(vec);
|
|
||||||
|
|
||||||
drop(permit);
|
drop(permit);
|
||||||
|
|
||||||
let details = Details::from_bytes(state, bytes.clone()).await?;
|
let details = Details::from_bytes_stream(state, bytes.clone()).await?;
|
||||||
|
|
||||||
let identifier = state
|
let identifier = state
|
||||||
.store
|
.store
|
||||||
.save_bytes(bytes.clone(), details.media_type())
|
.save_stream(bytes.into_io_stream(), details.media_type())
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
if let Err(VariantAlreadyExists) = state
|
if let Err(VariantAlreadyExists) = state
|
||||||
|
@ -163,7 +157,7 @@ async fn process<S: Store + 'static>(
|
||||||
|
|
||||||
guard.disarm();
|
guard.disarm();
|
||||||
|
|
||||||
Ok((details, bytes)) as Result<(Details, Bytes), Error>
|
Ok((details, identifier)) as Result<(Details, Arc<str>), Error>
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip_all)]
|
#[tracing::instrument(skip_all)]
|
||||||
|
|
|
@ -14,7 +14,6 @@ use actix_web::web::Bytes;
|
||||||
use futures_core::Stream;
|
use futures_core::Stream;
|
||||||
use reqwest::Body;
|
use reqwest::Body;
|
||||||
|
|
||||||
use streem::IntoStreamer;
|
|
||||||
use tracing::{Instrument, Span};
|
use tracing::{Instrument, Span};
|
||||||
|
|
||||||
mod hasher;
|
mod hasher;
|
||||||
|
@ -29,25 +28,6 @@ pub(crate) struct Session {
|
||||||
identifier: Option<Arc<str>>,
|
identifier: Option<Arc<str>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip(stream))]
|
|
||||||
async fn aggregate<S>(stream: S) -> Result<Bytes, Error>
|
|
||||||
where
|
|
||||||
S: Stream<Item = Result<Bytes, Error>>,
|
|
||||||
{
|
|
||||||
let mut buf = BytesStream::new();
|
|
||||||
|
|
||||||
let stream = std::pin::pin!(stream);
|
|
||||||
let mut stream = stream.into_streamer();
|
|
||||||
|
|
||||||
while let Some(res) = stream.next().await {
|
|
||||||
tracing::trace!("aggregate: looping");
|
|
||||||
|
|
||||||
buf.add_bytes(res?);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(buf.into_bytes())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn process_ingest<S>(
|
async fn process_ingest<S>(
|
||||||
state: &State<S>,
|
state: &State<S>,
|
||||||
stream: impl Stream<Item = Result<Bytes, Error>> + 'static,
|
stream: impl Stream<Item = Result<Bytes, Error>> + 'static,
|
||||||
|
@ -63,14 +43,17 @@ async fn process_ingest<S>(
|
||||||
where
|
where
|
||||||
S: Store,
|
S: Store,
|
||||||
{
|
{
|
||||||
let bytes = tokio::time::timeout(Duration::from_secs(60), aggregate(stream))
|
let bytes = tokio::time::timeout(
|
||||||
|
Duration::from_secs(60),
|
||||||
|
BytesStream::try_from_stream(stream),
|
||||||
|
)
|
||||||
.await
|
.await
|
||||||
.map_err(|_| UploadError::AggregateTimeout)??;
|
.map_err(|_| UploadError::AggregateTimeout)??;
|
||||||
|
|
||||||
let permit = crate::process_semaphore().acquire().await?;
|
let permit = crate::process_semaphore().acquire().await?;
|
||||||
|
|
||||||
tracing::trace!("Validating bytes");
|
tracing::trace!("Validating bytes");
|
||||||
let (input_type, process_read) = crate::validate::validate_bytes(state, bytes).await?;
|
let (input_type, process_read) = crate::validate::validate_bytes_stream(state, bytes).await?;
|
||||||
|
|
||||||
let process_read = if let Some(operations) = state.config.media.preprocess_steps() {
|
let process_read = if let Some(operations) = state.config.media.preprocess_steps() {
|
||||||
if let Some(format) = input_type.processable_format() {
|
if let Some(format) = input_type.processable_format() {
|
||||||
|
@ -116,7 +99,7 @@ where
|
||||||
.await??;
|
.await??;
|
||||||
|
|
||||||
let bytes_stream = state.store.to_bytes(&identifier, None, None).await?;
|
let bytes_stream = state.store.to_bytes(&identifier, None, None).await?;
|
||||||
let details = Details::from_bytes(state, bytes_stream.into_bytes()).await?;
|
let details = Details::from_bytes_stream(state, bytes_stream).await?;
|
||||||
|
|
||||||
drop(permit);
|
drop(permit);
|
||||||
|
|
||||||
|
@ -143,7 +126,7 @@ where
|
||||||
Err(e) => Err(std::io::Error::new(std::io::ErrorKind::Other, e)),
|
Err(e) => Err(std::io::Error::new(std::io::ErrorKind::Other, e)),
|
||||||
});
|
});
|
||||||
|
|
||||||
let reader = Box::pin(tokio_util::io::StreamReader::new(stream));
|
let reader = tokio_util::io::StreamReader::new(stream);
|
||||||
|
|
||||||
let hasher_reader = Hasher::new(reader);
|
let hasher_reader = Hasher::new(reader);
|
||||||
let hash_state = hasher_reader.state();
|
let hash_state = hasher_reader.state();
|
||||||
|
|
59
src/lib.rs
59
src/lib.rs
|
@ -83,7 +83,7 @@ use self::{
|
||||||
repo::{sled::SledRepo, Alias, DeleteToken, Hash, Repo, UploadId, UploadResult},
|
repo::{sled::SledRepo, Alias, DeleteToken, Hash, Repo, UploadId, UploadResult},
|
||||||
serde_str::Serde,
|
serde_str::Serde,
|
||||||
store::{file_store::FileStore, object_store::ObjectStore, Store},
|
store::{file_store::FileStore, object_store::ObjectStore, Store},
|
||||||
stream::{empty, once},
|
stream::empty,
|
||||||
tls::Tls,
|
tls::Tls,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -141,7 +141,7 @@ async fn ensure_details_identifier<S: Store + 'static>(
|
||||||
|
|
||||||
tracing::debug!("generating new details from {:?}", identifier);
|
tracing::debug!("generating new details from {:?}", identifier);
|
||||||
let bytes_stream = state.store.to_bytes(identifier, None, None).await?;
|
let bytes_stream = state.store.to_bytes(identifier, None, None).await?;
|
||||||
let new_details = Details::from_bytes(state, bytes_stream.into_bytes()).await?;
|
let new_details = Details::from_bytes_stream(state, bytes_stream).await?;
|
||||||
tracing::debug!("storing details for {:?}", identifier);
|
tracing::debug!("storing details for {:?}", identifier);
|
||||||
state.repo.relate_details(identifier, &new_details).await?;
|
state.repo.relate_details(identifier, &new_details).await?;
|
||||||
tracing::debug!("stored");
|
tracing::debug!("stored");
|
||||||
|
@ -841,25 +841,18 @@ async fn process<S: Store + 'static>(
|
||||||
.variant_identifier(hash.clone(), path_string)
|
.variant_identifier(hash.clone(), path_string)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
if let Some(identifier) = identifier_opt {
|
let (details, identifier) = if let Some(identifier) = identifier_opt {
|
||||||
let details = ensure_details_identifier(&state, &identifier).await?;
|
let details = ensure_details_identifier(&state, &identifier).await?;
|
||||||
|
|
||||||
if let Some(public_url) = state.store.public_url(&identifier) {
|
(details, identifier)
|
||||||
return Ok(HttpResponse::SeeOther()
|
} else {
|
||||||
.insert_header((actix_web::http::header::LOCATION, public_url.as_str()))
|
|
||||||
.finish());
|
|
||||||
}
|
|
||||||
|
|
||||||
return ranged_file_resp(&state.store, identifier, range, details, not_found).await;
|
|
||||||
}
|
|
||||||
|
|
||||||
if state.config.server.read_only {
|
if state.config.server.read_only {
|
||||||
return Err(UploadError::ReadOnly.into());
|
return Err(UploadError::ReadOnly.into());
|
||||||
}
|
}
|
||||||
|
|
||||||
let original_details = ensure_details(&state, &alias).await?;
|
let original_details = ensure_details(&state, &alias).await?;
|
||||||
|
|
||||||
let (details, bytes) = generate::generate(
|
generate::generate(
|
||||||
&state,
|
&state,
|
||||||
&process_map,
|
&process_map,
|
||||||
format,
|
format,
|
||||||
|
@ -868,40 +861,16 @@ async fn process<S: Store + 'static>(
|
||||||
&original_details,
|
&original_details,
|
||||||
hash,
|
hash,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?
|
||||||
|
|
||||||
let (builder, stream) = if let Some(web::Header(range_header)) = range {
|
|
||||||
if let Some(range) = range::single_bytes_range(&range_header) {
|
|
||||||
let len = bytes.len() as u64;
|
|
||||||
|
|
||||||
if let Some(content_range) = range::to_content_range(range, len) {
|
|
||||||
let mut builder = HttpResponse::PartialContent();
|
|
||||||
builder.insert_header(content_range);
|
|
||||||
let stream = range::chop_bytes(range, bytes, len)?;
|
|
||||||
|
|
||||||
(builder, Either::left(Either::left(stream)))
|
|
||||||
} else {
|
|
||||||
(
|
|
||||||
HttpResponse::RangeNotSatisfiable(),
|
|
||||||
Either::left(Either::right(empty())),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return Err(UploadError::Range.into());
|
|
||||||
}
|
|
||||||
} else if not_found {
|
|
||||||
(HttpResponse::NotFound(), Either::right(once(Ok(bytes))))
|
|
||||||
} else {
|
|
||||||
(HttpResponse::Ok(), Either::right(once(Ok(bytes))))
|
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(srv_response(
|
if let Some(public_url) = state.store.public_url(&identifier) {
|
||||||
builder,
|
return Ok(HttpResponse::SeeOther()
|
||||||
stream,
|
.insert_header((actix_web::http::header::LOCATION, public_url.as_str()))
|
||||||
details.media_type(),
|
.finish());
|
||||||
7 * DAYS,
|
}
|
||||||
details.system_time(),
|
|
||||||
))
|
ranged_file_resp(&state.store, identifier, range, details, not_found).await
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(name = "Serving processed image headers", skip(state))]
|
#[tracing::instrument(name = "Serving processed image headers", skip(state))]
|
||||||
|
|
|
@ -396,7 +396,7 @@ where
|
||||||
.await
|
.await
|
||||||
.map_err(From::from)
|
.map_err(From::from)
|
||||||
.map_err(MigrateError::Details)?;
|
.map_err(MigrateError::Details)?;
|
||||||
let new_details = Details::from_bytes(to, bytes_stream.into_bytes())
|
let new_details = Details::from_bytes_stream(to, bytes_stream)
|
||||||
.await
|
.await
|
||||||
.map_err(MigrateError::Details)?;
|
.map_err(MigrateError::Details)?;
|
||||||
to.repo
|
to.repo
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
use actix_web::web::Bytes;
|
|
||||||
use std::{
|
use std::{
|
||||||
ffi::OsStr,
|
ffi::OsStr,
|
||||||
future::Future,
|
future::Future,
|
||||||
|
@ -6,14 +5,17 @@ use std::{
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
time::{Duration, Instant},
|
time::{Duration, Instant},
|
||||||
};
|
};
|
||||||
|
|
||||||
use tokio::{
|
use tokio::{
|
||||||
io::{AsyncReadExt, AsyncWriteExt},
|
io::AsyncReadExt,
|
||||||
process::{Child, ChildStdin, Command},
|
process::{Child, ChildStdin, Command},
|
||||||
};
|
};
|
||||||
|
use tokio_util::io::ReaderStream;
|
||||||
use tracing::Instrument;
|
use tracing::Instrument;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
bytes_stream::BytesStream,
|
||||||
error_code::ErrorCode,
|
error_code::ErrorCode,
|
||||||
future::{LocalBoxFuture, WithTimeout},
|
future::{LocalBoxFuture, WithTimeout},
|
||||||
read::BoxRead,
|
read::BoxRead,
|
||||||
|
@ -232,12 +234,11 @@ impl Process {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn bytes_read(self, input: Bytes) -> ProcessRead {
|
pub(crate) fn bytes_stream_read(self, input: BytesStream) -> ProcessRead {
|
||||||
self.spawn_fn(move |mut stdin| {
|
self.spawn_fn(move |mut stdin| {
|
||||||
let mut input = input;
|
|
||||||
async move {
|
async move {
|
||||||
match stdin.write_all_buf(&mut input).await {
|
match tokio::io::copy(&mut input.into_reader(), &mut stdin).await {
|
||||||
Ok(()) => Ok(()),
|
Ok(_) => Ok(()),
|
||||||
// BrokenPipe means we finished reading from Stdout, so we don't need to write
|
// BrokenPipe means we finished reading from Stdout, so we don't need to write
|
||||||
// to stdin. We'll still error out if the command failed so treat this as a
|
// to stdin. We'll still error out if the command failed so treat this as a
|
||||||
// success
|
// success
|
||||||
|
@ -317,6 +318,16 @@ impl ProcessRead {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) async fn into_bytes_stream(self) -> Result<BytesStream, ProcessError> {
|
||||||
|
let cmd = self.command.clone();
|
||||||
|
|
||||||
|
self.with_stdout(move |stdout| {
|
||||||
|
BytesStream::try_from_stream(ReaderStream::with_capacity(stdout, 1024 * 64))
|
||||||
|
})
|
||||||
|
.await?
|
||||||
|
.map_err(move |e| ProcessError::Read(cmd, e))
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) async fn into_vec(self) -> Result<Vec<u8>, ProcessError> {
|
pub(crate) async fn into_vec(self) -> Result<Vec<u8>, ProcessError> {
|
||||||
let cmd = self.command.clone();
|
let cmd = self.command.clone();
|
||||||
|
|
||||||
|
|
15
src/range.rs
15
src/range.rs
|
@ -3,7 +3,6 @@ use std::sync::Arc;
|
||||||
use crate::{
|
use crate::{
|
||||||
error::{Error, UploadError},
|
error::{Error, UploadError},
|
||||||
store::Store,
|
store::Store,
|
||||||
stream::once,
|
|
||||||
};
|
};
|
||||||
use actix_web::{
|
use actix_web::{
|
||||||
http::header::{ByteRangeSpec, ContentRange, ContentRangeSpec, Range},
|
http::header::{ByteRangeSpec, ContentRange, ContentRangeSpec, Range},
|
||||||
|
@ -11,20 +10,6 @@ use actix_web::{
|
||||||
};
|
};
|
||||||
use futures_core::Stream;
|
use futures_core::Stream;
|
||||||
|
|
||||||
pub(crate) fn chop_bytes(
|
|
||||||
byte_range: &ByteRangeSpec,
|
|
||||||
bytes: Bytes,
|
|
||||||
length: u64,
|
|
||||||
) -> Result<impl Stream<Item = Result<Bytes, Error>>, Error> {
|
|
||||||
if let Some((start, end)) = byte_range.to_satisfiable_range(length) {
|
|
||||||
// END IS INCLUSIVE
|
|
||||||
let end = end as usize + 1;
|
|
||||||
return Ok(once(Ok(bytes.slice(start as usize..end))));
|
|
||||||
}
|
|
||||||
|
|
||||||
Err(UploadError::Range.into())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) async fn chop_store<S: Store>(
|
pub(crate) async fn chop_store<S: Store>(
|
||||||
byte_range: &ByteRangeSpec,
|
byte_range: &ByteRangeSpec,
|
||||||
store: &S,
|
store: &S,
|
||||||
|
|
|
@ -387,10 +387,9 @@ async fn fetch_or_generate_details<S: Store>(
|
||||||
Ok(details)
|
Ok(details)
|
||||||
} else {
|
} else {
|
||||||
let bytes_stream = state.store.to_bytes(identifier, None, None).await?;
|
let bytes_stream = state.store.to_bytes(identifier, None, None).await?;
|
||||||
let bytes = bytes_stream.into_bytes();
|
|
||||||
|
|
||||||
let guard = details_semaphore().acquire().await?;
|
let guard = details_semaphore().acquire().await?;
|
||||||
let details = Details::from_bytes(state, bytes).await?;
|
let details = Details::from_bytes_stream(state, bytes_stream).await?;
|
||||||
drop(guard);
|
drop(guard);
|
||||||
|
|
||||||
Ok(details)
|
Ok(details)
|
||||||
|
|
34
src/store.rs
34
src/store.rs
|
@ -1,7 +1,6 @@
|
||||||
use actix_web::web::Bytes;
|
use actix_web::web::Bytes;
|
||||||
use futures_core::Stream;
|
use futures_core::Stream;
|
||||||
use std::{fmt::Debug, sync::Arc};
|
use std::{fmt::Debug, sync::Arc};
|
||||||
use streem::IntoStreamer;
|
|
||||||
use tokio::io::{AsyncRead, AsyncWrite};
|
use tokio::io::{AsyncRead, AsyncWrite};
|
||||||
|
|
||||||
use crate::{bytes_stream::BytesStream, error_code::ErrorCode, stream::LocalBoxStream};
|
use crate::{bytes_stream::BytesStream, error_code::ErrorCode, stream::LocalBoxStream};
|
||||||
|
@ -92,7 +91,7 @@ pub(crate) trait Store: Clone + Debug {
|
||||||
content_type: mime::Mime,
|
content_type: mime::Mime,
|
||||||
) -> Result<Arc<str>, StoreError>
|
) -> Result<Arc<str>, StoreError>
|
||||||
where
|
where
|
||||||
Reader: AsyncRead + Unpin + 'static;
|
Reader: AsyncRead;
|
||||||
|
|
||||||
async fn save_stream<S>(
|
async fn save_stream<S>(
|
||||||
&self,
|
&self,
|
||||||
|
@ -100,7 +99,7 @@ pub(crate) trait Store: Clone + Debug {
|
||||||
content_type: mime::Mime,
|
content_type: mime::Mime,
|
||||||
) -> Result<Arc<str>, StoreError>
|
) -> Result<Arc<str>, StoreError>
|
||||||
where
|
where
|
||||||
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static;
|
S: Stream<Item = std::io::Result<Bytes>>;
|
||||||
|
|
||||||
async fn save_bytes(
|
async fn save_bytes(
|
||||||
&self,
|
&self,
|
||||||
|
@ -123,20 +122,11 @@ pub(crate) trait Store: Clone + Debug {
|
||||||
from_start: Option<u64>,
|
from_start: Option<u64>,
|
||||||
len: Option<u64>,
|
len: Option<u64>,
|
||||||
) -> Result<BytesStream, StoreError> {
|
) -> Result<BytesStream, StoreError> {
|
||||||
let mut buf = BytesStream::new();
|
let stream = self.to_stream(identifier, from_start, len).await?;
|
||||||
|
|
||||||
let mut streamer = self
|
BytesStream::try_from_stream(stream)
|
||||||
.to_stream(identifier, from_start, len)
|
.await
|
||||||
.await?
|
.map_err(StoreError::ReadStream)
|
||||||
.into_streamer();
|
|
||||||
|
|
||||||
while let Some(bytes) = streamer.try_next().await.map_err(StoreError::ReadStream)? {
|
|
||||||
tracing::trace!("to_bytes: looping");
|
|
||||||
|
|
||||||
buf.add_bytes(bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(buf)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn read_into<Writer>(
|
async fn read_into<Writer>(
|
||||||
|
@ -166,7 +156,7 @@ where
|
||||||
content_type: mime::Mime,
|
content_type: mime::Mime,
|
||||||
) -> Result<Arc<str>, StoreError>
|
) -> Result<Arc<str>, StoreError>
|
||||||
where
|
where
|
||||||
Reader: AsyncRead + Unpin + 'static,
|
Reader: AsyncRead,
|
||||||
{
|
{
|
||||||
T::save_async_read(self, reader, content_type).await
|
T::save_async_read(self, reader, content_type).await
|
||||||
}
|
}
|
||||||
|
@ -177,7 +167,7 @@ where
|
||||||
content_type: mime::Mime,
|
content_type: mime::Mime,
|
||||||
) -> Result<Arc<str>, StoreError>
|
) -> Result<Arc<str>, StoreError>
|
||||||
where
|
where
|
||||||
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
|
S: Stream<Item = std::io::Result<Bytes>>,
|
||||||
{
|
{
|
||||||
T::save_stream(self, stream, content_type).await
|
T::save_stream(self, stream, content_type).await
|
||||||
}
|
}
|
||||||
|
@ -237,7 +227,7 @@ where
|
||||||
content_type: mime::Mime,
|
content_type: mime::Mime,
|
||||||
) -> Result<Arc<str>, StoreError>
|
) -> Result<Arc<str>, StoreError>
|
||||||
where
|
where
|
||||||
Reader: AsyncRead + Unpin + 'static,
|
Reader: AsyncRead,
|
||||||
{
|
{
|
||||||
T::save_async_read(self, reader, content_type).await
|
T::save_async_read(self, reader, content_type).await
|
||||||
}
|
}
|
||||||
|
@ -248,7 +238,7 @@ where
|
||||||
content_type: mime::Mime,
|
content_type: mime::Mime,
|
||||||
) -> Result<Arc<str>, StoreError>
|
) -> Result<Arc<str>, StoreError>
|
||||||
where
|
where
|
||||||
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
|
S: Stream<Item = std::io::Result<Bytes>>,
|
||||||
{
|
{
|
||||||
T::save_stream(self, stream, content_type).await
|
T::save_stream(self, stream, content_type).await
|
||||||
}
|
}
|
||||||
|
@ -308,7 +298,7 @@ where
|
||||||
content_type: mime::Mime,
|
content_type: mime::Mime,
|
||||||
) -> Result<Arc<str>, StoreError>
|
) -> Result<Arc<str>, StoreError>
|
||||||
where
|
where
|
||||||
Reader: AsyncRead + Unpin + 'static,
|
Reader: AsyncRead,
|
||||||
{
|
{
|
||||||
T::save_async_read(self, reader, content_type).await
|
T::save_async_read(self, reader, content_type).await
|
||||||
}
|
}
|
||||||
|
@ -319,7 +309,7 @@ where
|
||||||
content_type: mime::Mime,
|
content_type: mime::Mime,
|
||||||
) -> Result<Arc<str>, StoreError>
|
) -> Result<Arc<str>, StoreError>
|
||||||
where
|
where
|
||||||
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
|
S: Stream<Item = std::io::Result<Bytes>>,
|
||||||
{
|
{
|
||||||
T::save_stream(self, stream, content_type).await
|
T::save_stream(self, stream, content_type).await
|
||||||
}
|
}
|
||||||
|
|
|
@ -68,12 +68,14 @@ impl Store for FileStore {
|
||||||
#[tracing::instrument(skip(self, reader))]
|
#[tracing::instrument(skip(self, reader))]
|
||||||
async fn save_async_read<Reader>(
|
async fn save_async_read<Reader>(
|
||||||
&self,
|
&self,
|
||||||
mut reader: Reader,
|
reader: Reader,
|
||||||
_content_type: mime::Mime,
|
_content_type: mime::Mime,
|
||||||
) -> Result<Arc<str>, StoreError>
|
) -> Result<Arc<str>, StoreError>
|
||||||
where
|
where
|
||||||
Reader: AsyncRead + Unpin + 'static,
|
Reader: AsyncRead,
|
||||||
{
|
{
|
||||||
|
let mut reader = std::pin::pin!(reader);
|
||||||
|
|
||||||
let path = self.next_file().await?;
|
let path = self.next_file().await?;
|
||||||
|
|
||||||
if let Err(e) = self.safe_save_reader(&path, &mut reader).await {
|
if let Err(e) = self.safe_save_reader(&path, &mut reader).await {
|
||||||
|
@ -90,7 +92,7 @@ impl Store for FileStore {
|
||||||
content_type: mime::Mime,
|
content_type: mime::Mime,
|
||||||
) -> Result<Arc<str>, StoreError>
|
) -> Result<Arc<str>, StoreError>
|
||||||
where
|
where
|
||||||
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
|
S: Stream<Item = std::io::Result<Bytes>>,
|
||||||
{
|
{
|
||||||
self.save_async_read(StreamReader::new(stream), content_type)
|
self.save_async_read(StreamReader::new(stream), content_type)
|
||||||
.await
|
.await
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
use crate::{
|
use crate::{
|
||||||
bytes_stream::BytesStream, error_code::ErrorCode, future::WithMetrics, repo::ArcRepo,
|
bytes_stream::BytesStream, error_code::ErrorCode, future::WithMetrics, repo::ArcRepo,
|
||||||
store::Store, stream::LocalBoxStream,
|
store::Store, stream::LocalBoxStream, sync::DropHandle,
|
||||||
};
|
};
|
||||||
use actix_web::{
|
use actix_web::{
|
||||||
error::BlockingError,
|
error::BlockingError,
|
||||||
|
@ -170,7 +170,7 @@ fn payload_to_io_error(e: reqwest::Error) -> std::io::Error {
|
||||||
#[tracing::instrument(level = "debug", skip(stream))]
|
#[tracing::instrument(level = "debug", skip(stream))]
|
||||||
async fn read_chunk<S>(stream: &mut S) -> Result<BytesStream, ObjectError>
|
async fn read_chunk<S>(stream: &mut S) -> Result<BytesStream, ObjectError>
|
||||||
where
|
where
|
||||||
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
|
S: Stream<Item = std::io::Result<Bytes>> + Unpin,
|
||||||
{
|
{
|
||||||
let mut buf = BytesStream::new();
|
let mut buf = BytesStream::new();
|
||||||
|
|
||||||
|
@ -186,6 +186,12 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tracing::debug!(
|
||||||
|
"BytesStream with {} chunks, avg length {}",
|
||||||
|
buf.chunks_len(),
|
||||||
|
buf.len() / buf.chunks_len()
|
||||||
|
);
|
||||||
|
|
||||||
Ok(buf)
|
Ok(buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -223,28 +229,27 @@ impl Store for ObjectStore {
|
||||||
content_type: mime::Mime,
|
content_type: mime::Mime,
|
||||||
) -> Result<Arc<str>, StoreError>
|
) -> Result<Arc<str>, StoreError>
|
||||||
where
|
where
|
||||||
Reader: AsyncRead + Unpin + 'static,
|
Reader: AsyncRead,
|
||||||
{
|
{
|
||||||
self.save_stream(ReaderStream::new(reader), content_type)
|
self.save_stream(ReaderStream::with_capacity(reader, 1024 * 64), content_type)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip_all)]
|
#[tracing::instrument(skip_all)]
|
||||||
async fn save_stream<S>(
|
async fn save_stream<S>(
|
||||||
&self,
|
&self,
|
||||||
mut stream: S,
|
stream: S,
|
||||||
content_type: mime::Mime,
|
content_type: mime::Mime,
|
||||||
) -> Result<Arc<str>, StoreError>
|
) -> Result<Arc<str>, StoreError>
|
||||||
where
|
where
|
||||||
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
|
S: Stream<Item = std::io::Result<Bytes>>,
|
||||||
{
|
{
|
||||||
let first_chunk = read_chunk(&mut stream).await?;
|
match self.start_upload(stream, content_type.clone()).await? {
|
||||||
|
UploadState::Single(first_chunk) => {
|
||||||
if first_chunk.len() < CHUNK_SIZE {
|
|
||||||
drop(stream);
|
|
||||||
let (req, object_id) = self
|
let (req, object_id) = self
|
||||||
.put_object_request(first_chunk.len(), content_type)
|
.put_object_request(first_chunk.len(), content_type)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let response = req
|
let response = req
|
||||||
.body(Body::wrap_stream(first_chunk))
|
.body(Body::wrap_stream(first_chunk))
|
||||||
.send()
|
.send()
|
||||||
|
@ -258,94 +263,9 @@ impl Store for ObjectStore {
|
||||||
|
|
||||||
return Ok(object_id);
|
return Ok(object_id);
|
||||||
}
|
}
|
||||||
|
UploadState::Multi(object_id, upload_id, futures) => {
|
||||||
let mut first_chunk = Some(first_chunk);
|
|
||||||
|
|
||||||
let (req, object_id) = self.create_multipart_request(content_type).await?;
|
|
||||||
let response = req
|
|
||||||
.send()
|
|
||||||
.with_metrics(crate::init_metrics::OBJECT_STORAGE_CREATE_MULTIPART_REQUEST)
|
|
||||||
.await
|
|
||||||
.map_err(ObjectError::from)?;
|
|
||||||
|
|
||||||
if !response.status().is_success() {
|
|
||||||
return Err(status_error(response, None).await);
|
|
||||||
}
|
|
||||||
|
|
||||||
let body = response.text().await.map_err(ObjectError::Request)?;
|
|
||||||
let body = CreateMultipartUpload::parse_response(&body)
|
|
||||||
.map_err(XmlError::new)
|
|
||||||
.map_err(ObjectError::Xml)?;
|
|
||||||
let upload_id = body.upload_id();
|
|
||||||
|
|
||||||
// hack-ish: use async block as Result boundary
|
// hack-ish: use async block as Result boundary
|
||||||
let res = async {
|
let res = async {
|
||||||
let mut complete = false;
|
|
||||||
let mut part_number = 0;
|
|
||||||
let mut futures = Vec::new();
|
|
||||||
|
|
||||||
while !complete {
|
|
||||||
tracing::trace!("save_stream: looping");
|
|
||||||
|
|
||||||
part_number += 1;
|
|
||||||
|
|
||||||
let buf = if let Some(buf) = first_chunk.take() {
|
|
||||||
buf
|
|
||||||
} else {
|
|
||||||
read_chunk(&mut stream).await?
|
|
||||||
};
|
|
||||||
|
|
||||||
complete = buf.len() < CHUNK_SIZE;
|
|
||||||
|
|
||||||
let this = self.clone();
|
|
||||||
|
|
||||||
let object_id2 = object_id.clone();
|
|
||||||
let upload_id2 = upload_id.to_string();
|
|
||||||
let handle = crate::sync::abort_on_drop(crate::sync::spawn(
|
|
||||||
"upload-multipart-part",
|
|
||||||
async move {
|
|
||||||
let response = this
|
|
||||||
.create_upload_part_request(
|
|
||||||
buf.clone(),
|
|
||||||
&object_id2,
|
|
||||||
part_number,
|
|
||||||
&upload_id2,
|
|
||||||
)
|
|
||||||
.await?
|
|
||||||
.body(Body::wrap_stream(buf))
|
|
||||||
.send()
|
|
||||||
.with_metrics(
|
|
||||||
crate::init_metrics::OBJECT_STORAGE_CREATE_UPLOAD_PART_REQUEST,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.map_err(ObjectError::from)?;
|
|
||||||
|
|
||||||
if !response.status().is_success() {
|
|
||||||
return Err(status_error(response, None).await);
|
|
||||||
}
|
|
||||||
|
|
||||||
let etag = response
|
|
||||||
.headers()
|
|
||||||
.get("etag")
|
|
||||||
.ok_or(ObjectError::Etag)?
|
|
||||||
.to_str()
|
|
||||||
.map_err(|_| ObjectError::Etag)?
|
|
||||||
.to_string();
|
|
||||||
|
|
||||||
// early-drop response to close its tracing spans
|
|
||||||
drop(response);
|
|
||||||
|
|
||||||
Ok(etag) as Result<String, StoreError>
|
|
||||||
}
|
|
||||||
.instrument(tracing::Span::current()),
|
|
||||||
));
|
|
||||||
|
|
||||||
futures.push(handle);
|
|
||||||
}
|
|
||||||
|
|
||||||
// early-drop stream to allow the next Part to be polled concurrently
|
|
||||||
drop(stream);
|
|
||||||
|
|
||||||
let mut etags = Vec::new();
|
let mut etags = Vec::new();
|
||||||
|
|
||||||
for future in futures {
|
for future in futures {
|
||||||
|
@ -355,7 +275,7 @@ impl Store for ObjectStore {
|
||||||
let response = self
|
let response = self
|
||||||
.send_complete_multipart_request(
|
.send_complete_multipart_request(
|
||||||
&object_id,
|
&object_id,
|
||||||
upload_id,
|
&upload_id,
|
||||||
etags.iter().map(|s| s.as_ref()),
|
etags.iter().map(|s| s.as_ref()),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
|
@ -370,7 +290,7 @@ impl Store for ObjectStore {
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
if let Err(e) = res {
|
if let Err(e) = res {
|
||||||
self.create_abort_multipart_request(&object_id, upload_id)
|
self.create_abort_multipart_request(&object_id, &upload_id)
|
||||||
.send()
|
.send()
|
||||||
.with_metrics(crate::init_metrics::OBJECT_STORAGE_ABORT_MULTIPART_REQUEST)
|
.with_metrics(crate::init_metrics::OBJECT_STORAGE_ABORT_MULTIPART_REQUEST)
|
||||||
.await
|
.await
|
||||||
|
@ -380,6 +300,8 @@ impl Store for ObjectStore {
|
||||||
|
|
||||||
Ok(object_id)
|
Ok(object_id)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip_all)]
|
#[tracing::instrument(skip_all)]
|
||||||
async fn save_bytes(
|
async fn save_bytes(
|
||||||
|
@ -520,6 +442,15 @@ impl Store for ObjectStore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
enum UploadState {
|
||||||
|
Single(BytesStream),
|
||||||
|
Multi(
|
||||||
|
Arc<str>,
|
||||||
|
String,
|
||||||
|
Vec<DropHandle<Result<String, StoreError>>>,
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
impl ObjectStore {
|
impl ObjectStore {
|
||||||
#[allow(clippy::too_many_arguments)]
|
#[allow(clippy::too_many_arguments)]
|
||||||
#[tracing::instrument(skip(access_key, secret_key, session_token, repo))]
|
#[tracing::instrument(skip(access_key, secret_key, session_token, repo))]
|
||||||
|
@ -554,6 +485,128 @@ impl ObjectStore {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(skip_all)]
|
||||||
|
async fn start_upload<S>(
|
||||||
|
&self,
|
||||||
|
stream: S,
|
||||||
|
content_type: mime::Mime,
|
||||||
|
) -> Result<UploadState, StoreError>
|
||||||
|
where
|
||||||
|
S: Stream<Item = std::io::Result<Bytes>>,
|
||||||
|
{
|
||||||
|
let mut stream = std::pin::pin!(stream);
|
||||||
|
|
||||||
|
let first_chunk = read_chunk(&mut stream).await?;
|
||||||
|
|
||||||
|
if first_chunk.len() < CHUNK_SIZE {
|
||||||
|
return Ok(UploadState::Single(first_chunk));
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut first_chunk = Some(first_chunk);
|
||||||
|
|
||||||
|
let (req, object_id) = self.create_multipart_request(content_type).await?;
|
||||||
|
let response = req
|
||||||
|
.send()
|
||||||
|
.with_metrics(crate::init_metrics::OBJECT_STORAGE_CREATE_MULTIPART_REQUEST)
|
||||||
|
.await
|
||||||
|
.map_err(ObjectError::from)?;
|
||||||
|
|
||||||
|
if !response.status().is_success() {
|
||||||
|
return Err(status_error(response, None).await);
|
||||||
|
}
|
||||||
|
|
||||||
|
let body = response.text().await.map_err(ObjectError::Request)?;
|
||||||
|
let body = CreateMultipartUpload::parse_response(&body)
|
||||||
|
.map_err(XmlError::new)
|
||||||
|
.map_err(ObjectError::Xml)?;
|
||||||
|
let upload_id = body.upload_id();
|
||||||
|
|
||||||
|
// hack-ish: use async block as Result boundary
|
||||||
|
let res = async {
|
||||||
|
let mut complete = false;
|
||||||
|
let mut part_number = 0;
|
||||||
|
let mut futures = Vec::new();
|
||||||
|
|
||||||
|
while !complete {
|
||||||
|
tracing::trace!("save_stream: looping");
|
||||||
|
|
||||||
|
part_number += 1;
|
||||||
|
|
||||||
|
let buf = if let Some(buf) = first_chunk.take() {
|
||||||
|
buf
|
||||||
|
} else {
|
||||||
|
read_chunk(&mut stream).await?
|
||||||
|
};
|
||||||
|
|
||||||
|
complete = buf.len() < CHUNK_SIZE;
|
||||||
|
|
||||||
|
let this = self.clone();
|
||||||
|
|
||||||
|
let object_id2 = object_id.clone();
|
||||||
|
let upload_id2 = upload_id.to_string();
|
||||||
|
let handle = crate::sync::abort_on_drop(crate::sync::spawn(
|
||||||
|
"upload-multipart-part",
|
||||||
|
async move {
|
||||||
|
let response = this
|
||||||
|
.create_upload_part_request(
|
||||||
|
buf.clone(),
|
||||||
|
&object_id2,
|
||||||
|
part_number,
|
||||||
|
&upload_id2,
|
||||||
|
)
|
||||||
|
.await?
|
||||||
|
.body(Body::wrap_stream(buf))
|
||||||
|
.send()
|
||||||
|
.with_metrics(
|
||||||
|
crate::init_metrics::OBJECT_STORAGE_CREATE_UPLOAD_PART_REQUEST,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.map_err(ObjectError::from)?;
|
||||||
|
|
||||||
|
if !response.status().is_success() {
|
||||||
|
return Err(status_error(response, None).await);
|
||||||
|
}
|
||||||
|
|
||||||
|
let etag = response
|
||||||
|
.headers()
|
||||||
|
.get("etag")
|
||||||
|
.ok_or(ObjectError::Etag)?
|
||||||
|
.to_str()
|
||||||
|
.map_err(|_| ObjectError::Etag)?
|
||||||
|
.to_string();
|
||||||
|
|
||||||
|
// early-drop response to close its tracing spans
|
||||||
|
drop(response);
|
||||||
|
|
||||||
|
Ok(etag) as Result<String, StoreError>
|
||||||
|
}
|
||||||
|
.instrument(tracing::Span::current()),
|
||||||
|
));
|
||||||
|
|
||||||
|
futures.push(handle);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(futures)
|
||||||
|
}
|
||||||
|
.await;
|
||||||
|
|
||||||
|
match res {
|
||||||
|
Ok(futures) => Ok(UploadState::Multi(
|
||||||
|
object_id,
|
||||||
|
upload_id.to_string(),
|
||||||
|
futures,
|
||||||
|
)),
|
||||||
|
Err(e) => {
|
||||||
|
self.create_abort_multipart_request(&object_id, upload_id)
|
||||||
|
.send()
|
||||||
|
.with_metrics(crate::init_metrics::OBJECT_STORAGE_ABORT_MULTIPART_REQUEST)
|
||||||
|
.await
|
||||||
|
.map_err(ObjectError::from)?;
|
||||||
|
Err(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async fn head_bucket_request(&self) -> Result<RequestBuilder, StoreError> {
|
async fn head_bucket_request(&self) -> Result<RequestBuilder, StoreError> {
|
||||||
let action = self.bucket.head_bucket(Some(&self.credentials));
|
let action = self.bucket.head_bucket(Some(&self.credentials));
|
||||||
|
|
||||||
|
|
|
@ -183,13 +183,6 @@ where
|
||||||
streem::from_fn(|_| std::future::ready(()))
|
streem::from_fn(|_| std::future::ready(()))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn once<T>(value: T) -> impl Stream<Item = T>
|
|
||||||
where
|
|
||||||
T: 'static,
|
|
||||||
{
|
|
||||||
streem::from_fn(|yielder| yielder.yield_(value))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn timeout<S>(
|
pub(crate) fn timeout<S>(
|
||||||
duration: Duration,
|
duration: Duration,
|
||||||
stream: S,
|
stream: S,
|
||||||
|
|
|
@ -3,6 +3,7 @@ mod ffmpeg;
|
||||||
mod magick;
|
mod magick;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
bytes_stream::BytesStream,
|
||||||
discover::Discovery,
|
discover::Discovery,
|
||||||
error::Error,
|
error::Error,
|
||||||
error_code::ErrorCode,
|
error_code::ErrorCode,
|
||||||
|
@ -13,7 +14,7 @@ use crate::{
|
||||||
process::ProcessRead,
|
process::ProcessRead,
|
||||||
state::State,
|
state::State,
|
||||||
};
|
};
|
||||||
use actix_web::web::Bytes;
|
|
||||||
|
|
||||||
#[derive(Debug, thiserror::Error)]
|
#[derive(Debug, thiserror::Error)]
|
||||||
pub(crate) enum ValidationError {
|
pub(crate) enum ValidationError {
|
||||||
|
@ -56,9 +57,9 @@ impl ValidationError {
|
||||||
const MEGABYTES: usize = 1024 * 1024;
|
const MEGABYTES: usize = 1024 * 1024;
|
||||||
|
|
||||||
#[tracing::instrument(skip_all)]
|
#[tracing::instrument(skip_all)]
|
||||||
pub(crate) async fn validate_bytes<S>(
|
pub(crate) async fn validate_bytes_stream<S>(
|
||||||
state: &State<S>,
|
state: &State<S>,
|
||||||
bytes: Bytes,
|
bytes: BytesStream,
|
||||||
) -> Result<(InternalFormat, ProcessRead), Error> {
|
) -> Result<(InternalFormat, ProcessRead), Error> {
|
||||||
if bytes.is_empty() {
|
if bytes.is_empty() {
|
||||||
return Err(ValidationError::Empty.into());
|
return Err(ValidationError::Empty.into());
|
||||||
|
@ -69,7 +70,7 @@ pub(crate) async fn validate_bytes<S>(
|
||||||
width,
|
width,
|
||||||
height,
|
height,
|
||||||
frames,
|
frames,
|
||||||
} = crate::discover::discover_bytes(state, bytes.clone()).await?;
|
} = crate::discover::discover_bytes_stream(state, bytes.clone()).await?;
|
||||||
|
|
||||||
match &input {
|
match &input {
|
||||||
InputFile::Image(input) => {
|
InputFile::Image(input) => {
|
||||||
|
@ -95,7 +96,7 @@ pub(crate) async fn validate_bytes<S>(
|
||||||
#[tracing::instrument(skip(state, bytes), fields(len = bytes.len()))]
|
#[tracing::instrument(skip(state, bytes), fields(len = bytes.len()))]
|
||||||
async fn process_image<S>(
|
async fn process_image<S>(
|
||||||
state: &State<S>,
|
state: &State<S>,
|
||||||
bytes: Bytes,
|
bytes: BytesStream,
|
||||||
input: ImageInput,
|
input: ImageInput,
|
||||||
width: u16,
|
width: u16,
|
||||||
height: u16,
|
height: u16,
|
||||||
|
@ -160,7 +161,7 @@ fn validate_animation(
|
||||||
#[tracing::instrument(skip(state, bytes), fields(len = bytes.len()))]
|
#[tracing::instrument(skip(state, bytes), fields(len = bytes.len()))]
|
||||||
async fn process_animation<S>(
|
async fn process_animation<S>(
|
||||||
state: &State<S>,
|
state: &State<S>,
|
||||||
bytes: Bytes,
|
bytes: BytesStream,
|
||||||
input: AnimationFormat,
|
input: AnimationFormat,
|
||||||
width: u16,
|
width: u16,
|
||||||
height: u16,
|
height: u16,
|
||||||
|
@ -218,7 +219,7 @@ fn validate_video(
|
||||||
#[tracing::instrument(skip(state, bytes), fields(len = bytes.len()))]
|
#[tracing::instrument(skip(state, bytes), fields(len = bytes.len()))]
|
||||||
async fn process_video<S>(
|
async fn process_video<S>(
|
||||||
state: &State<S>,
|
state: &State<S>,
|
||||||
bytes: Bytes,
|
bytes: BytesStream,
|
||||||
input: InputVideoFormat,
|
input: InputVideoFormat,
|
||||||
width: u16,
|
width: u16,
|
||||||
height: u16,
|
height: u16,
|
||||||
|
|
|
@ -1,14 +1,18 @@
|
||||||
use actix_web::web::Bytes;
|
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
bytes_stream::BytesStream,
|
||||||
exiftool::ExifError,
|
exiftool::ExifError,
|
||||||
process::{Process, ProcessRead},
|
process::{Process, ProcessRead},
|
||||||
};
|
};
|
||||||
|
|
||||||
#[tracing::instrument(level = "trace", skip_all)]
|
#[tracing::instrument(level = "trace", skip_all)]
|
||||||
pub(crate) fn clear_metadata_bytes_read(
|
pub(crate) fn clear_metadata_bytes_read(
|
||||||
input: Bytes,
|
input: BytesStream,
|
||||||
timeout: u64,
|
timeout: u64,
|
||||||
) -> Result<ProcessRead, ExifError> {
|
) -> Result<ProcessRead, ExifError> {
|
||||||
Ok(Process::run("exiftool", &["-all=", "-", "-out", "-"], &[], timeout)?.bytes_read(input))
|
Ok(
|
||||||
|
Process::run("exiftool", &["-all=", "-", "-out", "-"], &[], timeout)?
|
||||||
|
.bytes_stream_read(input),
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,9 +1,10 @@
|
||||||
use std::{ffi::OsStr, sync::Arc};
|
use std::{ffi::OsStr, sync::Arc};
|
||||||
|
|
||||||
use actix_web::web::Bytes;
|
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
bytes_stream::BytesStream,
|
||||||
ffmpeg::FfMpegError,
|
ffmpeg::FfMpegError,
|
||||||
formats::{InputVideoFormat, OutputVideo},
|
formats::{InputVideoFormat, OutputVideo},
|
||||||
process::{Process, ProcessRead},
|
process::{Process, ProcessRead},
|
||||||
|
@ -16,7 +17,7 @@ pub(super) async fn transcode_bytes(
|
||||||
output_format: OutputVideo,
|
output_format: OutputVideo,
|
||||||
crf: u8,
|
crf: u8,
|
||||||
timeout: u64,
|
timeout: u64,
|
||||||
bytes: Bytes,
|
bytes: BytesStream,
|
||||||
) -> Result<ProcessRead, FfMpegError> {
|
) -> Result<ProcessRead, FfMpegError> {
|
||||||
let input_file = tmp_dir.tmp_file(None);
|
let input_file = tmp_dir.tmp_file(None);
|
||||||
crate::store::file_store::safe_create_parent(&input_file)
|
crate::store::file_store::safe_create_parent(&input_file)
|
||||||
|
@ -27,7 +28,7 @@ pub(super) async fn transcode_bytes(
|
||||||
.await
|
.await
|
||||||
.map_err(FfMpegError::CreateFile)?;
|
.map_err(FfMpegError::CreateFile)?;
|
||||||
tmp_one
|
tmp_one
|
||||||
.write_from_bytes(bytes)
|
.write_from_stream(bytes.into_io_stream())
|
||||||
.await
|
.await
|
||||||
.map_err(FfMpegError::Write)?;
|
.map_err(FfMpegError::Write)?;
|
||||||
tmp_one.close().await.map_err(FfMpegError::CloseFile)?;
|
tmp_one.close().await.map_err(FfMpegError::CloseFile)?;
|
||||||
|
|
|
@ -1,8 +1,9 @@
|
||||||
use std::ffi::OsStr;
|
use std::ffi::OsStr;
|
||||||
|
|
||||||
use actix_web::web::Bytes;
|
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
bytes_stream::BytesStream,
|
||||||
formats::{AnimationFormat, ImageFormat},
|
formats::{AnimationFormat, ImageFormat},
|
||||||
magick::{MagickError, MAGICK_CONFIGURE_PATH, MAGICK_TEMPORARY_PATH},
|
magick::{MagickError, MAGICK_CONFIGURE_PATH, MAGICK_TEMPORARY_PATH},
|
||||||
process::{Process, ProcessRead},
|
process::{Process, ProcessRead},
|
||||||
|
@ -14,7 +15,7 @@ pub(super) async fn convert_image<S>(
|
||||||
input: ImageFormat,
|
input: ImageFormat,
|
||||||
output: ImageFormat,
|
output: ImageFormat,
|
||||||
quality: Option<u8>,
|
quality: Option<u8>,
|
||||||
bytes: Bytes,
|
bytes: BytesStream,
|
||||||
) -> Result<ProcessRead, MagickError> {
|
) -> Result<ProcessRead, MagickError> {
|
||||||
convert(
|
convert(
|
||||||
state,
|
state,
|
||||||
|
@ -32,7 +33,7 @@ pub(super) async fn convert_animation<S>(
|
||||||
input: AnimationFormat,
|
input: AnimationFormat,
|
||||||
output: AnimationFormat,
|
output: AnimationFormat,
|
||||||
quality: Option<u8>,
|
quality: Option<u8>,
|
||||||
bytes: Bytes,
|
bytes: BytesStream,
|
||||||
) -> Result<ProcessRead, MagickError> {
|
) -> Result<ProcessRead, MagickError> {
|
||||||
convert(
|
convert(
|
||||||
state,
|
state,
|
||||||
|
@ -51,7 +52,7 @@ async fn convert<S>(
|
||||||
output: &'static str,
|
output: &'static str,
|
||||||
coalesce: bool,
|
coalesce: bool,
|
||||||
quality: Option<u8>,
|
quality: Option<u8>,
|
||||||
bytes: Bytes,
|
bytes: BytesStream,
|
||||||
) -> Result<ProcessRead, MagickError> {
|
) -> Result<ProcessRead, MagickError> {
|
||||||
let temporary_path = state
|
let temporary_path = state
|
||||||
.tmp_dir
|
.tmp_dir
|
||||||
|
@ -69,7 +70,7 @@ async fn convert<S>(
|
||||||
.await
|
.await
|
||||||
.map_err(MagickError::CreateFile)?;
|
.map_err(MagickError::CreateFile)?;
|
||||||
tmp_one
|
tmp_one
|
||||||
.write_from_bytes(bytes)
|
.write_from_stream(bytes.into_io_stream())
|
||||||
.await
|
.await
|
||||||
.map_err(MagickError::Write)?;
|
.map_err(MagickError::Write)?;
|
||||||
tmp_one.close().await.map_err(MagickError::CloseFile)?;
|
tmp_one.close().await.map_err(MagickError::CloseFile)?;
|
||||||
|
|
Loading…
Reference in a new issue