diff --git a/docker/object-storage/docker-compose.yml b/docker/object-storage/docker-compose.yml
index 7c79728..5d37419 100644
--- a/docker/object-storage/docker-compose.yml
+++ b/docker/object-storage/docker-compose.yml
@@ -29,14 +29,14 @@ services:
 #      - PICTRS_PROXY_UPSTREAM=http://pictrs:8080
 #      - PICTRS_PROXY_OPENTELEMETRY_URL=http://jaeger:4317
 
-  minio:
-    image: quay.io/minio/minio
-    command: server /mnt --console-address ":9001"
-    ports:
-      - "9000:9000"
-      - "9001:9001"
-    volumes:
-      - ./storage/minio:/mnt
+#  minio:
+#    image: quay.io/minio/minio
+#    command: server /mnt --console-address ":9001"
+#    ports:
+#      - "9000:9000"
+#      - "9001:9001"
+#    volumes:
+#      - ./storage/minio:/mnt
 
   garage:
     image: dxflrs/garage:v0.9.0
diff --git a/src/backgrounded.rs b/src/backgrounded.rs
index 37f8677..379e088 100644
--- a/src/backgrounded.rs
+++ b/src/backgrounded.rs
@@ -54,9 +54,9 @@ impl Backgrounded {
     {
         self.upload_id = Some(self.repo.create_upload().await?);
 
-        let stream = Box::pin(crate::stream::map_err(stream, |e| {
+        let stream = crate::stream::map_err(stream, |e| {
             std::io::Error::new(std::io::ErrorKind::Other, e)
-        }));
+        });
 
         // use octet-stream, we don't know the upload's real type yet
         let identifier = store.save_stream(stream, APPLICATION_OCTET_STREAM).await?;
diff --git a/src/bytes_stream.rs b/src/bytes_stream.rs
index ae07007..df1ba8c 100644
--- a/src/bytes_stream.rs
+++ b/src/bytes_stream.rs
@@ -1,7 +1,4 @@
-use actix_web::{
-    body::MessageBody,
-    web::{Bytes, BytesMut},
-};
+use actix_web::web::Bytes;
 use futures_core::Stream;
 use std::{
     collections::{vec_deque::IntoIter, VecDeque},
@@ -9,6 +6,9 @@ use std::{
     pin::Pin,
     task::{Context, Poll},
 };
+use streem::IntoStreamer;
+use tokio::io::AsyncRead;
+use tokio_util::bytes::Buf;
 
 #[derive(Clone, Debug)]
 pub(crate) struct BytesStream {
@@ -24,6 +24,33 @@ impl BytesStream {
         }
     }
 
+    #[tracing::instrument(skip(stream))]
+    pub(crate) async fn try_from_stream<S, E>(stream: S) -> Result<Self, E>
+    where
+        S: Stream<Item = Result<Bytes, E>>,
+    {
+        let stream = std::pin::pin!(stream);
+        let mut stream = stream.into_streamer();
+        let mut bs = Self::new();
+
+        while let Some(bytes) = stream.try_next().await? {
+            tracing::trace!("try_from_stream: looping");
+            bs.add_bytes(bytes);
+        }
+
+        tracing::debug!(
+            "BytesStream with {} chunks, avg length {}",
+            bs.chunks_len(),
+            bs.len() / bs.chunks_len()
+        );
+
+        Ok(bs)
+    }
+
+    pub(crate) fn chunks_len(&self) -> usize {
+        self.inner.len()
+    }
+
     pub(crate) fn add_bytes(&mut self, bytes: Bytes) {
         self.total_len += bytes.len();
         self.inner.push_back(bytes);
@@ -33,15 +60,25 @@ impl BytesStream {
         self.total_len
     }
 
-    pub(crate) fn into_bytes(self) -> Bytes {
-        let mut buf = BytesMut::with_capacity(self.total_len);
-
-        for bytes in self.inner {
-            buf.extend_from_slice(&bytes);
-        }
-
-        buf.freeze()
+    pub(crate) fn is_empty(&self) -> bool {
+        self.total_len == 0
     }
+
+    pub(crate) fn into_reader(self) -> BytesReader {
+        BytesReader { inner: self.inner }
+    }
+
+    pub(crate) fn into_io_stream(self) -> IoStream {
+        IoStream { inner: self.inner }
+    }
+}
+
+pub(crate) struct IoStream {
+    inner: VecDeque<Bytes>,
+}
+
+pub(crate) struct BytesReader {
+    inner: VecDeque<Bytes>,
 }
 
 impl IntoIterator for BytesStream {
@@ -53,36 +90,56 @@ impl IntoIterator for BytesStream {
     }
 }
 
-impl MessageBody for BytesStream {
-    type Error = std::io::Error;
-
-    fn size(&self) -> actix_web::body::BodySize {
-        if let Ok(len) = self.len().try_into() {
-            actix_web::body::BodySize::Sized(len)
-        } else {
-            actix_web::body::BodySize::None
-        }
-    }
-
-    fn poll_next(
-        self: Pin<&mut Self>,
-        _: &mut Context<'_>,
-    ) -> Poll<Option<Result<Bytes, Self::Error>>> {
-        Poll::Ready(self.get_mut().inner.pop_front().map(Ok))
-    }
-
-    fn try_into_bytes(self) -> Result<Bytes, Self>
-    where
-        Self: Sized,
-    {
-        Ok(self.into_bytes())
-    }
-}
-
 impl Stream for BytesStream {
     type Item = Result<Bytes, std::io::Error>;
 
     fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         Poll::Ready(self.get_mut().inner.pop_front().map(Ok))
     }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        (self.inner.len(), Some(self.inner.len()))
+    }
+}
+
+impl Stream for IoStream {
+    type Item = std::io::Result<Bytes>;
+
+    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        Poll::Ready(self.get_mut().inner.pop_front().map(Ok))
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        (self.inner.len(), Some(self.inner.len()))
+    }
+}
+
+impl AsyncRead for BytesReader {
+    fn poll_read(
+        mut self: Pin<&mut Self>,
+        _: &mut Context<'_>,
+        buf: &mut tokio::io::ReadBuf<'_>,
+    ) -> Poll<std::io::Result<()>> {
+        while buf.remaining() > 0 {
+            tracing::trace!("bytes reader: looping");
+
+            if let Some(bytes) = self.inner.front_mut() {
+                if bytes.is_empty() {
+                    self.inner.pop_front();
+                    continue;
+                }
+
+                let upper_bound = buf.remaining().min(bytes.len());
+
+                let slice = &bytes[..upper_bound];
+
+                buf.put_slice(slice);
+                bytes.advance(upper_bound);
+            } else {
+                break;
+            }
+        }
+
+        Poll::Ready(Ok(()))
+    }
}
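Reviewer note: a minimal usage sketch of the BytesStream surface added above. The type is pub(crate), so this only makes sense inside the crate; `roundtrip` is a hypothetical helper, not part of the patch.

```rust
use actix_web::web::Bytes;
use tokio::io::AsyncReadExt;

use crate::bytes_stream::BytesStream;

async fn roundtrip() -> std::io::Result<Vec<u8>> {
    let mut bs = BytesStream::new();
    bs.add_bytes(Bytes::from_static(b"hello "));
    bs.add_bytes(Bytes::from_static(b"world"));

    // Cloning is cheap: it clones the VecDeque of refcounted Bytes handles,
    // not the underlying buffers.
    let copy = bs.clone();
    assert_eq!(copy.len(), 11);
    assert_eq!(copy.chunks_len(), 2);

    // The same data can be consumed as an AsyncRead without first being
    // flattened into one contiguous buffer (which is what the removed
    // into_bytes() used to do).
    let mut out = Vec::with_capacity(bs.len());
    bs.into_reader().read_to_end(&mut out).await?;
    Ok(out)
}
```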
diff --git a/src/concurrent_processor.rs b/src/concurrent_processor.rs
index 4a0f9cd..b15b521 100644
--- a/src/concurrent_processor.rs
+++ b/src/concurrent_processor.rs
@@ -3,7 +3,7 @@ use crate::{
     error::{Error, UploadError},
     repo::Hash,
 };
-use actix_web::web;
+
 use dashmap::{mapref::entry::Entry, DashMap};
 use flume::{r#async::RecvFut, Receiver, Sender};
 use std::{
@@ -15,7 +15,7 @@ use std::{
 };
 use tracing::Span;
 
-type OutcomeReceiver = Receiver<(Details, web::Bytes)>;
+type OutcomeReceiver = Receiver<(Details, Arc<str>)>;
 
 type ProcessMapKey = (Hash, PathBuf);
 
@@ -36,9 +36,9 @@ impl ProcessMap {
         hash: Hash,
         path: PathBuf,
         fut: Fut,
-    ) -> Result<(Details, web::Bytes), Error>
+    ) -> Result<(Details, Arc<str>), Error>
     where
-        Fut: Future<Output = Result<(Details, web::Bytes), Error>>,
+        Fut: Future<Output = Result<(Details, Arc<str>), Error>>,
     {
         let key = (hash.clone(), path.clone());
 
@@ -100,10 +100,10 @@ struct CancelToken {
 
 enum CancelState {
     Sender {
-        sender: Sender<(Details, web::Bytes)>,
+        sender: Sender<(Details, Arc<str>)>,
     },
     Receiver {
-        receiver: RecvFut<'static, (Details, web::Bytes)>,
+        receiver: RecvFut<'static, (Details, Arc<str>)>,
     },
 }
 
@@ -124,9 +124,9 @@ pin_project_lite::pin_project! {
 
 impl<F> Future for CancelSafeProcessor<F>
 where
-    F: Future<Output = Result<(Details, web::Bytes), Error>>,
+    F: Future<Output = Result<(Details, Arc<str>), Error>>,
 {
-    type Output = Result<(Details, web::Bytes), Error>;
+    type Output = Result<(Details, Arc<str>), Error>;
 
     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
         let this = self.as_mut().project();
diff --git a/src/details.rs b/src/details.rs
index f95c828..90a7333 100644
--- a/src/details.rs
+++ b/src/details.rs
@@ -1,11 +1,12 @@
 use crate::{
+    bytes_stream::BytesStream,
     discover::Discovery,
     error::Error,
     formats::{InternalFormat, InternalVideoFormat},
     serde_str::Serde,
     state::State,
 };
-use actix_web::web;
+
 use time::{format_description::well_known::Rfc3339, OffsetDateTime};
 
 #[derive(Copy, Clone, Debug, serde::Deserialize, serde::Serialize)]
@@ -80,13 +81,16 @@ impl Details {
     }
 
     #[tracing::instrument(level = "debug", skip_all)]
-    pub(crate) async fn from_bytes<S>(state: &State<S>, input: web::Bytes) -> Result<Self, Error> {
+    pub(crate) async fn from_bytes_stream<S>(
+        state: &State<S>,
+        input: BytesStream,
+    ) -> Result<Self, Error> {
         let Discovery {
             input,
             width,
             height,
             frames,
-        } = crate::discover::discover_bytes(state, input).await?;
+        } = crate::discover::discover_bytes_stream(state, input).await?;
 
         Ok(Details::from_parts(
             input.internal_format(),
diff --git a/src/discover.rs b/src/discover.rs
index f394c52..362a4cf 100644
--- a/src/discover.rs
+++ b/src/discover.rs
@@ -2,9 +2,9 @@ mod exiftool;
 mod ffmpeg;
 mod magick;
 
-use actix_web::web::Bytes;
-use crate::{formats::InputFile, state::State};
+
+use crate::{bytes_stream::BytesStream, formats::InputFile, state::State};
 
 #[derive(Debug, PartialEq, Eq)]
 pub(crate) struct Discovery {
@@ -27,13 +27,13 @@ pub(crate) enum DiscoverError {
 }
 
 #[tracing::instrument(level = "trace", skip_all)]
-pub(crate) async fn discover_bytes<S>(
+pub(crate) async fn discover_bytes_stream<S>(
     state: &State<S>,
-    bytes: Bytes,
+    bytes: BytesStream,
 ) -> Result<Discovery, Error> {
-    let discovery = ffmpeg::discover_bytes(state, bytes.clone()).await?;
+    let discovery = ffmpeg::discover_bytes_stream(state, bytes.clone()).await?;
 
-    let discovery = magick::confirm_bytes(state, discovery, bytes.clone()).await?;
+    let discovery = magick::confirm_bytes_stream(state, discovery, bytes.clone()).await?;
 
     let discovery =
         exiftool::check_reorient(discovery, bytes, state.config.media.process_timeout).await?;
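Reviewer note: discover_bytes_stream hands a clone of the same buffered upload to the ffmpeg, imagemagick, and exiftool probes in turn. That is affordable because cloning Bytes (and therefore a BytesStream, which is a VecDeque of them) is a refcount bump rather than a buffer copy — a small sketch, assuming only the bytes-crate semantics behind actix_web::web::Bytes:

```rust
use actix_web::web::Bytes; // re-export of bytes::Bytes

fn main() {
    let chunk = Bytes::from(vec![0u8; 8 * 1024 * 1024]);

    // Both clones point at the same 8 MiB allocation; no data is duplicated.
    let a = chunk.clone();
    let b = chunk.clone();
    assert_eq!(a.as_ptr(), b.as_ptr());
}
```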
diff --git a/src/discover/exiftool.rs b/src/discover/exiftool.rs
index d8ea421..2549827 100644
--- a/src/discover/exiftool.rs
+++ b/src/discover/exiftool.rs
@@ -1,6 +1,7 @@
-use actix_web::web::Bytes;
+
 
 use crate::{
+    bytes_stream::BytesStream,
     exiftool::ExifError,
     formats::{ImageInput, InputFile},
     process::Process,
@@ -16,7 +17,7 @@ pub(super) async fn check_reorient(
         height,
         frames,
     }: Discovery,
-    bytes: Bytes,
+    bytes: BytesStream,
     timeout: u64,
 ) -> Result<Discovery, ExifError> {
     let input = match input {
@@ -40,9 +41,9 @@ pub(super) async fn check_reorient(
 }
 
 #[tracing::instrument(level = "trace", skip_all)]
-async fn needs_reorienting(input: Bytes, timeout: u64) -> Result<bool, ExifError> {
+async fn needs_reorienting(input: BytesStream, timeout: u64) -> Result<bool, ExifError> {
     let buf = Process::run("exiftool", &["-n", "-Orientation", "-"], &[], timeout)?
-        .bytes_read(input)
+        .bytes_stream_read(input)
         .into_string()
         .await?;
 
diff --git a/src/discover/ffmpeg.rs b/src/discover/ffmpeg.rs
index 1597b05..f4d3d19 100644
--- a/src/discover/ffmpeg.rs
+++ b/src/discover/ffmpeg.rs
@@ -4,6 +4,7 @@ mod tests;
 use std::{collections::HashSet, sync::OnceLock};
 
 use crate::{
+    bytes_stream::BytesStream,
     ffmpeg::FfMpegError,
     formats::{
         AlphaCodec, AnimationFormat, ImageFormat, ImageInput, InputFile, InputVideoFormat,
@@ -12,7 +13,7 @@ use crate::{
     process::Process,
     state::State,
 };
-use actix_web::web::Bytes;
+
 
 use super::Discovery;
 
@@ -158,15 +159,15 @@ struct Flags {
 }
 
 #[tracing::instrument(skip_all)]
-pub(super) async fn discover_bytes<S>(
+pub(super) async fn discover_bytes_stream<S>(
     state: &State<S>,
-    bytes: Bytes,
+    bytes: BytesStream,
 ) -> Result<Option<Discovery>, FfMpegError> {
     discover_file(state, move |mut file| {
         let bytes = bytes.clone();
 
         async move {
-            file.write_from_bytes(bytes)
+            file.write_from_stream(bytes.into_io_stream())
                 .await
                 .map_err(FfMpegError::Write)?;
             Ok(file)
diff --git a/src/discover/magick.rs b/src/discover/magick.rs
index a280736..fb77356 100644
--- a/src/discover/magick.rs
+++ b/src/discover/magick.rs
@@ -1,9 +1,10 @@
 #[cfg(test)]
 mod tests;
 
-use actix_web::web::Bytes;
+
 
 use crate::{
+    bytes_stream::BytesStream,
     discover::DiscoverError,
     formats::{AnimationFormat, ImageFormat, ImageInput, InputFile},
     magick::{MagickError, MAGICK_CONFIGURE_PATH, MAGICK_TEMPORARY_PATH},
@@ -31,10 +32,10 @@ struct Geometry {
 }
 
 #[tracing::instrument(skip_all)]
-pub(super) async fn confirm_bytes<S>(
+pub(super) async fn confirm_bytes_stream<S>(
     state: &State<S>,
     discovery: Option<Discovery>,
-    bytes: Bytes,
+    bytes: BytesStream,
 ) -> Result<Discovery, MagickError> {
     match discovery {
         Some(Discovery {
@@ -50,7 +51,7 @@ pub(super) async fn confirm_bytes(
         }
 
         discover_file(state, move |mut file| async move {
-            file.write_from_bytes(bytes)
+            file.write_from_stream(bytes.into_io_stream())
                 .await
                 .map_err(MagickError::Write)?;
 
diff --git a/src/exiftool.rs b/src/exiftool.rs
index 831ef9b..027b53b 100644
--- a/src/exiftool.rs
+++ b/src/exiftool.rs
@@ -1,8 +1,9 @@
 use crate::{
+    bytes_stream::BytesStream,
     error_code::ErrorCode,
     process::{Process, ProcessError, ProcessRead},
 };
-use actix_web::web::Bytes;
+
 
 #[derive(Debug, thiserror::Error)]
 pub(crate) enum ExifError {
@@ -39,9 +40,9 @@ impl ExifError {
 }
 
 #[tracing::instrument(level = "trace", skip(input))]
-pub(crate) async fn needs_reorienting(timeout: u64, input: Bytes) -> Result<bool, ExifError> {
+pub(crate) async fn needs_reorienting(timeout: u64, input: BytesStream) -> Result<bool, ExifError> {
     let buf = Process::run("exiftool", &["-n", "-Orientation", "-"], &[], timeout)?
-        .bytes_read(input)
+        .bytes_stream_read(input)
         .into_string()
         .await?;
 
@@ -51,9 +52,9 @@ pub(crate) async fn needs_reorienting(timeout: u64, input: Bytes) -> Result<boo
 }
 
 #[tracing::instrument(level = "trace", skip(input))]
-pub(crate) fn clear_metadata_bytes_read(timeout: u64, input: Bytes) -> Result<ProcessRead, ExifError> {
+pub(crate) fn clear_metadata_bytes_read(timeout: u64, input: BytesStream) -> Result<ProcessRead, ExifError> {
     let process = Process::run("exiftool", &["-all=", "-", "-out", "-"], &[], timeout)?;
 
-    Ok(process.bytes_read(input))
+    Ok(process.bytes_stream_read(input))
 }
diff --git a/src/generate.rs b/src/generate.rs
index ca151c8..fdb209d 100644
--- a/src/generate.rs
+++ b/src/generate.rs
@@ -11,7 +11,7 @@ use crate::{
     state::State,
     store::Store,
 };
-use actix_web::web::Bytes;
+
 use std::{
     path::PathBuf,
     sync::Arc,
@@ -57,7 +57,7 @@ pub(crate) async fn generate<S: Store + 'static>(
     thumbnail_args: Vec<String>,
     original_details: &Details,
     hash: Hash,
-) -> Result<(Details, Bytes), Error> {
+) -> Result<(Details, Arc<str>), Error> {
     if state.config.server.danger_dummy_mode {
         let identifier = state
             .repo
@@ -65,13 +65,7 @@ pub(crate) async fn generate<S: Store + 'static>(
             .await?
             .ok_or(UploadError::MissingIdentifier)?;
 
-        let bytes = state
-            .store
-            .to_bytes(&identifier, None, None)
-            .await?
-            .into_bytes();
-
-        Ok((original_details.clone(), bytes))
+        Ok((original_details.clone(), identifier))
     } else {
         let process_fut = process(
             state,
@@ -82,14 +76,14 @@ pub(crate) async fn generate(
             hash.clone(),
         );
 
-        let (details, bytes) = process_map
+        let (details, identifier) = process_map
            .process(hash, thumbnail_path, process_fut)
            .with_timeout(Duration::from_secs(state.config.media.process_timeout * 4))
            .with_metrics(crate::init_metrics::GENERATE_PROCESS)
            .await
            .map_err(|_| UploadError::ProcessTimeout)??;
 
-        Ok((details, bytes))
+        Ok((details, identifier))
     }
 }
 
@@ -101,7 +95,7 @@ async fn process<S: Store + 'static>(
     thumbnail_args: Vec<String>,
     original_details: &Details,
     hash: Hash,
-) -> Result<(Details, Bytes), Error> {
+) -> Result<(Details, Arc<str>), Error> {
     let guard = MetricsGuard::guard();
 
     let permit = crate::process_semaphore().acquire().await?;
@@ -123,7 +117,7 @@ async fn process(
 
     let stream = state.store.to_stream(&identifier, None, None).await?;
 
-    let vec = crate::magick::process_image_stream_read(
+    let bytes = crate::magick::process_image_stream_read(
         state,
         stream,
         thumbnail_args,
@@ -132,19 +126,19 @@ async fn process(
         quality,
     )
     .await?
-    .into_vec()
-    .instrument(tracing::info_span!("Reading processed image to vec"))
+    .into_bytes_stream()
+    .instrument(tracing::info_span!(
+        "Reading processed image to BytesStream"
+    ))
     .await?;
 
-    let bytes = Bytes::from(vec);
-
     drop(permit);
 
-    let details = Details::from_bytes(state, bytes.clone()).await?;
+    let details = Details::from_bytes_stream(state, bytes.clone()).await?;
 
     let identifier = state
         .store
-        .save_bytes(bytes.clone(), details.media_type())
+        .save_stream(bytes.into_io_stream(), details.media_type())
         .await?;
 
     if let Err(VariantAlreadyExists) = state
@@ -163,7 +157,7 @@ async fn process(
 
     guard.disarm();
 
-    Ok((details, bytes)) as Result<(Details, Bytes), Error>
+    Ok((details, identifier)) as Result<(Details, Arc<str>), Error>
 }
 
 #[tracing::instrument(skip_all)]
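Reviewer note: generate now resolves to (Details, Arc<str>) instead of (Details, Bytes), so deduplicated waiters on the ProcessMap channel receive a small identifier handle and stream the variant out of the store themselves. A hypothetical caller sketch against the Store trait as bounded in this patch:

```rust
use std::sync::Arc;

use crate::{details::Details, error::Error, store::Store};

// Hypothetical helper: what a caller does with generate()'s new output.
async fn serve_variant<S: Store>(
    store: &S,
    details: Details,
    identifier: Arc<str>,
) -> Result<(), Error> {
    if let Some(public_url) = store.public_url(&identifier) {
        // The object store can serve the variant directly: redirect.
        let _ = public_url;
        return Ok(());
    }

    // Otherwise stream it out of the store; no full-variant buffer is held.
    let _stream = store.to_stream(&identifier, None, None).await?;
    let _ = details; // content-type and cache headers come from here
    Ok(())
}
```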
diff --git a/src/ingest.rs b/src/ingest.rs
index 69802ae..8ab486d 100644
--- a/src/ingest.rs
+++ b/src/ingest.rs
@@ -14,7 +14,6 @@ use actix_web::web::Bytes;
 use futures_core::Stream;
 use reqwest::Body;
-use streem::IntoStreamer;
 use tracing::{Instrument, Span};
 
 mod hasher;
@@ -29,25 +28,6 @@ pub(crate) struct Session {
     identifier: Option<Arc<str>>,
 }
 
-#[tracing::instrument(skip(stream))]
-async fn aggregate<S>(stream: S) -> Result<Bytes, Error>
-where
-    S: Stream<Item = Result<Bytes, Error>>,
-{
-    let mut buf = BytesStream::new();
-
-    let stream = std::pin::pin!(stream);
-    let mut stream = stream.into_streamer();
-
-    while let Some(res) = stream.next().await {
-        tracing::trace!("aggregate: looping");
-
-        buf.add_bytes(res?);
-    }
-
-    Ok(buf.into_bytes())
-}
-
 async fn process_ingest<S>(
     state: &State<S>,
     stream: impl Stream<Item = Result<Bytes, Error>> + 'static,
 ) -> Result<Session, Error>
@@ -63,14 +43,17 @@ where
     S: Store,
 {
-    let bytes = tokio::time::timeout(Duration::from_secs(60), aggregate(stream))
-        .await
-        .map_err(|_| UploadError::AggregateTimeout)??;
+    let bytes = tokio::time::timeout(
+        Duration::from_secs(60),
+        BytesStream::try_from_stream(stream),
+    )
+    .await
+    .map_err(|_| UploadError::AggregateTimeout)??;
 
     let permit = crate::process_semaphore().acquire().await?;
 
     tracing::trace!("Validating bytes");
-    let (input_type, process_read) = crate::validate::validate_bytes(state, bytes).await?;
+    let (input_type, process_read) = crate::validate::validate_bytes_stream(state, bytes).await?;
 
     let process_read = if let Some(operations) = state.config.media.preprocess_steps() {
         if let Some(format) = input_type.processable_format() {
@@ -116,7 +99,7 @@ where
         .await??;
 
     let bytes_stream = state.store.to_bytes(&identifier, None, None).await?;
-    let details = Details::from_bytes(state, bytes_stream.into_bytes()).await?;
+    let details = Details::from_bytes_stream(state, bytes_stream).await?;
 
     drop(permit);
 
@@ -143,7 +126,7 @@ where
         Err(e) => Err(std::io::Error::new(std::io::ErrorKind::Other, e)),
     });
 
-    let reader = Box::pin(tokio_util::io::StreamReader::new(stream));
+    let reader = tokio_util::io::StreamReader::new(stream);
 
     let hasher_reader = Hasher::new(reader);
     let hash_state = hasher_reader.state();
diff --git a/src/lib.rs b/src/lib.rs
index 57cd0ef..44bf807 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -83,7 +83,7 @@ use self::{
     repo::{sled::SledRepo, Alias, DeleteToken, Hash, Repo, UploadId, UploadResult},
     serde_str::Serde,
     store::{file_store::FileStore, object_store::ObjectStore, Store},
-    stream::{empty, once},
+    stream::empty,
     tls::Tls,
 };
 
@@ -141,7 +141,7 @@
     tracing::debug!("generating new details from {:?}", identifier);
     let bytes_stream = state.store.to_bytes(identifier, None, None).await?;
-    let new_details = Details::from_bytes(state, bytes_stream.into_bytes()).await?;
+    let new_details = Details::from_bytes_stream(state, bytes_stream).await?;
     tracing::debug!("storing details for {:?}", identifier);
     state.repo.relate_details(identifier, &new_details).await?;
     tracing::debug!("stored");
@@ -841,67 +841,36 @@
         .variant_identifier(hash.clone(), path_string)
         .await?;
 
-    if let Some(identifier) = identifier_opt {
+    let (details, identifier) = if let Some(identifier) = identifier_opt {
         let details = ensure_details_identifier(&state, &identifier).await?;
 
-        if let Some(public_url) = state.store.public_url(&identifier) {
-            return Ok(HttpResponse::SeeOther()
-                .insert_header((actix_web::http::header::LOCATION, public_url.as_str()))
-                .finish());
-        }
-
-        return ranged_file_resp(&state.store, identifier, range, details, not_found).await;
-    }
-
-    if state.config.server.read_only {
-        return Err(UploadError::ReadOnly.into());
-    }
-
-    let original_details = ensure_details(&state, &alias).await?;
-
-    let (details, bytes) = generate::generate(
-        &state,
-        &process_map,
-        format,
-        thumbnail_path,
-        thumbnail_args,
-        &original_details,
-        hash,
-    )
-    .await?;
-
-    let (builder, stream) = if let Some(web::Header(range_header)) = range {
-        if let Some(range) = range::single_bytes_range(&range_header) {
-            let len = bytes.len() as u64;
-
-            if let Some(content_range) = range::to_content_range(range, len) {
-                let mut builder = HttpResponse::PartialContent();
-                builder.insert_header(content_range);
-                let stream = range::chop_bytes(range, bytes, len)?;
-
-                (builder, Either::left(Either::left(stream)))
-            } else {
-                (
-                    HttpResponse::RangeNotSatisfiable(),
-                    Either::left(Either::right(empty())),
-                )
-            }
-        } else {
-            return Err(UploadError::Range.into());
-        }
-    } else if not_found {
-        (HttpResponse::NotFound(), Either::right(once(Ok(bytes))))
+        (details, identifier)
     } else {
-        (HttpResponse::Ok(), Either::right(once(Ok(bytes))))
+        if state.config.server.read_only {
+            return Err(UploadError::ReadOnly.into());
+        }
+
+        let original_details = ensure_details(&state, &alias).await?;
+
+        generate::generate(
+            &state,
+            &process_map,
+            format,
+            thumbnail_path,
+            thumbnail_args,
+            &original_details,
+            hash,
+        )
+        .await?
     };
 
-    Ok(srv_response(
-        builder,
-        stream,
-        details.media_type(),
-        7 * DAYS,
-        details.system_time(),
-    ))
+    if let Some(public_url) = state.store.public_url(&identifier) {
+        return Ok(HttpResponse::SeeOther()
+            .insert_header((actix_web::http::header::LOCATION, public_url.as_str()))
+            .finish());
+    }
+
+    ranged_file_resp(&state.store, identifier, range, details, not_found).await
 }
 
 #[tracing::instrument(name = "Serving processed image headers", skip(state))]
diff --git a/src/migrate_store.rs b/src/migrate_store.rs
index 0578bbe..9423621 100644
--- a/src/migrate_store.rs
+++ b/src/migrate_store.rs
@@ -396,7 +396,7 @@ where
             .await
             .map_err(From::from)
             .map_err(MigrateError::Details)?;
-        let new_details = Details::from_bytes(to, bytes_stream.into_bytes())
+        let new_details = Details::from_bytes_stream(to, bytes_stream)
             .await
             .map_err(MigrateError::Details)?;
         to.repo
diff --git a/src/process.rs b/src/process.rs
index 916e1ed..c512b4f 100644
--- a/src/process.rs
+++ b/src/process.rs
@@ -1,4 +1,3 @@
-use actix_web::web::Bytes;
 use std::{
     ffi::OsStr,
     future::Future,
@@ -6,14 +5,17 @@ use std::{
     sync::Arc,
     time::{Duration, Instant},
 };
+
 use tokio::{
-    io::{AsyncReadExt, AsyncWriteExt},
+    io::AsyncReadExt,
     process::{Child, ChildStdin, Command},
 };
+use tokio_util::io::ReaderStream;
 use tracing::Instrument;
 use uuid::Uuid;
 
 use crate::{
+    bytes_stream::BytesStream,
     error_code::ErrorCode,
     future::{LocalBoxFuture, WithTimeout},
     read::BoxRead,
@@ -232,12 +234,11 @@ impl Process {
         }
     }
 
-    pub(crate) fn bytes_read(self, input: Bytes) -> ProcessRead {
+    pub(crate) fn bytes_stream_read(self, input: BytesStream) -> ProcessRead {
         self.spawn_fn(move |mut stdin| {
-            let mut input = input;
             async move {
-                match stdin.write_all_buf(&mut input).await {
-                    Ok(()) => Ok(()),
+                match tokio::io::copy(&mut input.into_reader(), &mut stdin).await {
+                    Ok(_) => Ok(()),
                     // BrokenPipe means we finished reading from Stdout, so we don't need to write
                     // to stdin. We'll still error out if the command failed so treat this as a
                     // success
@@ -317,6 +318,16 @@ impl ProcessRead {
         }
     }
 
+    pub(crate) async fn into_bytes_stream(self) -> Result<BytesStream, ProcessError> {
+        let cmd = self.command.clone();
+
+        self.with_stdout(move |stdout| {
+            BytesStream::try_from_stream(ReaderStream::with_capacity(stdout, 1024 * 64))
+        })
+        .await?
+        .map_err(move |e| ProcessError::Read(cmd, e))
+    }
+
     pub(crate) async fn into_vec(self) -> Result<Vec<u8>, ProcessError> {
         let cmd = self.command.clone();
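Reviewer note: the BrokenPipe arm above is the load-bearing detail of bytes_stream_read — a child process may close stdin as soon as it has read enough. The same behavior in a standalone sketch with plain tokio types (`feed_stdin` is a hypothetical helper):

```rust
use tokio::io::{AsyncRead, AsyncWrite};

// Copy a reader into a child's stdin, tolerating the child hanging up early.
// A failed command is still surfaced later via its exit status, so swallowing
// BrokenPipe here does not hide real errors.
async fn feed_stdin<R, W>(mut reader: R, mut stdin: W) -> std::io::Result<()>
where
    R: AsyncRead + Unpin,
    W: AsyncWrite + Unpin,
{
    match tokio::io::copy(&mut reader, &mut stdin).await {
        Ok(_bytes_copied) => Ok(()),
        Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => Ok(()),
        Err(e) => Err(e),
    }
}
```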
diff --git a/src/range.rs b/src/range.rs
index 7a2694a..f992f38 100644
--- a/src/range.rs
+++ b/src/range.rs
@@ -3,7 +3,6 @@ use std::sync::Arc;
 use crate::{
     error::{Error, UploadError},
     store::Store,
-    stream::once,
 };
 use actix_web::{
     http::header::{ByteRangeSpec, ContentRange, ContentRangeSpec, Range},
@@ -11,20 +10,6 @@ use actix_web::{
 };
 use futures_core::Stream;
 
-pub(crate) fn chop_bytes(
-    byte_range: &ByteRangeSpec,
-    bytes: Bytes,
-    length: u64,
-) -> Result<impl Stream<Item = Result<Bytes, Error>>, Error> {
-    if let Some((start, end)) = byte_range.to_satisfiable_range(length) {
-        // END IS INCLUSIVE
-        let end = end as usize + 1;
-        return Ok(once(Ok(bytes.slice(start as usize..end))));
-    }
-
-    Err(UploadError::Range.into())
-}
-
 pub(crate) async fn chop_store<S: Store>(
     byte_range: &ByteRangeSpec,
     store: &S,
diff --git a/src/repo/migrate.rs b/src/repo/migrate.rs
index 9cc803f..0a67d4d 100644
--- a/src/repo/migrate.rs
+++ b/src/repo/migrate.rs
@@ -387,10 +387,9 @@ async fn fetch_or_generate_details(
         Ok(details)
     } else {
         let bytes_stream = state.store.to_bytes(identifier, None, None).await?;
-        let bytes = bytes_stream.into_bytes();
 
         let guard = details_semaphore().acquire().await?;
-        let details = Details::from_bytes(state, bytes).await?;
+        let details = Details::from_bytes_stream(state, bytes_stream).await?;
         drop(guard);
 
         Ok(details)
diff --git a/src/store.rs b/src/store.rs
index 2006713..02f4d81 100644
--- a/src/store.rs
+++ b/src/store.rs
@@ -1,7 +1,6 @@
 use actix_web::web::Bytes;
 use futures_core::Stream;
 use std::{fmt::Debug, sync::Arc};
-use streem::IntoStreamer;
 use tokio::io::{AsyncRead, AsyncWrite};
 
 use crate::{bytes_stream::BytesStream, error_code::ErrorCode, stream::LocalBoxStream};
@@ -92,7 +91,7 @@ pub(crate) trait Store: Clone + Debug {
         content_type: mime::Mime,
     ) -> Result<Arc<str>, StoreError>
     where
-        Reader: AsyncRead + Unpin + 'static;
+        Reader: AsyncRead;
 
     async fn save_stream<S>(
         &self,
@@ -100,7 +99,7 @@ pub(crate) trait Store: Clone + Debug {
         content_type: mime::Mime,
     ) -> Result<Arc<str>, StoreError>
     where
-        S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static;
+        S: Stream<Item = std::io::Result<Bytes>>;
 
     async fn save_bytes(
         &self,
@@ -123,20 +122,11 @@ pub(crate) trait Store: Clone + Debug {
         from_start: Option<u64>,
         len: Option<u64>,
     ) -> Result<BytesStream, StoreError> {
-        let mut buf = BytesStream::new();
+        let stream = self.to_stream(identifier, from_start, len).await?;
 
-        let mut streamer = self
-            .to_stream(identifier, from_start, len)
-            .await?
-            .into_streamer();
-
-        while let Some(bytes) = streamer.try_next().await.map_err(StoreError::ReadStream)? {
-            tracing::trace!("to_bytes: looping");
-
-            buf.add_bytes(bytes);
-        }
-
-        Ok(buf)
+        BytesStream::try_from_stream(stream)
+            .await
+            .map_err(StoreError::ReadStream)
     }
 
     async fn read_into(
@@ -166,7 +156,7 @@ where
         content_type: mime::Mime,
     ) -> Result<Arc<str>, StoreError>
     where
-        Reader: AsyncRead + Unpin + 'static,
+        Reader: AsyncRead,
     {
         T::save_async_read(self, reader, content_type).await
     }
@@ -177,7 +167,7 @@ where
         content_type: mime::Mime,
     ) -> Result<Arc<str>, StoreError>
     where
-        S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
+        S: Stream<Item = std::io::Result<Bytes>>,
     {
         T::save_stream(self, stream, content_type).await
     }
@@ -237,7 +227,7 @@ where
         content_type: mime::Mime,
     ) -> Result<Arc<str>, StoreError>
     where
-        Reader: AsyncRead + Unpin + 'static,
+        Reader: AsyncRead,
     {
         T::save_async_read(self, reader, content_type).await
     }
@@ -248,7 +238,7 @@ where
         content_type: mime::Mime,
     ) -> Result<Arc<str>, StoreError>
     where
-        S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
+        S: Stream<Item = std::io::Result<Bytes>>,
     {
         T::save_stream(self, stream, content_type).await
     }
@@ -308,7 +298,7 @@ where
         content_type: mime::Mime,
     ) -> Result<Arc<str>, StoreError>
    where
-        Reader: AsyncRead + Unpin + 'static,
+        Reader: AsyncRead,
    {
        T::save_async_read(self, reader, content_type).await
    }
@@ -319,7 +309,7 @@ where
        content_type: mime::Mime,
    ) -> Result<Arc<str>, StoreError>
    where
-        S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
+        S: Stream<Item = std::io::Result<Bytes>>,
    {
        T::save_stream(self, stream, content_type).await
    }
diff --git a/src/store/file_store.rs b/src/store/file_store.rs
index 1883706..ddf1819 100644
--- a/src/store/file_store.rs
+++ b/src/store/file_store.rs
@@ -68,12 +68,14 @@ impl Store for FileStore {
     #[tracing::instrument(skip(self, reader))]
     async fn save_async_read<Reader>(
         &self,
-        mut reader: Reader,
+        reader: Reader,
         _content_type: mime::Mime,
     ) -> Result<Arc<str>, StoreError>
     where
-        Reader: AsyncRead + Unpin + 'static,
+        Reader: AsyncRead,
     {
+        let mut reader = std::pin::pin!(reader);
+
         let path = self.next_file().await?;
 
         if let Err(e) = self.safe_save_reader(&path, &mut reader).await {
@@ -90,7 +92,7 @@ impl Store for FileStore {
         content_type: mime::Mime,
     ) -> Result<Arc<str>, StoreError>
     where
-        S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
+        S: Stream<Item = std::io::Result<Bytes>>,
     {
         self.save_async_read(StreamReader::new(stream), content_type)
             .await
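Reviewer note: dropping the `Unpin + 'static` bounds works because the reader is pinned to the stack before use; `Pin<&mut R>` is always Unpin and implements AsyncRead whenever R does, so callers no longer need Box::pin. A self-contained sketch of the trick (`drain` is a hypothetical helper):

```rust
use tokio::io::AsyncRead;

// Accepts any AsyncRead, pinned or not, borrowed or owned.
async fn drain<R: AsyncRead>(reader: R) -> std::io::Result<u64> {
    // Pin the value in place on the stack; the resulting Pin<&mut R>
    // satisfies the Unpin bound that tokio::io::copy requires.
    let mut reader = std::pin::pin!(reader);
    tokio::io::copy(&mut reader, &mut tokio::io::sink()).await
}
```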
diff --git a/src/store/object_store.rs b/src/store/object_store.rs
index c428f92..a119d97 100644
--- a/src/store/object_store.rs
+++ b/src/store/object_store.rs
@@ -1,6 +1,6 @@
 use crate::{
     bytes_stream::BytesStream, error_code::ErrorCode, future::WithMetrics, repo::ArcRepo,
-    store::Store, stream::LocalBoxStream,
+    store::Store, stream::LocalBoxStream, sync::DropHandle,
 };
 use actix_web::{
     error::BlockingError,
@@ -170,7 +170,7 @@ fn payload_to_io_error(e: reqwest::Error) -> std::io::Error {
 #[tracing::instrument(level = "debug", skip(stream))]
 async fn read_chunk<S>(stream: &mut S) -> Result<BytesStream, ObjectError>
 where
-    S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
+    S: Stream<Item = std::io::Result<Bytes>> + Unpin,
 {
     let mut buf = BytesStream::new();
 
@@ -186,6 +186,12 @@ where
         }
     }
 
+    tracing::debug!(
+        "BytesStream with {} chunks, avg length {}",
+        buf.chunks_len(),
+        buf.len() / buf.chunks_len()
+    );
+
     Ok(buf)
 }
 
@@ -223,162 +229,78 @@ impl Store for ObjectStore {
         content_type: mime::Mime,
     ) -> Result<Arc<str>, StoreError>
     where
-        Reader: AsyncRead + Unpin + 'static,
+        Reader: AsyncRead,
     {
-        self.save_stream(ReaderStream::new(reader), content_type)
+        self.save_stream(ReaderStream::with_capacity(reader, 1024 * 64), content_type)
             .await
     }
 
     #[tracing::instrument(skip_all)]
     async fn save_stream<S>(
         &self,
-        mut stream: S,
+        stream: S,
         content_type: mime::Mime,
     ) -> Result<Arc<str>, StoreError>
     where
-        S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
+        S: Stream<Item = std::io::Result<Bytes>>,
     {
-        let first_chunk = read_chunk(&mut stream).await?;
+        match self.start_upload(stream, content_type.clone()).await? {
+            UploadState::Single(first_chunk) => {
+                let (req, object_id) = self
+                    .put_object_request(first_chunk.len(), content_type)
+                    .await?;
 
-        if first_chunk.len() < CHUNK_SIZE {
-            drop(stream);
-            let (req, object_id) = self
-                .put_object_request(first_chunk.len(), content_type)
-                .await?;
-            let response = req
-                .body(Body::wrap_stream(first_chunk))
-                .send()
-                .with_metrics(crate::init_metrics::OBJECT_STORAGE_PUT_OBJECT_REQUEST)
-                .await
-                .map_err(ObjectError::from)?;
+                let response = req
+                    .body(Body::wrap_stream(first_chunk))
+                    .send()
+                    .with_metrics(crate::init_metrics::OBJECT_STORAGE_PUT_OBJECT_REQUEST)
+                    .await
+                    .map_err(ObjectError::from)?;
 
-            if !response.status().is_success() {
-                return Err(status_error(response, None).await);
+                if !response.status().is_success() {
+                    return Err(status_error(response, None).await);
+                }
+
+                return Ok(object_id);
             }
+            UploadState::Multi(object_id, upload_id, futures) => {
+                // hack-ish: use async block as Result boundary
+                let res = async {
+                    let mut etags = Vec::new();
 
-            return Ok(object_id);
-        }
-
-        let mut first_chunk = Some(first_chunk);
-
-        let (req, object_id) = self.create_multipart_request(content_type).await?;
-        let response = req
-            .send()
-            .with_metrics(crate::init_metrics::OBJECT_STORAGE_CREATE_MULTIPART_REQUEST)
-            .await
-            .map_err(ObjectError::from)?;
-
-        if !response.status().is_success() {
-            return Err(status_error(response, None).await);
-        }
-
-        let body = response.text().await.map_err(ObjectError::Request)?;
-        let body = CreateMultipartUpload::parse_response(&body)
-            .map_err(XmlError::new)
-            .map_err(ObjectError::Xml)?;
-        let upload_id = body.upload_id();
-
-        // hack-ish: use async block as Result boundary
-        let res = async {
-            let mut complete = false;
-            let mut part_number = 0;
-            let mut futures = Vec::new();
-
-            while !complete {
-                tracing::trace!("save_stream: looping");
-
-                part_number += 1;
-
-                let buf = if let Some(buf) = first_chunk.take() {
-                    buf
-                } else {
-                    read_chunk(&mut stream).await?
-                };
-
-                complete = buf.len() < CHUNK_SIZE;
-
-                let this = self.clone();
-
-                let object_id2 = object_id.clone();
-                let upload_id2 = upload_id.to_string();
-                let handle = crate::sync::abort_on_drop(crate::sync::spawn(
-                    "upload-multipart-part",
-                    async move {
-                        let response = this
-                            .create_upload_part_request(
-                                buf.clone(),
-                                &object_id2,
-                                part_number,
-                                &upload_id2,
-                            )
-                            .await?
-                            .body(Body::wrap_stream(buf))
-                            .send()
-                            .with_metrics(
-                                crate::init_metrics::OBJECT_STORAGE_CREATE_UPLOAD_PART_REQUEST,
-                            )
-                            .await
-                            .map_err(ObjectError::from)?;
-
-                        if !response.status().is_success() {
-                            return Err(status_error(response, None).await);
-                        }
-
-                        let etag = response
-                            .headers()
-                            .get("etag")
-                            .ok_or(ObjectError::Etag)?
-                            .to_str()
-                            .map_err(|_| ObjectError::Etag)?
-                            .to_string();
-
-                        // early-drop response to close its tracing spans
-                        drop(response);
-
-                        Ok(etag) as Result<String, StoreError>
+                    for future in futures {
+                        etags.push(future.await.map_err(ObjectError::from)??);
                     }
-                    .instrument(tracing::Span::current()),
-                ));
 
-                futures.push(handle);
+                    let response = self
+                        .send_complete_multipart_request(
+                            &object_id,
+                            &upload_id,
+                            etags.iter().map(|s| s.as_ref()),
+                        )
+                        .await
+                        .map_err(ObjectError::from)?;
+
+                    if !response.status().is_success() {
+                        return Err(status_error(response, None).await);
+                    }
+
+                    Ok(()) as Result<(), StoreError>
+                }
+                .await;
+
+                if let Err(e) = res {
+                    self.create_abort_multipart_request(&object_id, &upload_id)
+                        .send()
+                        .with_metrics(crate::init_metrics::OBJECT_STORAGE_ABORT_MULTIPART_REQUEST)
+                        .await
+                        .map_err(ObjectError::from)?;
+                    return Err(e);
+                }
+
+                Ok(object_id)
             }
-
-            // early-drop stream to allow the next Part to be polled concurrently
-            drop(stream);
-
-            let mut etags = Vec::new();
-
-            for future in futures {
-                etags.push(future.await.map_err(ObjectError::from)??);
-            }
-
-            let response = self
-                .send_complete_multipart_request(
-                    &object_id,
-                    upload_id,
-                    etags.iter().map(|s| s.as_ref()),
-                )
-                .await
-                .map_err(ObjectError::from)?;
-
-            if !response.status().is_success() {
-                return Err(status_error(response, None).await);
-            }
-
-            Ok(()) as Result<(), StoreError>
         }
-        .await;
-
-        if let Err(e) = res {
-            self.create_abort_multipart_request(&object_id, upload_id)
-                .send()
-                .with_metrics(crate::init_metrics::OBJECT_STORAGE_ABORT_MULTIPART_REQUEST)
-                .await
-                .map_err(ObjectError::from)?;
-            return Err(e);
-        }
-
-        Ok(object_id)
     }
 
     #[tracing::instrument(skip_all)]
@@ -520,6 +442,15 @@ impl Store for ObjectStore {
     }
 }
 
+enum UploadState {
+    Single(BytesStream),
+    Multi(
+        Arc<str>,
+        String,
+        Vec<DropHandle<Result<String, StoreError>>>,
+    ),
+}
+
 impl ObjectStore {
     #[allow(clippy::too_many_arguments)]
     #[tracing::instrument(skip(access_key, secret_key, session_token, repo))]
@@ -554,6 +485,128 @@ impl ObjectStore {
         })
     }
 
+    #[tracing::instrument(skip_all)]
+    async fn start_upload<S>(
+        &self,
+        stream: S,
+        content_type: mime::Mime,
+    ) -> Result<UploadState, StoreError>
+    where
+        S: Stream<Item = std::io::Result<Bytes>>,
+    {
+        let mut stream = std::pin::pin!(stream);
+
+        let first_chunk = read_chunk(&mut stream).await?;
+
+        if first_chunk.len() < CHUNK_SIZE {
+            return Ok(UploadState::Single(first_chunk));
+        }
+
+        let mut first_chunk = Some(first_chunk);
+
+        let (req, object_id) = self.create_multipart_request(content_type).await?;
+        let response = req
+            .send()
+            .with_metrics(crate::init_metrics::OBJECT_STORAGE_CREATE_MULTIPART_REQUEST)
+            .await
+            .map_err(ObjectError::from)?;
+
+        if !response.status().is_success() {
+            return Err(status_error(response, None).await);
+        }
+
+        let body = response.text().await.map_err(ObjectError::Request)?;
+        let body = CreateMultipartUpload::parse_response(&body)
+            .map_err(XmlError::new)
+            .map_err(ObjectError::Xml)?;
+        let upload_id = body.upload_id();
+
+        // hack-ish: use async block as Result boundary
+        let res = async {
+            let mut complete = false;
+            let mut part_number = 0;
+            let mut futures = Vec::new();
+
+            while !complete {
+                tracing::trace!("save_stream: looping");
+
+                part_number += 1;
+
+                let buf = if let Some(buf) = first_chunk.take() {
+                    buf
+                } else {
+                    read_chunk(&mut stream).await?
+                };
+
+                complete = buf.len() < CHUNK_SIZE;
+
+                let this = self.clone();
+
+                let object_id2 = object_id.clone();
+                let upload_id2 = upload_id.to_string();
+                let handle = crate::sync::abort_on_drop(crate::sync::spawn(
+                    "upload-multipart-part",
+                    async move {
+                        let response = this
+                            .create_upload_part_request(
+                                buf.clone(),
+                                &object_id2,
+                                part_number,
+                                &upload_id2,
+                            )
+                            .await?
+                            .body(Body::wrap_stream(buf))
+                            .send()
+                            .with_metrics(
+                                crate::init_metrics::OBJECT_STORAGE_CREATE_UPLOAD_PART_REQUEST,
+                            )
+                            .await
+                            .map_err(ObjectError::from)?;
+
+                        if !response.status().is_success() {
+                            return Err(status_error(response, None).await);
+                        }
+
+                        let etag = response
+                            .headers()
+                            .get("etag")
+                            .ok_or(ObjectError::Etag)?
+                            .to_str()
+                            .map_err(|_| ObjectError::Etag)?
+                            .to_string();
+
+                        // early-drop response to close its tracing spans
+                        drop(response);
+
+                        Ok(etag) as Result<String, StoreError>
+                    }
+                    .instrument(tracing::Span::current()),
+                ));
+
+                futures.push(handle);
+            }
+
+            Ok(futures)
+        }
+        .await;
+
+        match res {
+            Ok(futures) => Ok(UploadState::Multi(
+                object_id,
+                upload_id.to_string(),
+                futures,
+            )),
+            Err(e) => {
+                self.create_abort_multipart_request(&object_id, upload_id)
+                    .send()
+                    .with_metrics(crate::init_metrics::OBJECT_STORAGE_ABORT_MULTIPART_REQUEST)
+                    .await
+                    .map_err(ObjectError::from)?;
+                Err(e)
+            }
+        }
+    }
+
     async fn head_bucket_request(&self) -> Result<RequestBuilder, StoreError> {
         let action = self.bucket.head_bucket(Some(&self.credentials));
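Reviewer note: start_upload now owns the single-vs-multipart decision, and save_stream only completes whichever UploadState it is handed. The decision reduces to a length check on the first buffered chunk — sketched below with an assumed CHUNK_SIZE value (the real constant lives elsewhere in object_store.rs):

```rust
// Sketch, not the crate's code. 8 MiB is an assumption for illustration.
const CHUNK_SIZE: usize = 8 * 1024 * 1024;

enum UploadPlan {
    // read_chunk stops buffering at CHUNK_SIZE, so a short first chunk
    // means the stream is already exhausted: one PutObject suffices.
    Single,
    // A full first part means more data may follow: pay for
    // CreateMultipartUpload + UploadPart(s) + CompleteMultipartUpload.
    Multipart,
}

fn plan(first_chunk_len: usize) -> UploadPlan {
    if first_chunk_len < CHUNK_SIZE {
        UploadPlan::Single
    } else {
        UploadPlan::Multipart
    }
}
```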
diff --git a/src/stream.rs b/src/stream.rs
index cae35da..66a502b 100644
--- a/src/stream.rs
+++ b/src/stream.rs
@@ -183,13 +183,6 @@ where
     streem::from_fn(|_| std::future::ready(()))
 }
 
-pub(crate) fn once<T>(value: T) -> impl Stream<Item = T>
-where
-    T: 'static,
-{
-    streem::from_fn(|yielder| yielder.yield_(value))
-}
-
 pub(crate) fn timeout<S>(
     duration: Duration,
     stream: S,
diff --git a/src/validate.rs b/src/validate.rs
index ee857bd..4358491 100644
--- a/src/validate.rs
+++ b/src/validate.rs
@@ -3,6 +3,7 @@ mod ffmpeg;
 mod magick;
 
 use crate::{
+    bytes_stream::BytesStream,
     discover::Discovery,
     error::Error,
     error_code::ErrorCode,
@@ -13,7 +14,7 @@ use crate::{
     process::ProcessRead,
     state::State,
 };
-use actix_web::web::Bytes;
+
 
 #[derive(Debug, thiserror::Error)]
 pub(crate) enum ValidationError {
@@ -56,9 +57,9 @@ impl ValidationError {
 const MEGABYTES: usize = 1024 * 1024;
 
 #[tracing::instrument(skip_all)]
-pub(crate) async fn validate_bytes<S>(
+pub(crate) async fn validate_bytes_stream<S>(
     state: &State<S>,
-    bytes: Bytes,
+    bytes: BytesStream,
 ) -> Result<(InternalFormat, ProcessRead), Error> {
     if bytes.is_empty() {
         return Err(ValidationError::Empty.into());
@@ -69,7 +70,7 @@ pub(crate) async fn validate_bytes<S>(
         width,
         height,
         frames,
-    } = crate::discover::discover_bytes(state, bytes.clone()).await?;
+    } = crate::discover::discover_bytes_stream(state, bytes.clone()).await?;
 
     match &input {
         InputFile::Image(input) => {
@@ -95,7 +96,7 @@ pub(crate) async fn validate_bytes<S>(
 #[tracing::instrument(skip(state, bytes), fields(len = bytes.len()))]
 async fn process_image<S>(
     state: &State<S>,
-    bytes: Bytes,
+    bytes: BytesStream,
     input: ImageInput,
     width: u16,
     height: u16,
@@ -160,7 +161,7 @@ fn validate_animation(
 #[tracing::instrument(skip(state, bytes), fields(len = bytes.len()))]
 async fn process_animation<S>(
     state: &State<S>,
-    bytes: Bytes,
+    bytes: BytesStream,
     input: AnimationFormat,
     width: u16,
     height: u16,
@@ -218,7 +219,7 @@ fn validate_video(
 #[tracing::instrument(skip(state, bytes), fields(len = bytes.len()))]
 async fn process_video<S>(
     state: &State<S>,
-    bytes: Bytes,
+    bytes: BytesStream,
     input: InputVideoFormat,
     width: u16,
     height: u16,
diff --git a/src/validate/exiftool.rs b/src/validate/exiftool.rs
index b97d5f0..2705c11 100644
--- a/src/validate/exiftool.rs
+++ b/src/validate/exiftool.rs
@@ -1,14 +1,18 @@
-use actix_web::web::Bytes;
+
 
 use crate::{
+    bytes_stream::BytesStream,
     exiftool::ExifError,
     process::{Process, ProcessRead},
 };
 
 #[tracing::instrument(level = "trace", skip_all)]
 pub(crate) fn clear_metadata_bytes_read(
-    input: Bytes,
+    input: BytesStream,
     timeout: u64,
 ) -> Result<ProcessRead, ExifError> {
-    Ok(Process::run("exiftool", &["-all=", "-", "-out", "-"], &[], timeout)?.bytes_read(input))
+    Ok(
+        Process::run("exiftool", &["-all=", "-", "-out", "-"], &[], timeout)?
+            .bytes_stream_read(input),
+    )
 }
diff --git a/src/validate/ffmpeg.rs b/src/validate/ffmpeg.rs
index 1ab7758..b67ac80 100644
--- a/src/validate/ffmpeg.rs
+++ b/src/validate/ffmpeg.rs
@@ -1,9 +1,10 @@
 use std::{ffi::OsStr, sync::Arc};
 
-use actix_web::web::Bytes;
+
 use uuid::Uuid;
 
 use crate::{
+    bytes_stream::BytesStream,
     ffmpeg::FfMpegError,
     formats::{InputVideoFormat, OutputVideo},
     process::{Process, ProcessRead},
@@ -16,7 +17,7 @@ pub(super) async fn transcode_bytes(
     output_format: OutputVideo,
     crf: u8,
     timeout: u64,
-    bytes: Bytes,
+    bytes: BytesStream,
 ) -> Result<ProcessRead, FfMpegError> {
     let input_file = tmp_dir.tmp_file(None);
     crate::store::file_store::safe_create_parent(&input_file)
@@ -27,7 +28,7 @@ pub(super) async fn transcode_bytes(
         .await
         .map_err(FfMpegError::CreateFile)?;
     tmp_one
-        .write_from_bytes(bytes)
+        .write_from_stream(bytes.into_io_stream())
         .await
         .map_err(FfMpegError::Write)?;
     tmp_one.close().await.map_err(FfMpegError::CloseFile)?;
diff --git a/src/validate/magick.rs b/src/validate/magick.rs
index ac8b720..d6c1ab1 100644
--- a/src/validate/magick.rs
+++ b/src/validate/magick.rs
@@ -1,8 +1,9 @@
 use std::ffi::OsStr;
 
-use actix_web::web::Bytes;
+
 
 use crate::{
+    bytes_stream::BytesStream,
     formats::{AnimationFormat, ImageFormat},
     magick::{MagickError, MAGICK_CONFIGURE_PATH, MAGICK_TEMPORARY_PATH},
     process::{Process, ProcessRead},
@@ -14,7 +15,7 @@ pub(super) async fn convert_image<S>(
     input: ImageFormat,
     output: ImageFormat,
     quality: Option<u8>,
-    bytes: Bytes,
+    bytes: BytesStream,
 ) -> Result<ProcessRead, MagickError> {
     convert(
         state,
@@ -32,7 +33,7 @@ pub(super) async fn convert_animation<S>(
     input: AnimationFormat,
     output: AnimationFormat,
     quality: Option<u8>,
-    bytes: Bytes,
+    bytes: BytesStream,
 ) -> Result<ProcessRead, MagickError> {
     convert(
         state,
@@ -51,7 +52,7 @@ async fn convert<S>(
     output: &'static str,
     coalesce: bool,
     quality: Option<u8>,
-    bytes: Bytes,
+    bytes: BytesStream,
 ) -> Result<ProcessRead, MagickError> {
     let temporary_path = state
         .tmp_dir
@@ -69,7 +70,7 @@ async fn convert<S>(
         .await
         .map_err(MagickError::CreateFile)?;
     tmp_one
-        .write_from_bytes(bytes)
+        .write_from_stream(bytes.into_io_stream())
         .await
         .map_err(MagickError::Write)?;
     tmp_one.close().await.map_err(MagickError::CloseFile)?;