mirror of
https://git.asonix.dog/asonix/pict-rs
synced 2024-11-20 11:21:14 +00:00
Use BytesStream in more places
This commit is contained in:
parent
c722cdd905
commit
c1e651c01a
20 changed files with 206 additions and 151 deletions
|
@ -9,6 +9,8 @@ use std::{
|
|||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use streem::IntoStreamer;
|
||||
use tokio::io::AsyncRead;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) struct BytesStream {
|
||||
|
@ -24,6 +26,21 @@ impl BytesStream {
|
|||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn try_from_stream<S, E>(stream: S) -> Result<Self, E>
|
||||
where
|
||||
S: Stream<Item = Result<Bytes, E>>,
|
||||
{
|
||||
let stream = std::pin::pin!(stream);
|
||||
let mut stream = stream.into_streamer();
|
||||
let mut bs = Self::new();
|
||||
|
||||
while let Some(bytes) = stream.try_next().await? {
|
||||
bs.add_bytes(bytes);
|
||||
}
|
||||
|
||||
Ok(bs)
|
||||
}
|
||||
|
||||
pub(crate) fn add_bytes(&mut self, bytes: Bytes) {
|
||||
self.total_len += bytes.len();
|
||||
self.inner.push_back(bytes);
|
||||
|
@ -33,7 +50,15 @@ impl BytesStream {
|
|||
self.total_len
|
||||
}
|
||||
|
||||
pub(crate) fn into_bytes(self) -> Bytes {
|
||||
pub(crate) fn is_empty(&self) -> bool {
|
||||
self.total_len > 0
|
||||
}
|
||||
|
||||
fn into_bytes(mut self) -> Bytes {
|
||||
if self.inner.len() == 1 {
|
||||
return self.inner.pop_front().expect("Exactly one");
|
||||
}
|
||||
|
||||
let mut buf = BytesMut::with_capacity(self.total_len);
|
||||
|
||||
for bytes in self.inner {
|
||||
|
@ -42,6 +67,26 @@ impl BytesStream {
|
|||
|
||||
buf.freeze()
|
||||
}
|
||||
|
||||
pub(crate) fn into_reader(self) -> BytesReader {
|
||||
BytesReader {
|
||||
index: 0,
|
||||
inner: self.inner,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn into_io_stream(self) -> IoStream {
|
||||
IoStream { stream: self }
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct IoStream {
|
||||
stream: BytesStream,
|
||||
}
|
||||
|
||||
pub(crate) struct BytesReader {
|
||||
index: usize,
|
||||
inner: VecDeque<Bytes>,
|
||||
}
|
||||
|
||||
impl IntoIterator for BytesStream {
|
||||
|
@ -86,3 +131,47 @@ impl Stream for BytesStream {
|
|||
Poll::Ready(self.get_mut().inner.pop_front().map(Ok))
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for IoStream {
|
||||
type Item = std::io::Result<Bytes>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
MessageBody::poll_next(Pin::new(&mut self.get_mut().stream), cx)
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncRead for BytesReader {
|
||||
fn poll_read(
|
||||
mut self: Pin<&mut Self>,
|
||||
_cx: &mut Context<'_>,
|
||||
buf: &mut tokio::io::ReadBuf<'_>,
|
||||
) -> Poll<std::io::Result<()>> {
|
||||
while buf.remaining() > 0 {
|
||||
if self.index == self.inner[0].len() {
|
||||
self.inner.pop_front();
|
||||
self.index = 0;
|
||||
}
|
||||
|
||||
if self.inner.is_empty() {
|
||||
break;
|
||||
}
|
||||
|
||||
let upper_bound = (self.index + buf.remaining()).min(self.inner[0].len());
|
||||
|
||||
let slice = &self.inner[0][self.index..upper_bound];
|
||||
|
||||
buf.put_slice(slice);
|
||||
self.index += slice.len();
|
||||
}
|
||||
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Bytes> for BytesStream {
|
||||
fn from(value: Bytes) -> Self {
|
||||
let mut bs = BytesStream::new();
|
||||
bs.add_bytes(value);
|
||||
bs
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,7 +15,7 @@ use std::{
|
|||
};
|
||||
use tracing::Span;
|
||||
|
||||
type OutcomeReceiver = Receiver<(Details, web::Bytes)>;
|
||||
type OutcomeReceiver = Receiver<(Details, Arc<str>)>;
|
||||
|
||||
type ProcessMapKey = (Hash, PathBuf);
|
||||
|
||||
|
@ -36,9 +36,9 @@ impl ProcessMap {
|
|||
hash: Hash,
|
||||
path: PathBuf,
|
||||
fut: Fut,
|
||||
) -> Result<(Details, web::Bytes), Error>
|
||||
) -> Result<(Details, Arc<str>), Error>
|
||||
where
|
||||
Fut: Future<Output = Result<(Details, web::Bytes), Error>>,
|
||||
Fut: Future<Output = Result<(Details, Arc<str>), Error>>,
|
||||
{
|
||||
let key = (hash.clone(), path.clone());
|
||||
|
||||
|
@ -100,10 +100,10 @@ struct CancelToken {
|
|||
|
||||
enum CancelState {
|
||||
Sender {
|
||||
sender: Sender<(Details, web::Bytes)>,
|
||||
sender: Sender<(Details, Arc<str>)>,
|
||||
},
|
||||
Receiver {
|
||||
receiver: RecvFut<'static, (Details, web::Bytes)>,
|
||||
receiver: RecvFut<'static, (Details, Arc<str>)>,
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -124,9 +124,9 @@ pin_project_lite::pin_project! {
|
|||
|
||||
impl<F> Future for CancelSafeProcessor<F>
|
||||
where
|
||||
F: Future<Output = Result<(Details, web::Bytes), Error>>,
|
||||
F: Future<Output = Result<(Details, Arc<str>), Error>>,
|
||||
{
|
||||
type Output = Result<(Details, web::Bytes), Error>;
|
||||
type Output = Result<(Details, Arc<str>), Error>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.as_mut().project();
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
use crate::{
|
||||
bytes_stream::BytesStream,
|
||||
discover::Discovery,
|
||||
error::Error,
|
||||
formats::{InternalFormat, InternalVideoFormat},
|
||||
|
@ -80,13 +81,16 @@ impl Details {
|
|||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip_all)]
|
||||
pub(crate) async fn from_bytes<S>(state: &State<S>, input: web::Bytes) -> Result<Self, Error> {
|
||||
pub(crate) async fn from_bytes_stream<S>(
|
||||
state: &State<S>,
|
||||
input: BytesStream,
|
||||
) -> Result<Self, Error> {
|
||||
let Discovery {
|
||||
input,
|
||||
width,
|
||||
height,
|
||||
frames,
|
||||
} = crate::discover::discover_bytes(state, input).await?;
|
||||
} = crate::discover::discover_bytes_stream(state, input).await?;
|
||||
|
||||
Ok(Details::from_parts(
|
||||
input.internal_format(),
|
||||
|
|
|
@ -4,7 +4,7 @@ mod magick;
|
|||
|
||||
use actix_web::web::Bytes;
|
||||
|
||||
use crate::{formats::InputFile, state::State};
|
||||
use crate::{bytes_stream::BytesStream, formats::InputFile, state::State};
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub(crate) struct Discovery {
|
||||
|
@ -27,13 +27,13 @@ pub(crate) enum DiscoverError {
|
|||
}
|
||||
|
||||
#[tracing::instrument(level = "trace", skip_all)]
|
||||
pub(crate) async fn discover_bytes<S>(
|
||||
pub(crate) async fn discover_bytes_stream<S>(
|
||||
state: &State<S>,
|
||||
bytes: Bytes,
|
||||
bytes: BytesStream,
|
||||
) -> Result<Discovery, crate::error::Error> {
|
||||
let discovery = ffmpeg::discover_bytes(state, bytes.clone()).await?;
|
||||
let discovery = ffmpeg::discover_bytes_stream(state, bytes.clone()).await?;
|
||||
|
||||
let discovery = magick::confirm_bytes(state, discovery, bytes.clone()).await?;
|
||||
let discovery = magick::confirm_bytes_stream(state, discovery, bytes.clone()).await?;
|
||||
|
||||
let discovery =
|
||||
exiftool::check_reorient(discovery, bytes, state.config.media.process_timeout).await?;
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
use actix_web::web::Bytes;
|
||||
|
||||
use crate::{
|
||||
bytes_stream::BytesStream,
|
||||
exiftool::ExifError,
|
||||
formats::{ImageInput, InputFile},
|
||||
process::Process,
|
||||
|
@ -16,7 +17,7 @@ pub(super) async fn check_reorient(
|
|||
height,
|
||||
frames,
|
||||
}: Discovery,
|
||||
bytes: Bytes,
|
||||
bytes: BytesStream,
|
||||
timeout: u64,
|
||||
) -> Result<Discovery, ExifError> {
|
||||
let input = match input {
|
||||
|
@ -40,9 +41,9 @@ pub(super) async fn check_reorient(
|
|||
}
|
||||
|
||||
#[tracing::instrument(level = "trace", skip_all)]
|
||||
async fn needs_reorienting(input: Bytes, timeout: u64) -> Result<bool, ExifError> {
|
||||
async fn needs_reorienting(input: BytesStream, timeout: u64) -> Result<bool, ExifError> {
|
||||
let buf = Process::run("exiftool", &["-n", "-Orientation", "-"], &[], timeout)?
|
||||
.bytes_read(input)
|
||||
.bytes_stream_read(input)
|
||||
.into_string()
|
||||
.await?;
|
||||
|
||||
|
|
|
@ -4,6 +4,7 @@ mod tests;
|
|||
use std::{collections::HashSet, sync::OnceLock};
|
||||
|
||||
use crate::{
|
||||
bytes_stream::BytesStream,
|
||||
ffmpeg::FfMpegError,
|
||||
formats::{
|
||||
AlphaCodec, AnimationFormat, ImageFormat, ImageInput, InputFile, InputVideoFormat,
|
||||
|
@ -158,15 +159,15 @@ struct Flags {
|
|||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub(super) async fn discover_bytes<S>(
|
||||
pub(super) async fn discover_bytes_stream<S>(
|
||||
state: &State<S>,
|
||||
bytes: Bytes,
|
||||
bytes: BytesStream,
|
||||
) -> Result<Option<Discovery>, FfMpegError> {
|
||||
discover_file(state, move |mut file| {
|
||||
let bytes = bytes.clone();
|
||||
|
||||
async move {
|
||||
file.write_from_bytes(bytes)
|
||||
file.write_from_stream(bytes.into_io_stream())
|
||||
.await
|
||||
.map_err(FfMpegError::Write)?;
|
||||
Ok(file)
|
||||
|
|
|
@ -4,6 +4,7 @@ mod tests;
|
|||
use actix_web::web::Bytes;
|
||||
|
||||
use crate::{
|
||||
bytes_stream::BytesStream,
|
||||
discover::DiscoverError,
|
||||
formats::{AnimationFormat, ImageFormat, ImageInput, InputFile},
|
||||
magick::{MagickError, MAGICK_CONFIGURE_PATH, MAGICK_TEMPORARY_PATH},
|
||||
|
@ -31,10 +32,10 @@ struct Geometry {
|
|||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub(super) async fn confirm_bytes<S>(
|
||||
pub(super) async fn confirm_bytes_stream<S>(
|
||||
state: &State<S>,
|
||||
discovery: Option<Discovery>,
|
||||
bytes: Bytes,
|
||||
bytes: BytesStream,
|
||||
) -> Result<Discovery, MagickError> {
|
||||
match discovery {
|
||||
Some(Discovery {
|
||||
|
@ -50,7 +51,7 @@ pub(super) async fn confirm_bytes<S>(
|
|||
}
|
||||
|
||||
discover_file(state, move |mut file| async move {
|
||||
file.write_from_bytes(bytes)
|
||||
file.write_from_stream(bytes.into_io_stream())
|
||||
.await
|
||||
.map_err(MagickError::Write)?;
|
||||
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
use crate::{
|
||||
bytes_stream::BytesStream,
|
||||
error_code::ErrorCode,
|
||||
process::{Process, ProcessError, ProcessRead},
|
||||
};
|
||||
|
@ -39,9 +40,9 @@ impl ExifError {
|
|||
}
|
||||
|
||||
#[tracing::instrument(level = "trace", skip(input))]
|
||||
pub(crate) async fn needs_reorienting(timeout: u64, input: Bytes) -> Result<bool, ExifError> {
|
||||
pub(crate) async fn needs_reorienting(timeout: u64, input: BytesStream) -> Result<bool, ExifError> {
|
||||
let buf = Process::run("exiftool", &["-n", "-Orientation", "-"], &[], timeout)?
|
||||
.bytes_read(input)
|
||||
.bytes_stream_read(input)
|
||||
.into_string()
|
||||
.await?;
|
||||
|
||||
|
@ -51,9 +52,9 @@ pub(crate) async fn needs_reorienting(timeout: u64, input: Bytes) -> Result<bool
|
|||
#[tracing::instrument(level = "trace", skip(input))]
|
||||
pub(crate) fn clear_metadata_bytes_read(
|
||||
timeout: u64,
|
||||
input: Bytes,
|
||||
input: BytesStream,
|
||||
) -> Result<ProcessRead, ExifError> {
|
||||
let process = Process::run("exiftool", &["-all=", "-", "-out", "-"], &[], timeout)?;
|
||||
|
||||
Ok(process.bytes_read(input))
|
||||
Ok(process.bytes_stream_read(input))
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@ mod ffmpeg;
|
|||
mod magick;
|
||||
|
||||
use crate::{
|
||||
bytes_stream::BytesStream,
|
||||
concurrent_processor::ProcessMap,
|
||||
details::Details,
|
||||
error::{Error, UploadError},
|
||||
|
@ -57,7 +58,7 @@ pub(crate) async fn generate<S: Store + 'static>(
|
|||
thumbnail_args: Vec<String>,
|
||||
original_details: &Details,
|
||||
hash: Hash,
|
||||
) -> Result<(Details, Bytes), Error> {
|
||||
) -> Result<(Details, Arc<str>), Error> {
|
||||
if state.config.server.danger_dummy_mode {
|
||||
let identifier = state
|
||||
.repo
|
||||
|
@ -65,13 +66,7 @@ pub(crate) async fn generate<S: Store + 'static>(
|
|||
.await?
|
||||
.ok_or(UploadError::MissingIdentifier)?;
|
||||
|
||||
let bytes = state
|
||||
.store
|
||||
.to_bytes(&identifier, None, None)
|
||||
.await?
|
||||
.into_bytes();
|
||||
|
||||
Ok((original_details.clone(), bytes))
|
||||
Ok((original_details.clone(), identifier))
|
||||
} else {
|
||||
let process_fut = process(
|
||||
state,
|
||||
|
@ -82,14 +77,14 @@ pub(crate) async fn generate<S: Store + 'static>(
|
|||
hash.clone(),
|
||||
);
|
||||
|
||||
let (details, bytes) = process_map
|
||||
let (details, identifier) = process_map
|
||||
.process(hash, thumbnail_path, process_fut)
|
||||
.with_timeout(Duration::from_secs(state.config.media.process_timeout * 4))
|
||||
.with_metrics(crate::init_metrics::GENERATE_PROCESS)
|
||||
.await
|
||||
.map_err(|_| UploadError::ProcessTimeout)??;
|
||||
|
||||
Ok((details, bytes))
|
||||
Ok((details, identifier))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -101,7 +96,7 @@ async fn process<S: Store + 'static>(
|
|||
thumbnail_args: Vec<String>,
|
||||
original_details: &Details,
|
||||
hash: Hash,
|
||||
) -> Result<(Details, Bytes), Error> {
|
||||
) -> Result<(Details, Arc<str>), Error> {
|
||||
let guard = MetricsGuard::guard();
|
||||
let permit = crate::process_semaphore().acquire().await?;
|
||||
|
||||
|
@ -123,7 +118,7 @@ async fn process<S: Store + 'static>(
|
|||
|
||||
let stream = state.store.to_stream(&identifier, None, None).await?;
|
||||
|
||||
let vec = crate::magick::process_image_stream_read(
|
||||
let bytes = crate::magick::process_image_stream_read(
|
||||
state,
|
||||
stream,
|
||||
thumbnail_args,
|
||||
|
@ -132,19 +127,17 @@ async fn process<S: Store + 'static>(
|
|||
quality,
|
||||
)
|
||||
.await?
|
||||
.into_vec()
|
||||
.into_bytes_stream()
|
||||
.instrument(tracing::info_span!("Reading processed image to vec"))
|
||||
.await?;
|
||||
|
||||
let bytes = Bytes::from(vec);
|
||||
|
||||
drop(permit);
|
||||
|
||||
let details = Details::from_bytes(state, bytes.clone()).await?;
|
||||
let details = Details::from_bytes_stream(state, bytes.clone()).await?;
|
||||
|
||||
let identifier = state
|
||||
.store
|
||||
.save_bytes(bytes.clone(), details.media_type())
|
||||
.save_stream(bytes.into_io_stream(), details.media_type())
|
||||
.await?;
|
||||
|
||||
if let Err(VariantAlreadyExists) = state
|
||||
|
@ -163,7 +156,7 @@ async fn process<S: Store + 'static>(
|
|||
|
||||
guard.disarm();
|
||||
|
||||
Ok((details, bytes)) as Result<(Details, Bytes), Error>
|
||||
Ok((details, identifier)) as Result<(Details, Arc<str>), Error>
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
|
|
|
@ -30,7 +30,7 @@ pub(crate) struct Session {
|
|||
}
|
||||
|
||||
#[tracing::instrument(skip(stream))]
|
||||
async fn aggregate<S>(stream: S) -> Result<Bytes, Error>
|
||||
async fn aggregate<S>(stream: S) -> Result<BytesStream, Error>
|
||||
where
|
||||
S: Stream<Item = Result<Bytes, Error>>,
|
||||
{
|
||||
|
@ -45,7 +45,7 @@ where
|
|||
buf.add_bytes(res?);
|
||||
}
|
||||
|
||||
Ok(buf.into_bytes())
|
||||
Ok(buf)
|
||||
}
|
||||
|
||||
async fn process_ingest<S>(
|
||||
|
@ -70,7 +70,7 @@ where
|
|||
let permit = crate::process_semaphore().acquire().await?;
|
||||
|
||||
tracing::trace!("Validating bytes");
|
||||
let (input_type, process_read) = crate::validate::validate_bytes(state, bytes).await?;
|
||||
let (input_type, process_read) = crate::validate::validate_bytes_stream(state, bytes).await?;
|
||||
|
||||
let process_read = if let Some(operations) = state.config.media.preprocess_steps() {
|
||||
if let Some(format) = input_type.processable_format() {
|
||||
|
@ -116,7 +116,7 @@ where
|
|||
.await??;
|
||||
|
||||
let bytes_stream = state.store.to_bytes(&identifier, None, None).await?;
|
||||
let details = Details::from_bytes(state, bytes_stream.into_bytes()).await?;
|
||||
let details = Details::from_bytes_stream(state, bytes_stream).await?;
|
||||
|
||||
drop(permit);
|
||||
|
||||
|
|
59
src/lib.rs
59
src/lib.rs
|
@ -83,7 +83,7 @@ use self::{
|
|||
repo::{sled::SledRepo, Alias, DeleteToken, Hash, Repo, UploadId, UploadResult},
|
||||
serde_str::Serde,
|
||||
store::{file_store::FileStore, object_store::ObjectStore, Store},
|
||||
stream::{empty, once},
|
||||
stream::empty,
|
||||
tls::Tls,
|
||||
};
|
||||
|
||||
|
@ -141,7 +141,7 @@ async fn ensure_details_identifier<S: Store + 'static>(
|
|||
|
||||
tracing::debug!("generating new details from {:?}", identifier);
|
||||
let bytes_stream = state.store.to_bytes(identifier, None, None).await?;
|
||||
let new_details = Details::from_bytes(state, bytes_stream.into_bytes()).await?;
|
||||
let new_details = Details::from_bytes_stream(state, bytes_stream).await?;
|
||||
tracing::debug!("storing details for {:?}", identifier);
|
||||
state.repo.relate_details(identifier, &new_details).await?;
|
||||
tracing::debug!("stored");
|
||||
|
@ -841,25 +841,18 @@ async fn process<S: Store + 'static>(
|
|||
.variant_identifier(hash.clone(), path_string)
|
||||
.await?;
|
||||
|
||||
if let Some(identifier) = identifier_opt {
|
||||
let (details, identifier) = if let Some(identifier) = identifier_opt {
|
||||
let details = ensure_details_identifier(&state, &identifier).await?;
|
||||
|
||||
if let Some(public_url) = state.store.public_url(&identifier) {
|
||||
return Ok(HttpResponse::SeeOther()
|
||||
.insert_header((actix_web::http::header::LOCATION, public_url.as_str()))
|
||||
.finish());
|
||||
}
|
||||
|
||||
return ranged_file_resp(&state.store, identifier, range, details, not_found).await;
|
||||
}
|
||||
|
||||
(details, identifier)
|
||||
} else {
|
||||
if state.config.server.read_only {
|
||||
return Err(UploadError::ReadOnly.into());
|
||||
}
|
||||
|
||||
let original_details = ensure_details(&state, &alias).await?;
|
||||
|
||||
let (details, bytes) = generate::generate(
|
||||
generate::generate(
|
||||
&state,
|
||||
&process_map,
|
||||
format,
|
||||
|
@ -868,40 +861,16 @@ async fn process<S: Store + 'static>(
|
|||
&original_details,
|
||||
hash,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let (builder, stream) = if let Some(web::Header(range_header)) = range {
|
||||
if let Some(range) = range::single_bytes_range(&range_header) {
|
||||
let len = bytes.len() as u64;
|
||||
|
||||
if let Some(content_range) = range::to_content_range(range, len) {
|
||||
let mut builder = HttpResponse::PartialContent();
|
||||
builder.insert_header(content_range);
|
||||
let stream = range::chop_bytes(range, bytes, len)?;
|
||||
|
||||
(builder, Either::left(Either::left(stream)))
|
||||
} else {
|
||||
(
|
||||
HttpResponse::RangeNotSatisfiable(),
|
||||
Either::left(Either::right(empty())),
|
||||
)
|
||||
}
|
||||
} else {
|
||||
return Err(UploadError::Range.into());
|
||||
}
|
||||
} else if not_found {
|
||||
(HttpResponse::NotFound(), Either::right(once(Ok(bytes))))
|
||||
} else {
|
||||
(HttpResponse::Ok(), Either::right(once(Ok(bytes))))
|
||||
.await?
|
||||
};
|
||||
|
||||
Ok(srv_response(
|
||||
builder,
|
||||
stream,
|
||||
details.media_type(),
|
||||
7 * DAYS,
|
||||
details.system_time(),
|
||||
))
|
||||
if let Some(public_url) = state.store.public_url(&identifier) {
|
||||
return Ok(HttpResponse::SeeOther()
|
||||
.insert_header((actix_web::http::header::LOCATION, public_url.as_str()))
|
||||
.finish());
|
||||
}
|
||||
|
||||
ranged_file_resp(&state.store, identifier, range, details, not_found).await
|
||||
}
|
||||
|
||||
#[tracing::instrument(name = "Serving processed image headers", skip(state))]
|
||||
|
|
|
@ -396,7 +396,7 @@ where
|
|||
.await
|
||||
.map_err(From::from)
|
||||
.map_err(MigrateError::Details)?;
|
||||
let new_details = Details::from_bytes(to, bytes_stream.into_bytes())
|
||||
let new_details = Details::from_bytes_stream(to, bytes_stream)
|
||||
.await
|
||||
.map_err(MigrateError::Details)?;
|
||||
to.repo
|
||||
|
|
|
@ -6,14 +6,17 @@ use std::{
|
|||
sync::Arc,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use streem::IntoStreamer;
|
||||
use tokio::{
|
||||
io::{AsyncReadExt, AsyncWriteExt},
|
||||
process::{Child, ChildStdin, Command},
|
||||
};
|
||||
use tokio_util::io::ReaderStream;
|
||||
use tracing::Instrument;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{
|
||||
bytes_stream::BytesStream,
|
||||
error_code::ErrorCode,
|
||||
future::{LocalBoxFuture, WithTimeout},
|
||||
read::BoxRead,
|
||||
|
@ -232,12 +235,11 @@ impl Process {
|
|||
}
|
||||
}
|
||||
|
||||
pub(crate) fn bytes_read(self, input: Bytes) -> ProcessRead {
|
||||
pub(crate) fn bytes_stream_read(self, input: BytesStream) -> ProcessRead {
|
||||
self.spawn_fn(move |mut stdin| {
|
||||
let mut input = input;
|
||||
async move {
|
||||
match stdin.write_all_buf(&mut input).await {
|
||||
Ok(()) => Ok(()),
|
||||
match tokio::io::copy(&mut input.into_reader(), &mut stdin).await {
|
||||
Ok(_) => Ok(()),
|
||||
// BrokenPipe means we finished reading from Stdout, so we don't need to write
|
||||
// to stdin. We'll still error out if the command failed so treat this as a
|
||||
// success
|
||||
|
@ -317,6 +319,16 @@ impl ProcessRead {
|
|||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn into_bytes_stream(self) -> Result<BytesStream, ProcessError> {
|
||||
let cmd = self.command.clone();
|
||||
|
||||
self.with_stdout(move |stdout| {
|
||||
BytesStream::try_from_stream(ReaderStream::with_capacity(stdout, 1024 * 16))
|
||||
})
|
||||
.await?
|
||||
.map_err(move |e| ProcessError::Read(cmd, e))
|
||||
}
|
||||
|
||||
pub(crate) async fn into_vec(self) -> Result<Vec<u8>, ProcessError> {
|
||||
let cmd = self.command.clone();
|
||||
|
||||
|
|
15
src/range.rs
15
src/range.rs
|
@ -3,7 +3,6 @@ use std::sync::Arc;
|
|||
use crate::{
|
||||
error::{Error, UploadError},
|
||||
store::Store,
|
||||
stream::once,
|
||||
};
|
||||
use actix_web::{
|
||||
http::header::{ByteRangeSpec, ContentRange, ContentRangeSpec, Range},
|
||||
|
@ -11,20 +10,6 @@ use actix_web::{
|
|||
};
|
||||
use futures_core::Stream;
|
||||
|
||||
pub(crate) fn chop_bytes(
|
||||
byte_range: &ByteRangeSpec,
|
||||
bytes: Bytes,
|
||||
length: u64,
|
||||
) -> Result<impl Stream<Item = Result<Bytes, Error>>, Error> {
|
||||
if let Some((start, end)) = byte_range.to_satisfiable_range(length) {
|
||||
// END IS INCLUSIVE
|
||||
let end = end as usize + 1;
|
||||
return Ok(once(Ok(bytes.slice(start as usize..end))));
|
||||
}
|
||||
|
||||
Err(UploadError::Range.into())
|
||||
}
|
||||
|
||||
pub(crate) async fn chop_store<S: Store>(
|
||||
byte_range: &ByteRangeSpec,
|
||||
store: &S,
|
||||
|
|
|
@ -387,10 +387,9 @@ async fn fetch_or_generate_details<S: Store>(
|
|||
Ok(details)
|
||||
} else {
|
||||
let bytes_stream = state.store.to_bytes(identifier, None, None).await?;
|
||||
let bytes = bytes_stream.into_bytes();
|
||||
|
||||
let guard = details_semaphore().acquire().await?;
|
||||
let details = Details::from_bytes(state, bytes).await?;
|
||||
let details = Details::from_bytes_stream(state, bytes_stream).await?;
|
||||
drop(guard);
|
||||
|
||||
Ok(details)
|
||||
|
|
|
@ -183,13 +183,6 @@ where
|
|||
streem::from_fn(|_| std::future::ready(()))
|
||||
}
|
||||
|
||||
pub(crate) fn once<T>(value: T) -> impl Stream<Item = T>
|
||||
where
|
||||
T: 'static,
|
||||
{
|
||||
streem::from_fn(|yielder| yielder.yield_(value))
|
||||
}
|
||||
|
||||
pub(crate) fn timeout<S>(
|
||||
duration: Duration,
|
||||
stream: S,
|
||||
|
|
|
@ -3,6 +3,7 @@ mod ffmpeg;
|
|||
mod magick;
|
||||
|
||||
use crate::{
|
||||
bytes_stream::BytesStream,
|
||||
discover::Discovery,
|
||||
error::Error,
|
||||
error_code::ErrorCode,
|
||||
|
@ -56,9 +57,9 @@ impl ValidationError {
|
|||
const MEGABYTES: usize = 1024 * 1024;
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub(crate) async fn validate_bytes<S>(
|
||||
pub(crate) async fn validate_bytes_stream<S>(
|
||||
state: &State<S>,
|
||||
bytes: Bytes,
|
||||
bytes: BytesStream,
|
||||
) -> Result<(InternalFormat, ProcessRead), Error> {
|
||||
if bytes.is_empty() {
|
||||
return Err(ValidationError::Empty.into());
|
||||
|
@ -69,7 +70,7 @@ pub(crate) async fn validate_bytes<S>(
|
|||
width,
|
||||
height,
|
||||
frames,
|
||||
} = crate::discover::discover_bytes(state, bytes.clone()).await?;
|
||||
} = crate::discover::discover_bytes_stream(state, bytes.clone()).await?;
|
||||
|
||||
match &input {
|
||||
InputFile::Image(input) => {
|
||||
|
@ -95,7 +96,7 @@ pub(crate) async fn validate_bytes<S>(
|
|||
#[tracing::instrument(skip(state, bytes), fields(len = bytes.len()))]
|
||||
async fn process_image<S>(
|
||||
state: &State<S>,
|
||||
bytes: Bytes,
|
||||
bytes: BytesStream,
|
||||
input: ImageInput,
|
||||
width: u16,
|
||||
height: u16,
|
||||
|
@ -160,7 +161,7 @@ fn validate_animation(
|
|||
#[tracing::instrument(skip(state, bytes), fields(len = bytes.len()))]
|
||||
async fn process_animation<S>(
|
||||
state: &State<S>,
|
||||
bytes: Bytes,
|
||||
bytes: BytesStream,
|
||||
input: AnimationFormat,
|
||||
width: u16,
|
||||
height: u16,
|
||||
|
@ -218,7 +219,7 @@ fn validate_video(
|
|||
#[tracing::instrument(skip(state, bytes), fields(len = bytes.len()))]
|
||||
async fn process_video<S>(
|
||||
state: &State<S>,
|
||||
bytes: Bytes,
|
||||
bytes: BytesStream,
|
||||
input: InputVideoFormat,
|
||||
width: u16,
|
||||
height: u16,
|
||||
|
|
|
@ -1,14 +1,18 @@
|
|||
use actix_web::web::Bytes;
|
||||
|
||||
use crate::{
|
||||
bytes_stream::BytesStream,
|
||||
exiftool::ExifError,
|
||||
process::{Process, ProcessRead},
|
||||
};
|
||||
|
||||
#[tracing::instrument(level = "trace", skip_all)]
|
||||
pub(crate) fn clear_metadata_bytes_read(
|
||||
input: Bytes,
|
||||
input: BytesStream,
|
||||
timeout: u64,
|
||||
) -> Result<ProcessRead, ExifError> {
|
||||
Ok(Process::run("exiftool", &["-all=", "-", "-out", "-"], &[], timeout)?.bytes_read(input))
|
||||
Ok(
|
||||
Process::run("exiftool", &["-all=", "-", "-out", "-"], &[], timeout)?
|
||||
.bytes_stream_read(input),
|
||||
)
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ use actix_web::web::Bytes;
|
|||
use uuid::Uuid;
|
||||
|
||||
use crate::{
|
||||
bytes_stream::BytesStream,
|
||||
ffmpeg::FfMpegError,
|
||||
formats::{InputVideoFormat, OutputVideo},
|
||||
process::{Process, ProcessRead},
|
||||
|
@ -16,7 +17,7 @@ pub(super) async fn transcode_bytes(
|
|||
output_format: OutputVideo,
|
||||
crf: u8,
|
||||
timeout: u64,
|
||||
bytes: Bytes,
|
||||
bytes: BytesStream,
|
||||
) -> Result<ProcessRead, FfMpegError> {
|
||||
let input_file = tmp_dir.tmp_file(None);
|
||||
crate::store::file_store::safe_create_parent(&input_file)
|
||||
|
@ -27,7 +28,7 @@ pub(super) async fn transcode_bytes(
|
|||
.await
|
||||
.map_err(FfMpegError::CreateFile)?;
|
||||
tmp_one
|
||||
.write_from_bytes(bytes)
|
||||
.write_from_stream(bytes.into_io_stream())
|
||||
.await
|
||||
.map_err(FfMpegError::Write)?;
|
||||
tmp_one.close().await.map_err(FfMpegError::CloseFile)?;
|
||||
|
|
|
@ -3,6 +3,7 @@ use std::ffi::OsStr;
|
|||
use actix_web::web::Bytes;
|
||||
|
||||
use crate::{
|
||||
bytes_stream::BytesStream,
|
||||
formats::{AnimationFormat, ImageFormat},
|
||||
magick::{MagickError, MAGICK_CONFIGURE_PATH, MAGICK_TEMPORARY_PATH},
|
||||
process::{Process, ProcessRead},
|
||||
|
@ -14,7 +15,7 @@ pub(super) async fn convert_image<S>(
|
|||
input: ImageFormat,
|
||||
output: ImageFormat,
|
||||
quality: Option<u8>,
|
||||
bytes: Bytes,
|
||||
bytes: BytesStream,
|
||||
) -> Result<ProcessRead, MagickError> {
|
||||
convert(
|
||||
state,
|
||||
|
@ -32,7 +33,7 @@ pub(super) async fn convert_animation<S>(
|
|||
input: AnimationFormat,
|
||||
output: AnimationFormat,
|
||||
quality: Option<u8>,
|
||||
bytes: Bytes,
|
||||
bytes: BytesStream,
|
||||
) -> Result<ProcessRead, MagickError> {
|
||||
convert(
|
||||
state,
|
||||
|
@ -51,7 +52,7 @@ async fn convert<S>(
|
|||
output: &'static str,
|
||||
coalesce: bool,
|
||||
quality: Option<u8>,
|
||||
bytes: Bytes,
|
||||
bytes: BytesStream,
|
||||
) -> Result<ProcessRead, MagickError> {
|
||||
let temporary_path = state
|
||||
.tmp_dir
|
||||
|
@ -69,7 +70,7 @@ async fn convert<S>(
|
|||
.await
|
||||
.map_err(MagickError::CreateFile)?;
|
||||
tmp_one
|
||||
.write_from_bytes(bytes)
|
||||
.write_from_stream(bytes.into_io_stream())
|
||||
.await
|
||||
.map_err(MagickError::Write)?;
|
||||
tmp_one.close().await.map_err(MagickError::CloseFile)?;
|
||||
|
|
Loading…
Reference in a new issue