From c1e651c01a2b15cf5b8cc0d7efd8eebe1cc38117 Mon Sep 17 00:00:00 2001 From: asonix Date: Thu, 22 Feb 2024 16:02:33 -0600 Subject: [PATCH 01/12] Use BytesStream in more places --- src/bytes_stream.rs | 91 ++++++++++++++++++++++++++++++++++++- src/concurrent_processor.rs | 14 +++--- src/details.rs | 8 +++- src/discover.rs | 10 ++-- src/discover/exiftool.rs | 7 +-- src/discover/ffmpeg.rs | 7 +-- src/discover/magick.rs | 7 +-- src/exiftool.rs | 9 ++-- src/generate.rs | 29 +++++------- src/ingest.rs | 8 ++-- src/lib.rs | 85 +++++++++++----------------------- src/migrate_store.rs | 2 +- src/process.rs | 20 ++++++-- src/range.rs | 15 ------ src/repo/migrate.rs | 3 +- src/stream.rs | 7 --- src/validate.rs | 13 +++--- src/validate/exiftool.rs | 8 +++- src/validate/ffmpeg.rs | 5 +- src/validate/magick.rs | 9 ++-- 20 files changed, 206 insertions(+), 151 deletions(-) diff --git a/src/bytes_stream.rs b/src/bytes_stream.rs index ae07007..3594be5 100644 --- a/src/bytes_stream.rs +++ b/src/bytes_stream.rs @@ -9,6 +9,8 @@ use std::{ pin::Pin, task::{Context, Poll}, }; +use streem::IntoStreamer; +use tokio::io::AsyncRead; #[derive(Clone, Debug)] pub(crate) struct BytesStream { @@ -24,6 +26,21 @@ impl BytesStream { } } + pub(crate) async fn try_from_stream(stream: S) -> Result + where + S: Stream>, + { + let stream = std::pin::pin!(stream); + let mut stream = stream.into_streamer(); + let mut bs = Self::new(); + + while let Some(bytes) = stream.try_next().await? { + bs.add_bytes(bytes); + } + + Ok(bs) + } + pub(crate) fn add_bytes(&mut self, bytes: Bytes) { self.total_len += bytes.len(); self.inner.push_back(bytes); @@ -33,7 +50,15 @@ impl BytesStream { self.total_len } - pub(crate) fn into_bytes(self) -> Bytes { + pub(crate) fn is_empty(&self) -> bool { + self.total_len > 0 + } + + fn into_bytes(mut self) -> Bytes { + if self.inner.len() == 1 { + return self.inner.pop_front().expect("Exactly one"); + } + let mut buf = BytesMut::with_capacity(self.total_len); for bytes in self.inner { @@ -42,6 +67,26 @@ impl BytesStream { buf.freeze() } + + pub(crate) fn into_reader(self) -> BytesReader { + BytesReader { + index: 0, + inner: self.inner, + } + } + + pub(crate) fn into_io_stream(self) -> IoStream { + IoStream { stream: self } + } +} + +pub(crate) struct IoStream { + stream: BytesStream, +} + +pub(crate) struct BytesReader { + index: usize, + inner: VecDeque, } impl IntoIterator for BytesStream { @@ -86,3 +131,47 @@ impl Stream for BytesStream { Poll::Ready(self.get_mut().inner.pop_front().map(Ok)) } } + +impl Stream for IoStream { + type Item = std::io::Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + MessageBody::poll_next(Pin::new(&mut self.get_mut().stream), cx) + } +} + +impl AsyncRead for BytesReader { + fn poll_read( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + while buf.remaining() > 0 { + if self.index == self.inner[0].len() { + self.inner.pop_front(); + self.index = 0; + } + + if self.inner.is_empty() { + break; + } + + let upper_bound = (self.index + buf.remaining()).min(self.inner[0].len()); + + let slice = &self.inner[0][self.index..upper_bound]; + + buf.put_slice(slice); + self.index += slice.len(); + } + + Poll::Ready(Ok(())) + } +} + +impl From for BytesStream { + fn from(value: Bytes) -> Self { + let mut bs = BytesStream::new(); + bs.add_bytes(value); + bs + } +} diff --git a/src/concurrent_processor.rs b/src/concurrent_processor.rs index 4a0f9cd..e7d0fc3 100644 --- a/src/concurrent_processor.rs +++ 
b/src/concurrent_processor.rs @@ -15,7 +15,7 @@ use std::{ }; use tracing::Span; -type OutcomeReceiver = Receiver<(Details, web::Bytes)>; +type OutcomeReceiver = Receiver<(Details, Arc)>; type ProcessMapKey = (Hash, PathBuf); @@ -36,9 +36,9 @@ impl ProcessMap { hash: Hash, path: PathBuf, fut: Fut, - ) -> Result<(Details, web::Bytes), Error> + ) -> Result<(Details, Arc), Error> where - Fut: Future>, + Fut: Future), Error>>, { let key = (hash.clone(), path.clone()); @@ -100,10 +100,10 @@ struct CancelToken { enum CancelState { Sender { - sender: Sender<(Details, web::Bytes)>, + sender: Sender<(Details, Arc)>, }, Receiver { - receiver: RecvFut<'static, (Details, web::Bytes)>, + receiver: RecvFut<'static, (Details, Arc)>, }, } @@ -124,9 +124,9 @@ pin_project_lite::pin_project! { impl Future for CancelSafeProcessor where - F: Future>, + F: Future), Error>>, { - type Output = Result<(Details, web::Bytes), Error>; + type Output = Result<(Details, Arc), Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.as_mut().project(); diff --git a/src/details.rs b/src/details.rs index f95c828..d4fd6c0 100644 --- a/src/details.rs +++ b/src/details.rs @@ -1,4 +1,5 @@ use crate::{ + bytes_stream::BytesStream, discover::Discovery, error::Error, formats::{InternalFormat, InternalVideoFormat}, @@ -80,13 +81,16 @@ impl Details { } #[tracing::instrument(level = "debug", skip_all)] - pub(crate) async fn from_bytes(state: &State, input: web::Bytes) -> Result { + pub(crate) async fn from_bytes_stream( + state: &State, + input: BytesStream, + ) -> Result { let Discovery { input, width, height, frames, - } = crate::discover::discover_bytes(state, input).await?; + } = crate::discover::discover_bytes_stream(state, input).await?; Ok(Details::from_parts( input.internal_format(), diff --git a/src/discover.rs b/src/discover.rs index f394c52..a6adc7f 100644 --- a/src/discover.rs +++ b/src/discover.rs @@ -4,7 +4,7 @@ mod magick; use actix_web::web::Bytes; -use crate::{formats::InputFile, state::State}; +use crate::{bytes_stream::BytesStream, formats::InputFile, state::State}; #[derive(Debug, PartialEq, Eq)] pub(crate) struct Discovery { @@ -27,13 +27,13 @@ pub(crate) enum DiscoverError { } #[tracing::instrument(level = "trace", skip_all)] -pub(crate) async fn discover_bytes( +pub(crate) async fn discover_bytes_stream( state: &State, - bytes: Bytes, + bytes: BytesStream, ) -> Result { - let discovery = ffmpeg::discover_bytes(state, bytes.clone()).await?; + let discovery = ffmpeg::discover_bytes_stream(state, bytes.clone()).await?; - let discovery = magick::confirm_bytes(state, discovery, bytes.clone()).await?; + let discovery = magick::confirm_bytes_stream(state, discovery, bytes.clone()).await?; let discovery = exiftool::check_reorient(discovery, bytes, state.config.media.process_timeout).await?; diff --git a/src/discover/exiftool.rs b/src/discover/exiftool.rs index d8ea421..98534d2 100644 --- a/src/discover/exiftool.rs +++ b/src/discover/exiftool.rs @@ -1,6 +1,7 @@ use actix_web::web::Bytes; use crate::{ + bytes_stream::BytesStream, exiftool::ExifError, formats::{ImageInput, InputFile}, process::Process, @@ -16,7 +17,7 @@ pub(super) async fn check_reorient( height, frames, }: Discovery, - bytes: Bytes, + bytes: BytesStream, timeout: u64, ) -> Result { let input = match input { @@ -40,9 +41,9 @@ pub(super) async fn check_reorient( } #[tracing::instrument(level = "trace", skip_all)] -async fn needs_reorienting(input: Bytes, timeout: u64) -> Result { +async fn needs_reorienting(input: 
BytesStream, timeout: u64) -> Result { let buf = Process::run("exiftool", &["-n", "-Orientation", "-"], &[], timeout)? - .bytes_read(input) + .bytes_stream_read(input) .into_string() .await?; diff --git a/src/discover/ffmpeg.rs b/src/discover/ffmpeg.rs index 1597b05..1162489 100644 --- a/src/discover/ffmpeg.rs +++ b/src/discover/ffmpeg.rs @@ -4,6 +4,7 @@ mod tests; use std::{collections::HashSet, sync::OnceLock}; use crate::{ + bytes_stream::BytesStream, ffmpeg::FfMpegError, formats::{ AlphaCodec, AnimationFormat, ImageFormat, ImageInput, InputFile, InputVideoFormat, @@ -158,15 +159,15 @@ struct Flags { } #[tracing::instrument(skip_all)] -pub(super) async fn discover_bytes( +pub(super) async fn discover_bytes_stream( state: &State, - bytes: Bytes, + bytes: BytesStream, ) -> Result, FfMpegError> { discover_file(state, move |mut file| { let bytes = bytes.clone(); async move { - file.write_from_bytes(bytes) + file.write_from_stream(bytes.into_io_stream()) .await .map_err(FfMpegError::Write)?; Ok(file) diff --git a/src/discover/magick.rs b/src/discover/magick.rs index a280736..349af37 100644 --- a/src/discover/magick.rs +++ b/src/discover/magick.rs @@ -4,6 +4,7 @@ mod tests; use actix_web::web::Bytes; use crate::{ + bytes_stream::BytesStream, discover::DiscoverError, formats::{AnimationFormat, ImageFormat, ImageInput, InputFile}, magick::{MagickError, MAGICK_CONFIGURE_PATH, MAGICK_TEMPORARY_PATH}, @@ -31,10 +32,10 @@ struct Geometry { } #[tracing::instrument(skip_all)] -pub(super) async fn confirm_bytes( +pub(super) async fn confirm_bytes_stream( state: &State, discovery: Option, - bytes: Bytes, + bytes: BytesStream, ) -> Result { match discovery { Some(Discovery { @@ -50,7 +51,7 @@ pub(super) async fn confirm_bytes( } discover_file(state, move |mut file| async move { - file.write_from_bytes(bytes) + file.write_from_stream(bytes.into_io_stream()) .await .map_err(MagickError::Write)?; diff --git a/src/exiftool.rs b/src/exiftool.rs index 831ef9b..5086ce6 100644 --- a/src/exiftool.rs +++ b/src/exiftool.rs @@ -1,4 +1,5 @@ use crate::{ + bytes_stream::BytesStream, error_code::ErrorCode, process::{Process, ProcessError, ProcessRead}, }; @@ -39,9 +40,9 @@ impl ExifError { } #[tracing::instrument(level = "trace", skip(input))] -pub(crate) async fn needs_reorienting(timeout: u64, input: Bytes) -> Result { +pub(crate) async fn needs_reorienting(timeout: u64, input: BytesStream) -> Result { let buf = Process::run("exiftool", &["-n", "-Orientation", "-"], &[], timeout)? - .bytes_read(input) + .bytes_stream_read(input) .into_string() .await?; @@ -51,9 +52,9 @@ pub(crate) async fn needs_reorienting(timeout: u64, input: Bytes) -> Result Result { let process = Process::run("exiftool", &["-all=", "-", "-out", "-"], &[], timeout)?; - Ok(process.bytes_read(input)) + Ok(process.bytes_stream_read(input)) } diff --git a/src/generate.rs b/src/generate.rs index ca151c8..9373d83 100644 --- a/src/generate.rs +++ b/src/generate.rs @@ -2,6 +2,7 @@ mod ffmpeg; mod magick; use crate::{ + bytes_stream::BytesStream, concurrent_processor::ProcessMap, details::Details, error::{Error, UploadError}, @@ -57,7 +58,7 @@ pub(crate) async fn generate( thumbnail_args: Vec, original_details: &Details, hash: Hash, -) -> Result<(Details, Bytes), Error> { +) -> Result<(Details, Arc), Error> { if state.config.server.danger_dummy_mode { let identifier = state .repo @@ -65,13 +66,7 @@ pub(crate) async fn generate( .await? .ok_or(UploadError::MissingIdentifier)?; - let bytes = state - .store - .to_bytes(&identifier, None, None) - .await? 
- .into_bytes(); - - Ok((original_details.clone(), bytes)) + Ok((original_details.clone(), identifier)) } else { let process_fut = process( state, @@ -82,14 +77,14 @@ pub(crate) async fn generate( hash.clone(), ); - let (details, bytes) = process_map + let (details, identifier) = process_map .process(hash, thumbnail_path, process_fut) .with_timeout(Duration::from_secs(state.config.media.process_timeout * 4)) .with_metrics(crate::init_metrics::GENERATE_PROCESS) .await .map_err(|_| UploadError::ProcessTimeout)??; - Ok((details, bytes)) + Ok((details, identifier)) } } @@ -101,7 +96,7 @@ async fn process( thumbnail_args: Vec, original_details: &Details, hash: Hash, -) -> Result<(Details, Bytes), Error> { +) -> Result<(Details, Arc), Error> { let guard = MetricsGuard::guard(); let permit = crate::process_semaphore().acquire().await?; @@ -123,7 +118,7 @@ async fn process( let stream = state.store.to_stream(&identifier, None, None).await?; - let vec = crate::magick::process_image_stream_read( + let bytes = crate::magick::process_image_stream_read( state, stream, thumbnail_args, @@ -132,19 +127,17 @@ async fn process( quality, ) .await? - .into_vec() + .into_bytes_stream() .instrument(tracing::info_span!("Reading processed image to vec")) .await?; - let bytes = Bytes::from(vec); - drop(permit); - let details = Details::from_bytes(state, bytes.clone()).await?; + let details = Details::from_bytes_stream(state, bytes.clone()).await?; let identifier = state .store - .save_bytes(bytes.clone(), details.media_type()) + .save_stream(bytes.into_io_stream(), details.media_type()) .await?; if let Err(VariantAlreadyExists) = state @@ -163,7 +156,7 @@ async fn process( guard.disarm(); - Ok((details, bytes)) as Result<(Details, Bytes), Error> + Ok((details, identifier)) as Result<(Details, Arc), Error> } #[tracing::instrument(skip_all)] diff --git a/src/ingest.rs b/src/ingest.rs index 69802ae..b3ddd16 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -30,7 +30,7 @@ pub(crate) struct Session { } #[tracing::instrument(skip(stream))] -async fn aggregate(stream: S) -> Result +async fn aggregate(stream: S) -> Result where S: Stream>, { @@ -45,7 +45,7 @@ where buf.add_bytes(res?); } - Ok(buf.into_bytes()) + Ok(buf) } async fn process_ingest( @@ -70,7 +70,7 @@ where let permit = crate::process_semaphore().acquire().await?; tracing::trace!("Validating bytes"); - let (input_type, process_read) = crate::validate::validate_bytes(state, bytes).await?; + let (input_type, process_read) = crate::validate::validate_bytes_stream(state, bytes).await?; let process_read = if let Some(operations) = state.config.media.preprocess_steps() { if let Some(format) = input_type.processable_format() { @@ -116,7 +116,7 @@ where .await??; let bytes_stream = state.store.to_bytes(&identifier, None, None).await?; - let details = Details::from_bytes(state, bytes_stream.into_bytes()).await?; + let details = Details::from_bytes_stream(state, bytes_stream).await?; drop(permit); diff --git a/src/lib.rs b/src/lib.rs index 57cd0ef..44bf807 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -83,7 +83,7 @@ use self::{ repo::{sled::SledRepo, Alias, DeleteToken, Hash, Repo, UploadId, UploadResult}, serde_str::Serde, store::{file_store::FileStore, object_store::ObjectStore, Store}, - stream::{empty, once}, + stream::empty, tls::Tls, }; @@ -141,7 +141,7 @@ async fn ensure_details_identifier( tracing::debug!("generating new details from {:?}", identifier); let bytes_stream = state.store.to_bytes(identifier, None, None).await?; - let new_details = 
Details::from_bytes(state, bytes_stream.into_bytes()).await?; + let new_details = Details::from_bytes_stream(state, bytes_stream).await?; tracing::debug!("storing details for {:?}", identifier); state.repo.relate_details(identifier, &new_details).await?; tracing::debug!("stored"); @@ -841,67 +841,36 @@ async fn process( .variant_identifier(hash.clone(), path_string) .await?; - if let Some(identifier) = identifier_opt { + let (details, identifier) = if let Some(identifier) = identifier_opt { let details = ensure_details_identifier(&state, &identifier).await?; - if let Some(public_url) = state.store.public_url(&identifier) { - return Ok(HttpResponse::SeeOther() - .insert_header((actix_web::http::header::LOCATION, public_url.as_str())) - .finish()); - } - - return ranged_file_resp(&state.store, identifier, range, details, not_found).await; - } - - if state.config.server.read_only { - return Err(UploadError::ReadOnly.into()); - } - - let original_details = ensure_details(&state, &alias).await?; - - let (details, bytes) = generate::generate( - &state, - &process_map, - format, - thumbnail_path, - thumbnail_args, - &original_details, - hash, - ) - .await?; - - let (builder, stream) = if let Some(web::Header(range_header)) = range { - if let Some(range) = range::single_bytes_range(&range_header) { - let len = bytes.len() as u64; - - if let Some(content_range) = range::to_content_range(range, len) { - let mut builder = HttpResponse::PartialContent(); - builder.insert_header(content_range); - let stream = range::chop_bytes(range, bytes, len)?; - - (builder, Either::left(Either::left(stream))) - } else { - ( - HttpResponse::RangeNotSatisfiable(), - Either::left(Either::right(empty())), - ) - } - } else { - return Err(UploadError::Range.into()); - } - } else if not_found { - (HttpResponse::NotFound(), Either::right(once(Ok(bytes)))) + (details, identifier) } else { - (HttpResponse::Ok(), Either::right(once(Ok(bytes)))) + if state.config.server.read_only { + return Err(UploadError::ReadOnly.into()); + } + + let original_details = ensure_details(&state, &alias).await?; + + generate::generate( + &state, + &process_map, + format, + thumbnail_path, + thumbnail_args, + &original_details, + hash, + ) + .await? 
}; - Ok(srv_response( - builder, - stream, - details.media_type(), - 7 * DAYS, - details.system_time(), - )) + if let Some(public_url) = state.store.public_url(&identifier) { + return Ok(HttpResponse::SeeOther() + .insert_header((actix_web::http::header::LOCATION, public_url.as_str())) + .finish()); + } + + ranged_file_resp(&state.store, identifier, range, details, not_found).await } #[tracing::instrument(name = "Serving processed image headers", skip(state))] diff --git a/src/migrate_store.rs b/src/migrate_store.rs index 0578bbe..9423621 100644 --- a/src/migrate_store.rs +++ b/src/migrate_store.rs @@ -396,7 +396,7 @@ where .await .map_err(From::from) .map_err(MigrateError::Details)?; - let new_details = Details::from_bytes(to, bytes_stream.into_bytes()) + let new_details = Details::from_bytes_stream(to, bytes_stream) .await .map_err(MigrateError::Details)?; to.repo diff --git a/src/process.rs b/src/process.rs index 916e1ed..89239bc 100644 --- a/src/process.rs +++ b/src/process.rs @@ -6,14 +6,17 @@ use std::{ sync::Arc, time::{Duration, Instant}, }; +use streem::IntoStreamer; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, process::{Child, ChildStdin, Command}, }; +use tokio_util::io::ReaderStream; use tracing::Instrument; use uuid::Uuid; use crate::{ + bytes_stream::BytesStream, error_code::ErrorCode, future::{LocalBoxFuture, WithTimeout}, read::BoxRead, @@ -232,12 +235,11 @@ impl Process { } } - pub(crate) fn bytes_read(self, input: Bytes) -> ProcessRead { + pub(crate) fn bytes_stream_read(self, input: BytesStream) -> ProcessRead { self.spawn_fn(move |mut stdin| { - let mut input = input; async move { - match stdin.write_all_buf(&mut input).await { - Ok(()) => Ok(()), + match tokio::io::copy(&mut input.into_reader(), &mut stdin).await { + Ok(_) => Ok(()), // BrokenPipe means we finished reading from Stdout, so we don't need to write // to stdin. We'll still error out if the command failed so treat this as a // success @@ -317,6 +319,16 @@ impl ProcessRead { } } + pub(crate) async fn into_bytes_stream(self) -> Result { + let cmd = self.command.clone(); + + self.with_stdout(move |stdout| { + BytesStream::try_from_stream(ReaderStream::with_capacity(stdout, 1024 * 16)) + }) + .await? 
+ .map_err(move |e| ProcessError::Read(cmd, e)) + } + pub(crate) async fn into_vec(self) -> Result, ProcessError> { let cmd = self.command.clone(); diff --git a/src/range.rs b/src/range.rs index 7a2694a..f992f38 100644 --- a/src/range.rs +++ b/src/range.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use crate::{ error::{Error, UploadError}, store::Store, - stream::once, }; use actix_web::{ http::header::{ByteRangeSpec, ContentRange, ContentRangeSpec, Range}, @@ -11,20 +10,6 @@ use actix_web::{ }; use futures_core::Stream; -pub(crate) fn chop_bytes( - byte_range: &ByteRangeSpec, - bytes: Bytes, - length: u64, -) -> Result>, Error> { - if let Some((start, end)) = byte_range.to_satisfiable_range(length) { - // END IS INCLUSIVE - let end = end as usize + 1; - return Ok(once(Ok(bytes.slice(start as usize..end)))); - } - - Err(UploadError::Range.into()) -} - pub(crate) async fn chop_store( byte_range: &ByteRangeSpec, store: &S, diff --git a/src/repo/migrate.rs b/src/repo/migrate.rs index 9cc803f..0a67d4d 100644 --- a/src/repo/migrate.rs +++ b/src/repo/migrate.rs @@ -387,10 +387,9 @@ async fn fetch_or_generate_details( Ok(details) } else { let bytes_stream = state.store.to_bytes(identifier, None, None).await?; - let bytes = bytes_stream.into_bytes(); let guard = details_semaphore().acquire().await?; - let details = Details::from_bytes(state, bytes).await?; + let details = Details::from_bytes_stream(state, bytes_stream).await?; drop(guard); Ok(details) diff --git a/src/stream.rs b/src/stream.rs index cae35da..66a502b 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -183,13 +183,6 @@ where streem::from_fn(|_| std::future::ready(())) } -pub(crate) fn once(value: T) -> impl Stream -where - T: 'static, -{ - streem::from_fn(|yielder| yielder.yield_(value)) -} - pub(crate) fn timeout( duration: Duration, stream: S, diff --git a/src/validate.rs b/src/validate.rs index ee857bd..751877e 100644 --- a/src/validate.rs +++ b/src/validate.rs @@ -3,6 +3,7 @@ mod ffmpeg; mod magick; use crate::{ + bytes_stream::BytesStream, discover::Discovery, error::Error, error_code::ErrorCode, @@ -56,9 +57,9 @@ impl ValidationError { const MEGABYTES: usize = 1024 * 1024; #[tracing::instrument(skip_all)] -pub(crate) async fn validate_bytes( +pub(crate) async fn validate_bytes_stream( state: &State, - bytes: Bytes, + bytes: BytesStream, ) -> Result<(InternalFormat, ProcessRead), Error> { if bytes.is_empty() { return Err(ValidationError::Empty.into()); @@ -69,7 +70,7 @@ pub(crate) async fn validate_bytes( width, height, frames, - } = crate::discover::discover_bytes(state, bytes.clone()).await?; + } = crate::discover::discover_bytes_stream(state, bytes.clone()).await?; match &input { InputFile::Image(input) => { @@ -95,7 +96,7 @@ pub(crate) async fn validate_bytes( #[tracing::instrument(skip(state, bytes), fields(len = bytes.len()))] async fn process_image( state: &State, - bytes: Bytes, + bytes: BytesStream, input: ImageInput, width: u16, height: u16, @@ -160,7 +161,7 @@ fn validate_animation( #[tracing::instrument(skip(state, bytes), fields(len = bytes.len()))] async fn process_animation( state: &State, - bytes: Bytes, + bytes: BytesStream, input: AnimationFormat, width: u16, height: u16, @@ -218,7 +219,7 @@ fn validate_video( #[tracing::instrument(skip(state, bytes), fields(len = bytes.len()))] async fn process_video( state: &State, - bytes: Bytes, + bytes: BytesStream, input: InputVideoFormat, width: u16, height: u16, diff --git a/src/validate/exiftool.rs b/src/validate/exiftool.rs index b97d5f0..fe757ae 100644 --- 
a/src/validate/exiftool.rs +++ b/src/validate/exiftool.rs @@ -1,14 +1,18 @@ use actix_web::web::Bytes; use crate::{ + bytes_stream::BytesStream, exiftool::ExifError, process::{Process, ProcessRead}, }; #[tracing::instrument(level = "trace", skip_all)] pub(crate) fn clear_metadata_bytes_read( - input: Bytes, + input: BytesStream, timeout: u64, ) -> Result { - Ok(Process::run("exiftool", &["-all=", "-", "-out", "-"], &[], timeout)?.bytes_read(input)) + Ok( + Process::run("exiftool", &["-all=", "-", "-out", "-"], &[], timeout)? + .bytes_stream_read(input), + ) } diff --git a/src/validate/ffmpeg.rs b/src/validate/ffmpeg.rs index 1ab7758..a5039c2 100644 --- a/src/validate/ffmpeg.rs +++ b/src/validate/ffmpeg.rs @@ -4,6 +4,7 @@ use actix_web::web::Bytes; use uuid::Uuid; use crate::{ + bytes_stream::BytesStream, ffmpeg::FfMpegError, formats::{InputVideoFormat, OutputVideo}, process::{Process, ProcessRead}, @@ -16,7 +17,7 @@ pub(super) async fn transcode_bytes( output_format: OutputVideo, crf: u8, timeout: u64, - bytes: Bytes, + bytes: BytesStream, ) -> Result { let input_file = tmp_dir.tmp_file(None); crate::store::file_store::safe_create_parent(&input_file) @@ -27,7 +28,7 @@ pub(super) async fn transcode_bytes( .await .map_err(FfMpegError::CreateFile)?; tmp_one - .write_from_bytes(bytes) + .write_from_stream(bytes.into_io_stream()) .await .map_err(FfMpegError::Write)?; tmp_one.close().await.map_err(FfMpegError::CloseFile)?; diff --git a/src/validate/magick.rs b/src/validate/magick.rs index ac8b720..e90b2b3 100644 --- a/src/validate/magick.rs +++ b/src/validate/magick.rs @@ -3,6 +3,7 @@ use std::ffi::OsStr; use actix_web::web::Bytes; use crate::{ + bytes_stream::BytesStream, formats::{AnimationFormat, ImageFormat}, magick::{MagickError, MAGICK_CONFIGURE_PATH, MAGICK_TEMPORARY_PATH}, process::{Process, ProcessRead}, @@ -14,7 +15,7 @@ pub(super) async fn convert_image( input: ImageFormat, output: ImageFormat, quality: Option, - bytes: Bytes, + bytes: BytesStream, ) -> Result { convert( state, @@ -32,7 +33,7 @@ pub(super) async fn convert_animation( input: AnimationFormat, output: AnimationFormat, quality: Option, - bytes: Bytes, + bytes: BytesStream, ) -> Result { convert( state, @@ -51,7 +52,7 @@ async fn convert( output: &'static str, coalesce: bool, quality: Option, - bytes: Bytes, + bytes: BytesStream, ) -> Result { let temporary_path = state .tmp_dir @@ -69,7 +70,7 @@ async fn convert( .await .map_err(MagickError::CreateFile)?; tmp_one - .write_from_bytes(bytes) + .write_from_stream(bytes.into_io_stream()) .await .map_err(MagickError::Write)?; tmp_one.close().await.map_err(MagickError::CloseFile)?; From 0ebee2a07ce3f80f1201e6da35b121c1d55865b9 Mon Sep 17 00:00:00 2001 From: asonix Date: Thu, 22 Feb 2024 16:03:49 -0600 Subject: [PATCH 02/12] cargo fix --- src/concurrent_processor.rs | 2 +- src/details.rs | 2 +- src/discover.rs | 2 +- src/discover/exiftool.rs | 2 +- src/discover/ffmpeg.rs | 2 +- src/discover/magick.rs | 2 +- src/exiftool.rs | 2 +- src/generate.rs | 3 +-- src/process.rs | 6 +++--- src/validate.rs | 2 +- src/validate/exiftool.rs | 2 +- src/validate/ffmpeg.rs | 2 +- src/validate/magick.rs | 2 +- 13 files changed, 15 insertions(+), 16 deletions(-) diff --git a/src/concurrent_processor.rs b/src/concurrent_processor.rs index e7d0fc3..b15b521 100644 --- a/src/concurrent_processor.rs +++ b/src/concurrent_processor.rs @@ -3,7 +3,7 @@ use crate::{ error::{Error, UploadError}, repo::Hash, }; -use actix_web::web; + use dashmap::{mapref::entry::Entry, DashMap}; use flume::{r#async::RecvFut, 
Receiver, Sender}; use std::{ diff --git a/src/details.rs b/src/details.rs index d4fd6c0..90a7333 100644 --- a/src/details.rs +++ b/src/details.rs @@ -6,7 +6,7 @@ use crate::{ serde_str::Serde, state::State, }; -use actix_web::web; + use time::{format_description::well_known::Rfc3339, OffsetDateTime}; #[derive(Copy, Clone, Debug, serde::Deserialize, serde::Serialize)] diff --git a/src/discover.rs b/src/discover.rs index a6adc7f..362a4cf 100644 --- a/src/discover.rs +++ b/src/discover.rs @@ -2,7 +2,7 @@ mod exiftool; mod ffmpeg; mod magick; -use actix_web::web::Bytes; + use crate::{bytes_stream::BytesStream, formats::InputFile, state::State}; diff --git a/src/discover/exiftool.rs b/src/discover/exiftool.rs index 98534d2..2549827 100644 --- a/src/discover/exiftool.rs +++ b/src/discover/exiftool.rs @@ -1,4 +1,4 @@ -use actix_web::web::Bytes; + use crate::{ bytes_stream::BytesStream, diff --git a/src/discover/ffmpeg.rs b/src/discover/ffmpeg.rs index 1162489..f4d3d19 100644 --- a/src/discover/ffmpeg.rs +++ b/src/discover/ffmpeg.rs @@ -13,7 +13,7 @@ use crate::{ process::Process, state::State, }; -use actix_web::web::Bytes; + use super::Discovery; diff --git a/src/discover/magick.rs b/src/discover/magick.rs index 349af37..fb77356 100644 --- a/src/discover/magick.rs +++ b/src/discover/magick.rs @@ -1,7 +1,7 @@ #[cfg(test)] mod tests; -use actix_web::web::Bytes; + use crate::{ bytes_stream::BytesStream, diff --git a/src/exiftool.rs b/src/exiftool.rs index 5086ce6..027b53b 100644 --- a/src/exiftool.rs +++ b/src/exiftool.rs @@ -3,7 +3,7 @@ use crate::{ error_code::ErrorCode, process::{Process, ProcessError, ProcessRead}, }; -use actix_web::web::Bytes; + #[derive(Debug, thiserror::Error)] pub(crate) enum ExifError { diff --git a/src/generate.rs b/src/generate.rs index 9373d83..43f7360 100644 --- a/src/generate.rs +++ b/src/generate.rs @@ -2,7 +2,6 @@ mod ffmpeg; mod magick; use crate::{ - bytes_stream::BytesStream, concurrent_processor::ProcessMap, details::Details, error::{Error, UploadError}, @@ -12,7 +11,7 @@ use crate::{ state::State, store::Store, }; -use actix_web::web::Bytes; + use std::{ path::PathBuf, sync::Arc, diff --git a/src/process.rs b/src/process.rs index 89239bc..1189de2 100644 --- a/src/process.rs +++ b/src/process.rs @@ -1,4 +1,4 @@ -use actix_web::web::Bytes; + use std::{ ffi::OsStr, future::Future, @@ -6,9 +6,9 @@ use std::{ sync::Arc, time::{Duration, Instant}, }; -use streem::IntoStreamer; + use tokio::{ - io::{AsyncReadExt, AsyncWriteExt}, + io::{AsyncReadExt}, process::{Child, ChildStdin, Command}, }; use tokio_util::io::ReaderStream; diff --git a/src/validate.rs b/src/validate.rs index 751877e..4358491 100644 --- a/src/validate.rs +++ b/src/validate.rs @@ -14,7 +14,7 @@ use crate::{ process::ProcessRead, state::State, }; -use actix_web::web::Bytes; + #[derive(Debug, thiserror::Error)] pub(crate) enum ValidationError { diff --git a/src/validate/exiftool.rs b/src/validate/exiftool.rs index fe757ae..2705c11 100644 --- a/src/validate/exiftool.rs +++ b/src/validate/exiftool.rs @@ -1,4 +1,4 @@ -use actix_web::web::Bytes; + use crate::{ bytes_stream::BytesStream, diff --git a/src/validate/ffmpeg.rs b/src/validate/ffmpeg.rs index a5039c2..b67ac80 100644 --- a/src/validate/ffmpeg.rs +++ b/src/validate/ffmpeg.rs @@ -1,6 +1,6 @@ use std::{ffi::OsStr, sync::Arc}; -use actix_web::web::Bytes; + use uuid::Uuid; use crate::{ diff --git a/src/validate/magick.rs b/src/validate/magick.rs index e90b2b3..d6c1ab1 100644 --- a/src/validate/magick.rs +++ b/src/validate/magick.rs @@ -1,6 +1,6 @@ use 
std::ffi::OsStr; -use actix_web::web::Bytes; + use crate::{ bytes_stream::BytesStream, From 3a7d5b7bfb6d5249a16a1e597b846279e103fb3c Mon Sep 17 00:00:00 2001 From: asonix Date: Thu, 22 Feb 2024 16:10:34 -0600 Subject: [PATCH 03/12] Re-use try_from_stream where possible --- src/bytes_stream.rs | 2 ++ src/ingest.rs | 29 ++++++----------------------- src/store.rs | 18 ++++-------------- 3 files changed, 12 insertions(+), 37 deletions(-) diff --git a/src/bytes_stream.rs b/src/bytes_stream.rs index 3594be5..cc5e1e1 100644 --- a/src/bytes_stream.rs +++ b/src/bytes_stream.rs @@ -26,6 +26,7 @@ impl BytesStream { } } + #[tracing::instrument(skip(stream))] pub(crate) async fn try_from_stream(stream: S) -> Result where S: Stream>, @@ -35,6 +36,7 @@ impl BytesStream { let mut bs = Self::new(); while let Some(bytes) = stream.try_next().await? { + tracing::trace!("try_from_stream: looping"); bs.add_bytes(bytes); } diff --git a/src/ingest.rs b/src/ingest.rs index b3ddd16..0698ea8 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -14,7 +14,6 @@ use actix_web::web::Bytes; use futures_core::Stream; use reqwest::Body; -use streem::IntoStreamer; use tracing::{Instrument, Span}; mod hasher; @@ -29,25 +28,6 @@ pub(crate) struct Session { identifier: Option>, } -#[tracing::instrument(skip(stream))] -async fn aggregate(stream: S) -> Result -where - S: Stream>, -{ - let mut buf = BytesStream::new(); - - let stream = std::pin::pin!(stream); - let mut stream = stream.into_streamer(); - - while let Some(res) = stream.next().await { - tracing::trace!("aggregate: looping"); - - buf.add_bytes(res?); - } - - Ok(buf) -} - async fn process_ingest( state: &State, stream: impl Stream> + 'static, @@ -63,9 +43,12 @@ async fn process_ingest( where S: Store, { - let bytes = tokio::time::timeout(Duration::from_secs(60), aggregate(stream)) - .await - .map_err(|_| UploadError::AggregateTimeout)??; + let bytes = tokio::time::timeout( + Duration::from_secs(60), + BytesStream::try_from_stream(stream), + ) + .await + .map_err(|_| UploadError::AggregateTimeout)??; let permit = crate::process_semaphore().acquire().await?; diff --git a/src/store.rs b/src/store.rs index 2006713..9d7fcaa 100644 --- a/src/store.rs +++ b/src/store.rs @@ -1,7 +1,6 @@ use actix_web::web::Bytes; use futures_core::Stream; use std::{fmt::Debug, sync::Arc}; -use streem::IntoStreamer; use tokio::io::{AsyncRead, AsyncWrite}; use crate::{bytes_stream::BytesStream, error_code::ErrorCode, stream::LocalBoxStream}; @@ -123,20 +122,11 @@ pub(crate) trait Store: Clone + Debug { from_start: Option, len: Option, ) -> Result { - let mut buf = BytesStream::new(); + let stream = self.to_stream(identifier, from_start, len).await?; - let mut streamer = self - .to_stream(identifier, from_start, len) - .await? - .into_streamer(); - - while let Some(bytes) = streamer.try_next().await.map_err(StoreError::ReadStream)? 
{
-            tracing::trace!("to_bytes: looping");
-
-            buf.add_bytes(bytes);
-        }
-
-        Ok(buf)
+        BytesStream::try_from_stream(stream)
+            .await
+            .map_err(StoreError::ReadStream)
     }
 
     async fn read_into(

From de356c1f12d9aae98bd5eb259eaf341838c1dba9 Mon Sep 17 00:00:00 2001
From: asonix
Date: Thu, 22 Feb 2024 16:15:32 -0600
Subject: [PATCH 04/12] Remove MessageBody impl

---
 src/bytes_stream.rs | 61 ++++++++++-----------------------------------
 1 file changed, 13 insertions(+), 48 deletions(-)

diff --git a/src/bytes_stream.rs b/src/bytes_stream.rs
index cc5e1e1..9c62b5a 100644
--- a/src/bytes_stream.rs
+++ b/src/bytes_stream.rs
@@ -1,7 +1,4 @@
-use actix_web::{
-    body::MessageBody,
-    web::{Bytes, BytesMut},
-};
+use actix_web::web::Bytes;
 use futures_core::Stream;
 use std::{
     collections::{vec_deque::IntoIter, VecDeque},
@@ -56,20 +53,6 @@ impl BytesStream {
         self.total_len > 0
     }
 
-    fn into_bytes(mut self) -> Bytes {
-        if self.inner.len() == 1 {
-            return self.inner.pop_front().expect("Exactly one");
-        }
-
-        let mut buf = BytesMut::with_capacity(self.total_len);
-
-        for bytes in self.inner {
-            buf.extend_from_slice(&bytes);
-        }
-
-        buf.freeze()
-    }
-
     pub(crate) fn into_reader(self) -> BytesReader {
         BytesReader {
             index: 0,
@@ -78,12 +61,12 @@ impl BytesStream {
     }
 
     pub(crate) fn into_io_stream(self) -> IoStream {
-        IoStream { stream: self }
+        IoStream { inner: self.inner }
     }
 }
 
 pub(crate) struct IoStream {
-    stream: BytesStream,
+    inner: VecDeque<Bytes>,
 }
 
 pub(crate) struct BytesReader {
@@ -100,45 +83,27 @@ impl IntoIterator for BytesStream {
     }
 }
 
-impl MessageBody for BytesStream {
-    type Error = std::io::Error;
-
-    fn size(&self) -> actix_web::body::BodySize {
-        if let Ok(len) = self.len().try_into() {
-            actix_web::body::BodySize::Sized(len)
-        } else {
-            actix_web::body::BodySize::None
-        }
-    }
-
-    fn poll_next(
-        self: Pin<&mut Self>,
-        _: &mut Context<'_>,
-    ) -> Poll<Option<Result<Bytes, Self::Error>>> {
-        Poll::Ready(self.get_mut().inner.pop_front().map(Ok))
-    }
-
-    fn try_into_bytes(self) -> Result<Bytes, Self>
-    where
-        Self: Sized,
-    {
-        Ok(self.into_bytes())
-    }
-}
-
 impl Stream for BytesStream {
     type Item = Result<Bytes, std::io::Error>;
 
     fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         Poll::Ready(self.get_mut().inner.pop_front().map(Ok))
     }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        (self.inner.len(), Some(self.inner.len()))
+    }
 }
 
 impl Stream for IoStream {
     type Item = std::io::Result<Bytes>;
 
-    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        MessageBody::poll_next(Pin::new(&mut self.get_mut().stream), cx)
+    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        Poll::Ready(self.get_mut().inner.pop_front().map(Ok))
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        (self.inner.len(), Some(self.inner.len()))
     }
 }

From 00a08a8bc91ee57f0453d49e2c6bd76ac965704b Mon Sep 17 00:00:00 2001
From: asonix
Date: Thu, 22 Feb 2024 16:21:31 -0600
Subject: [PATCH 05/12] Improve AsyncRead impl

---
 src/bytes_stream.rs | 27 ++++++++++++++-------------
 1 file changed, 14 insertions(+), 13 deletions(-)

diff --git a/src/bytes_stream.rs b/src/bytes_stream.rs
index 9c62b5a..ac3a60c 100644
--- a/src/bytes_stream.rs
+++ b/src/bytes_stream.rs
@@ -110,25 +110,26 @@ impl Stream for IoStream {
 impl AsyncRead for BytesReader {
     fn poll_read(
         mut self: Pin<&mut Self>,
-        _cx: &mut Context<'_>,
+        _: &mut Context<'_>,
         buf: &mut tokio::io::ReadBuf<'_>,
     ) -> Poll<std::io::Result<()>> {
         while buf.remaining() > 0 {
-            if self.index == self.inner[0].len() {
-                self.inner.pop_front();
-                self.index = 0;
-            }
+            if let Some(bytes) = self.inner.front() {
+                if self.index == bytes.len() {
+
self.inner.pop_front(); + self.index = 0; + continue; + } - if self.inner.is_empty() { + let upper_bound = (self.index + buf.remaining()).min(bytes.len()); + + let slice = &bytes[self.index..upper_bound]; + + buf.put_slice(slice); + self.index += slice.len(); + } else { break; } - - let upper_bound = (self.index + buf.remaining()).min(self.inner[0].len()); - - let slice = &self.inner[0][self.index..upper_bound]; - - buf.put_slice(slice); - self.index += slice.len(); } Poll::Ready(Ok(())) From f3e455a1c3d9cc8cbc225265488d4c0d0eef24b9 Mon Sep 17 00:00:00 2001 From: asonix Date: Thu, 22 Feb 2024 16:25:03 -0600 Subject: [PATCH 06/12] Increase buffer size for AsyncRead -> Stream conversion --- src/store/object_store.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/store/object_store.rs b/src/store/object_store.rs index c428f92..b6a82a1 100644 --- a/src/store/object_store.rs +++ b/src/store/object_store.rs @@ -225,7 +225,7 @@ impl Store for ObjectStore { where Reader: AsyncRead + Unpin + 'static, { - self.save_stream(ReaderStream::new(reader), content_type) + self.save_stream(ReaderStream::with_capacity(reader, 1024 * 16), content_type) .await } From 59b03d548de5691cf171c736297d9e98b5b7f786 Mon Sep 17 00:00:00 2001 From: asonix Date: Thu, 22 Feb 2024 17:09:03 -0600 Subject: [PATCH 07/12] Fix is_empty --- src/bytes_stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bytes_stream.rs b/src/bytes_stream.rs index ac3a60c..d33a0e7 100644 --- a/src/bytes_stream.rs +++ b/src/bytes_stream.rs @@ -50,7 +50,7 @@ impl BytesStream { } pub(crate) fn is_empty(&self) -> bool { - self.total_len > 0 + self.total_len == 0 } pub(crate) fn into_reader(self) -> BytesReader { From 227e9cc3a7711c511745887e38abcad3da99b319 Mon Sep 17 00:00:00 2001 From: asonix Date: Thu, 22 Feb 2024 17:49:16 -0600 Subject: [PATCH 08/12] BytesReader: use built-in state tracking --- src/bytes_stream.rs | 30 +++++++++++++++++++----------- src/generate.rs | 4 +++- src/process.rs | 5 ++--- src/store/object_store.rs | 8 +++++++- 4 files changed, 31 insertions(+), 16 deletions(-) diff --git a/src/bytes_stream.rs b/src/bytes_stream.rs index d33a0e7..1e88704 100644 --- a/src/bytes_stream.rs +++ b/src/bytes_stream.rs @@ -8,6 +8,7 @@ use std::{ }; use streem::IntoStreamer; use tokio::io::AsyncRead; +use tokio_util::bytes::Buf; #[derive(Clone, Debug)] pub(crate) struct BytesStream { @@ -37,9 +38,19 @@ impl BytesStream { bs.add_bytes(bytes); } + tracing::debug!( + "BytesStream with {} chunks, avg length {}", + bs.chunks_len(), + bs.len() / bs.chunks_len() + ); + Ok(bs) } + pub(crate) fn chunks_len(&self) -> usize { + self.inner.len() + } + pub(crate) fn add_bytes(&mut self, bytes: Bytes) { self.total_len += bytes.len(); self.inner.push_back(bytes); @@ -54,10 +65,7 @@ impl BytesStream { } pub(crate) fn into_reader(self) -> BytesReader { - BytesReader { - index: 0, - inner: self.inner, - } + BytesReader { inner: self.inner } } pub(crate) fn into_io_stream(self) -> IoStream { @@ -70,7 +78,6 @@ pub(crate) struct IoStream { } pub(crate) struct BytesReader { - index: usize, inner: VecDeque, } @@ -114,19 +121,20 @@ impl AsyncRead for BytesReader { buf: &mut tokio::io::ReadBuf<'_>, ) -> Poll> { while buf.remaining() > 0 { - if let Some(bytes) = self.inner.front() { - if self.index == bytes.len() { + tracing::trace!("bytes reader: looping"); + + if let Some(bytes) = self.inner.front_mut() { + if bytes.is_empty() { self.inner.pop_front(); - self.index = 0; continue; } - let upper_bound = (self.index + 
buf.remaining()).min(bytes.len()); + let upper_bound = buf.remaining().min(bytes.len()); - let slice = &bytes[self.index..upper_bound]; + let slice = &bytes[..upper_bound]; buf.put_slice(slice); - self.index += slice.len(); + bytes.advance(upper_bound); } else { break; } diff --git a/src/generate.rs b/src/generate.rs index 43f7360..fdb209d 100644 --- a/src/generate.rs +++ b/src/generate.rs @@ -127,7 +127,9 @@ async fn process( ) .await? .into_bytes_stream() - .instrument(tracing::info_span!("Reading processed image to vec")) + .instrument(tracing::info_span!( + "Reading processed image to BytesStream" + )) .await?; drop(permit); diff --git a/src/process.rs b/src/process.rs index 1189de2..c512b4f 100644 --- a/src/process.rs +++ b/src/process.rs @@ -1,4 +1,3 @@ - use std::{ ffi::OsStr, future::Future, @@ -8,7 +7,7 @@ use std::{ }; use tokio::{ - io::{AsyncReadExt}, + io::AsyncReadExt, process::{Child, ChildStdin, Command}, }; use tokio_util::io::ReaderStream; @@ -323,7 +322,7 @@ impl ProcessRead { let cmd = self.command.clone(); self.with_stdout(move |stdout| { - BytesStream::try_from_stream(ReaderStream::with_capacity(stdout, 1024 * 16)) + BytesStream::try_from_stream(ReaderStream::with_capacity(stdout, 1024 * 64)) }) .await? .map_err(move |e| ProcessError::Read(cmd, e)) diff --git a/src/store/object_store.rs b/src/store/object_store.rs index b6a82a1..354c1b5 100644 --- a/src/store/object_store.rs +++ b/src/store/object_store.rs @@ -186,6 +186,12 @@ where } } + tracing::debug!( + "BytesStream with {} chunks, avg length {}", + buf.chunks_len(), + buf.len() / buf.chunks_len() + ); + Ok(buf) } @@ -225,7 +231,7 @@ impl Store for ObjectStore { where Reader: AsyncRead + Unpin + 'static, { - self.save_stream(ReaderStream::with_capacity(reader, 1024 * 16), content_type) + self.save_stream(ReaderStream::with_capacity(reader, 1024 * 64), content_type) .await } From 16bf18bda4816da9c968727e7aee58a3640561f9 Mon Sep 17 00:00:00 2001 From: asonix Date: Thu, 22 Feb 2024 18:05:04 -0600 Subject: [PATCH 09/12] Remove some Unpin and 'static bounds --- src/backgrounded.rs | 4 ++-- src/ingest.rs | 2 +- src/store.rs | 16 ++++++++-------- src/store/file_store.rs | 8 +++++--- src/store/object_store.rs | 10 ++++++---- 5 files changed, 22 insertions(+), 18 deletions(-) diff --git a/src/backgrounded.rs b/src/backgrounded.rs index 37f8677..379e088 100644 --- a/src/backgrounded.rs +++ b/src/backgrounded.rs @@ -54,9 +54,9 @@ impl Backgrounded { { self.upload_id = Some(self.repo.create_upload().await?); - let stream = Box::pin(crate::stream::map_err(stream, |e| { + let stream = crate::stream::map_err(stream, |e| { std::io::Error::new(std::io::ErrorKind::Other, e) - })); + }); // use octet-stream, we don't know the upload's real type yet let identifier = store.save_stream(stream, APPLICATION_OCTET_STREAM).await?; diff --git a/src/ingest.rs b/src/ingest.rs index 0698ea8..8ab486d 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -126,7 +126,7 @@ where Err(e) => Err(std::io::Error::new(std::io::ErrorKind::Other, e)), }); - let reader = Box::pin(tokio_util::io::StreamReader::new(stream)); + let reader = tokio_util::io::StreamReader::new(stream); let hasher_reader = Hasher::new(reader); let hash_state = hasher_reader.state(); diff --git a/src/store.rs b/src/store.rs index 9d7fcaa..02f4d81 100644 --- a/src/store.rs +++ b/src/store.rs @@ -91,7 +91,7 @@ pub(crate) trait Store: Clone + Debug { content_type: mime::Mime, ) -> Result, StoreError> where - Reader: AsyncRead + Unpin + 'static; + Reader: AsyncRead; async fn save_stream( 
&self, @@ -99,7 +99,7 @@ pub(crate) trait Store: Clone + Debug { content_type: mime::Mime, ) -> Result, StoreError> where - S: Stream> + Unpin + 'static; + S: Stream>; async fn save_bytes( &self, @@ -156,7 +156,7 @@ where content_type: mime::Mime, ) -> Result, StoreError> where - Reader: AsyncRead + Unpin + 'static, + Reader: AsyncRead, { T::save_async_read(self, reader, content_type).await } @@ -167,7 +167,7 @@ where content_type: mime::Mime, ) -> Result, StoreError> where - S: Stream> + Unpin + 'static, + S: Stream>, { T::save_stream(self, stream, content_type).await } @@ -227,7 +227,7 @@ where content_type: mime::Mime, ) -> Result, StoreError> where - Reader: AsyncRead + Unpin + 'static, + Reader: AsyncRead, { T::save_async_read(self, reader, content_type).await } @@ -238,7 +238,7 @@ where content_type: mime::Mime, ) -> Result, StoreError> where - S: Stream> + Unpin + 'static, + S: Stream>, { T::save_stream(self, stream, content_type).await } @@ -298,7 +298,7 @@ where content_type: mime::Mime, ) -> Result, StoreError> where - Reader: AsyncRead + Unpin + 'static, + Reader: AsyncRead, { T::save_async_read(self, reader, content_type).await } @@ -309,7 +309,7 @@ where content_type: mime::Mime, ) -> Result, StoreError> where - S: Stream> + Unpin + 'static, + S: Stream>, { T::save_stream(self, stream, content_type).await } diff --git a/src/store/file_store.rs b/src/store/file_store.rs index 1883706..ddf1819 100644 --- a/src/store/file_store.rs +++ b/src/store/file_store.rs @@ -68,12 +68,14 @@ impl Store for FileStore { #[tracing::instrument(skip(self, reader))] async fn save_async_read( &self, - mut reader: Reader, + reader: Reader, _content_type: mime::Mime, ) -> Result, StoreError> where - Reader: AsyncRead + Unpin + 'static, + Reader: AsyncRead, { + let mut reader = std::pin::pin!(reader); + let path = self.next_file().await?; if let Err(e) = self.safe_save_reader(&path, &mut reader).await { @@ -90,7 +92,7 @@ impl Store for FileStore { content_type: mime::Mime, ) -> Result, StoreError> where - S: Stream> + Unpin + 'static, + S: Stream>, { self.save_async_read(StreamReader::new(stream), content_type) .await diff --git a/src/store/object_store.rs b/src/store/object_store.rs index 354c1b5..093d939 100644 --- a/src/store/object_store.rs +++ b/src/store/object_store.rs @@ -170,7 +170,7 @@ fn payload_to_io_error(e: reqwest::Error) -> std::io::Error { #[tracing::instrument(level = "debug", skip(stream))] async fn read_chunk(stream: &mut S) -> Result where - S: Stream> + Unpin + 'static, + S: Stream> + Unpin, { let mut buf = BytesStream::new(); @@ -229,7 +229,7 @@ impl Store for ObjectStore { content_type: mime::Mime, ) -> Result, StoreError> where - Reader: AsyncRead + Unpin + 'static, + Reader: AsyncRead, { self.save_stream(ReaderStream::with_capacity(reader, 1024 * 64), content_type) .await @@ -238,12 +238,14 @@ impl Store for ObjectStore { #[tracing::instrument(skip_all)] async fn save_stream( &self, - mut stream: S, + stream: S, content_type: mime::Mime, ) -> Result, StoreError> where - S: Stream> + Unpin + 'static, + S: Stream>, { + let mut stream = std::pin::pin!(stream); + let first_chunk = read_chunk(&mut stream).await?; if first_chunk.len() < CHUNK_SIZE { From 3470a6caf0f95379aff8c843cf5a815cc3112737 Mon Sep 17 00:00:00 2001 From: asonix Date: Thu, 22 Feb 2024 18:05:29 -0600 Subject: [PATCH 10/12] Don't start minio --- docker/object-storage/docker-compose.yml | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/docker/object-storage/docker-compose.yml 
b/docker/object-storage/docker-compose.yml index 7c79728..5d37419 100644 --- a/docker/object-storage/docker-compose.yml +++ b/docker/object-storage/docker-compose.yml @@ -29,14 +29,14 @@ services: # - PICTRS_PROXY_UPSTREAM=http://pictrs:8080 # - PICTRS_PROXY_OPENTELEMETRY_URL=http://jaeger:4317 - minio: - image: quay.io/minio/minio - command: server /mnt --console-address ":9001" - ports: - - "9000:9000" - - "9001:9001" - volumes: - - ./storage/minio:/mnt + # minio: + # image: quay.io/minio/minio + # command: server /mnt --console-address ":9001" + # ports: + # - "9000:9000" + # - "9001:9001" + # volumes: + # - ./storage/minio:/mnt garage: image: dxflrs/garage:v0.9.0 From d73e683d4872408b26c3dc14ca3ff5b9f0ac870a Mon Sep 17 00:00:00 2001 From: asonix Date: Thu, 22 Feb 2024 18:23:39 -0600 Subject: [PATCH 11/12] Remove unused From --- src/bytes_stream.rs | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/bytes_stream.rs b/src/bytes_stream.rs index 1e88704..df1ba8c 100644 --- a/src/bytes_stream.rs +++ b/src/bytes_stream.rs @@ -143,11 +143,3 @@ impl AsyncRead for BytesReader { Poll::Ready(Ok(())) } } - -impl From for BytesStream { - fn from(value: Bytes) -> Self { - let mut bs = BytesStream::new(); - bs.add_bytes(value); - bs - } -} From 6a6c61058ac4f320e8ddb8ec6ab483bf4d2c2bfa Mon Sep 17 00:00:00 2001 From: asonix Date: Thu, 22 Feb 2024 18:55:12 -0600 Subject: [PATCH 12/12] Split save_stream for object storage to early-drop stack-pinned stream --- src/store/object_store.rs | 315 ++++++++++++++++++++++---------------- 1 file changed, 180 insertions(+), 135 deletions(-) diff --git a/src/store/object_store.rs b/src/store/object_store.rs index 093d939..a119d97 100644 --- a/src/store/object_store.rs +++ b/src/store/object_store.rs @@ -1,6 +1,6 @@ use crate::{ bytes_stream::BytesStream, error_code::ErrorCode, future::WithMetrics, repo::ArcRepo, - store::Store, stream::LocalBoxStream, + store::Store, stream::LocalBoxStream, sync::DropHandle, }; use actix_web::{ error::BlockingError, @@ -244,149 +244,63 @@ impl Store for ObjectStore { where S: Stream>, { - let mut stream = std::pin::pin!(stream); + match self.start_upload(stream, content_type.clone()).await? 
{ + UploadState::Single(first_chunk) => { + let (req, object_id) = self + .put_object_request(first_chunk.len(), content_type) + .await?; - let first_chunk = read_chunk(&mut stream).await?; + let response = req + .body(Body::wrap_stream(first_chunk)) + .send() + .with_metrics(crate::init_metrics::OBJECT_STORAGE_PUT_OBJECT_REQUEST) + .await + .map_err(ObjectError::from)?; - if first_chunk.len() < CHUNK_SIZE { - drop(stream); - let (req, object_id) = self - .put_object_request(first_chunk.len(), content_type) - .await?; - let response = req - .body(Body::wrap_stream(first_chunk)) - .send() - .with_metrics(crate::init_metrics::OBJECT_STORAGE_PUT_OBJECT_REQUEST) - .await - .map_err(ObjectError::from)?; + if !response.status().is_success() { + return Err(status_error(response, None).await); + } - if !response.status().is_success() { - return Err(status_error(response, None).await); + return Ok(object_id); } + UploadState::Multi(object_id, upload_id, futures) => { + // hack-ish: use async block as Result boundary + let res = async { + let mut etags = Vec::new(); - return Ok(object_id); - } - - let mut first_chunk = Some(first_chunk); - - let (req, object_id) = self.create_multipart_request(content_type).await?; - let response = req - .send() - .with_metrics(crate::init_metrics::OBJECT_STORAGE_CREATE_MULTIPART_REQUEST) - .await - .map_err(ObjectError::from)?; - - if !response.status().is_success() { - return Err(status_error(response, None).await); - } - - let body = response.text().await.map_err(ObjectError::Request)?; - let body = CreateMultipartUpload::parse_response(&body) - .map_err(XmlError::new) - .map_err(ObjectError::Xml)?; - let upload_id = body.upload_id(); - - // hack-ish: use async block as Result boundary - let res = async { - let mut complete = false; - let mut part_number = 0; - let mut futures = Vec::new(); - - while !complete { - tracing::trace!("save_stream: looping"); - - part_number += 1; - - let buf = if let Some(buf) = first_chunk.take() { - buf - } else { - read_chunk(&mut stream).await? - }; - - complete = buf.len() < CHUNK_SIZE; - - let this = self.clone(); - - let object_id2 = object_id.clone(); - let upload_id2 = upload_id.to_string(); - let handle = crate::sync::abort_on_drop(crate::sync::spawn( - "upload-multipart-part", - async move { - let response = this - .create_upload_part_request( - buf.clone(), - &object_id2, - part_number, - &upload_id2, - ) - .await? - .body(Body::wrap_stream(buf)) - .send() - .with_metrics( - crate::init_metrics::OBJECT_STORAGE_CREATE_UPLOAD_PART_REQUEST, - ) - .await - .map_err(ObjectError::from)?; - - if !response.status().is_success() { - return Err(status_error(response, None).await); - } - - let etag = response - .headers() - .get("etag") - .ok_or(ObjectError::Etag)? - .to_str() - .map_err(|_| ObjectError::Etag)? 
- .to_string(); - - // early-drop response to close its tracing spans - drop(response); - - Ok(etag) as Result + for future in futures { + etags.push(future.await.map_err(ObjectError::from)??); } - .instrument(tracing::Span::current()), - )); - futures.push(handle); + let response = self + .send_complete_multipart_request( + &object_id, + &upload_id, + etags.iter().map(|s| s.as_ref()), + ) + .await + .map_err(ObjectError::from)?; + + if !response.status().is_success() { + return Err(status_error(response, None).await); + } + + Ok(()) as Result<(), StoreError> + } + .await; + + if let Err(e) = res { + self.create_abort_multipart_request(&object_id, &upload_id) + .send() + .with_metrics(crate::init_metrics::OBJECT_STORAGE_ABORT_MULTIPART_REQUEST) + .await + .map_err(ObjectError::from)?; + return Err(e); + } + + Ok(object_id) } - - // early-drop stream to allow the next Part to be polled concurrently - drop(stream); - - let mut etags = Vec::new(); - - for future in futures { - etags.push(future.await.map_err(ObjectError::from)??); - } - - let response = self - .send_complete_multipart_request( - &object_id, - upload_id, - etags.iter().map(|s| s.as_ref()), - ) - .await - .map_err(ObjectError::from)?; - - if !response.status().is_success() { - return Err(status_error(response, None).await); - } - - Ok(()) as Result<(), StoreError> } - .await; - - if let Err(e) = res { - self.create_abort_multipart_request(&object_id, upload_id) - .send() - .with_metrics(crate::init_metrics::OBJECT_STORAGE_ABORT_MULTIPART_REQUEST) - .await - .map_err(ObjectError::from)?; - return Err(e); - } - - Ok(object_id) } #[tracing::instrument(skip_all)] @@ -528,6 +442,15 @@ impl Store for ObjectStore { } } +enum UploadState { + Single(BytesStream), + Multi( + Arc, + String, + Vec>>, + ), +} + impl ObjectStore { #[allow(clippy::too_many_arguments)] #[tracing::instrument(skip(access_key, secret_key, session_token, repo))] @@ -562,6 +485,128 @@ impl ObjectStore { }) } + #[tracing::instrument(skip_all)] + async fn start_upload( + &self, + stream: S, + content_type: mime::Mime, + ) -> Result + where + S: Stream>, + { + let mut stream = std::pin::pin!(stream); + + let first_chunk = read_chunk(&mut stream).await?; + + if first_chunk.len() < CHUNK_SIZE { + return Ok(UploadState::Single(first_chunk)); + } + + let mut first_chunk = Some(first_chunk); + + let (req, object_id) = self.create_multipart_request(content_type).await?; + let response = req + .send() + .with_metrics(crate::init_metrics::OBJECT_STORAGE_CREATE_MULTIPART_REQUEST) + .await + .map_err(ObjectError::from)?; + + if !response.status().is_success() { + return Err(status_error(response, None).await); + } + + let body = response.text().await.map_err(ObjectError::Request)?; + let body = CreateMultipartUpload::parse_response(&body) + .map_err(XmlError::new) + .map_err(ObjectError::Xml)?; + let upload_id = body.upload_id(); + + // hack-ish: use async block as Result boundary + let res = async { + let mut complete = false; + let mut part_number = 0; + let mut futures = Vec::new(); + + while !complete { + tracing::trace!("save_stream: looping"); + + part_number += 1; + + let buf = if let Some(buf) = first_chunk.take() { + buf + } else { + read_chunk(&mut stream).await? 
+ }; + + complete = buf.len() < CHUNK_SIZE; + + let this = self.clone(); + + let object_id2 = object_id.clone(); + let upload_id2 = upload_id.to_string(); + let handle = crate::sync::abort_on_drop(crate::sync::spawn( + "upload-multipart-part", + async move { + let response = this + .create_upload_part_request( + buf.clone(), + &object_id2, + part_number, + &upload_id2, + ) + .await? + .body(Body::wrap_stream(buf)) + .send() + .with_metrics( + crate::init_metrics::OBJECT_STORAGE_CREATE_UPLOAD_PART_REQUEST, + ) + .await + .map_err(ObjectError::from)?; + + if !response.status().is_success() { + return Err(status_error(response, None).await); + } + + let etag = response + .headers() + .get("etag") + .ok_or(ObjectError::Etag)? + .to_str() + .map_err(|_| ObjectError::Etag)? + .to_string(); + + // early-drop response to close its tracing spans + drop(response); + + Ok(etag) as Result + } + .instrument(tracing::Span::current()), + )); + + futures.push(handle); + } + + Ok(futures) + } + .await; + + match res { + Ok(futures) => Ok(UploadState::Multi( + object_id, + upload_id.to_string(), + futures, + )), + Err(e) => { + self.create_abort_multipart_request(&object_id, upload_id) + .send() + .with_metrics(crate::init_metrics::OBJECT_STORAGE_ABORT_MULTIPART_REQUEST) + .await + .map_err(ObjectError::from)?; + Err(e) + } + } + } + async fn head_bucket_request(&self) -> Result { let action = self.bucket.head_bucket(Some(&self.credentials));
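The series converges on one core idea: keep a payload as a queue of Bytes chunks and read across them in place, instead of first copying everything into one contiguous buffer. The sketch below is a standalone, synchronous re-creation of that reader, not the pict-rs code itself: ChunkReader is a hypothetical stand-in for BytesReader, it assumes only the `bytes` crate, and it uses std::io::Read rather than tokio's AsyncRead so it runs without an async runtime. It also mirrors patch 08's approach of letting each Bytes handle track its own cursor via Buf::advance, which is what removed the manual `index` bookkeeping that patch 05 still carried.

    use bytes::{Buf, Bytes};
    use std::collections::VecDeque;
    use std::io::Read;

    /// Reads across a queue of Bytes chunks without first copying them
    /// into one contiguous buffer.
    struct ChunkReader {
        inner: VecDeque<Bytes>,
    }

    impl Read for ChunkReader {
        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
            let mut filled = 0;

            while filled < buf.len() {
                match self.inner.front_mut() {
                    // Drop chunks that have been fully consumed.
                    Some(bytes) if bytes.is_empty() => {
                        self.inner.pop_front();
                    }
                    Some(bytes) => {
                        // Copy as much of the front chunk as fits, then let
                        // Bytes advance its own cursor (patch 08's approach,
                        // replacing the manual index field from patch 05).
                        let n = (buf.len() - filled).min(bytes.len());
                        buf[filled..filled + n].copy_from_slice(&bytes[..n]);
                        bytes.advance(n);
                        filled += n;
                    }
                    // Queue exhausted: returning what we have (possibly 0)
                    // signals EOF to the caller.
                    None => break,
                }
            }

            Ok(filled)
        }
    }

    fn main() -> std::io::Result<()> {
        let mut reader = ChunkReader {
            inner: VecDeque::from([
                Bytes::from_static(b"hello "),
                Bytes::from_static(b"world"),
            ]),
        };

        let mut out = String::new();
        reader.read_to_string(&mut out)?;
        assert_eq!(out, "hello world");
        Ok(())
    }

The same shape explains the rest of the series: try_from_stream (patch 03) fills the queue from any fallible stream, into_io_stream hands the chunks to consumers that want a Stream, and into_reader hands them to consumers that want a reader, so no path ever flattens the chunks into one allocation.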