From 08c3169d3ff89c2a58edb0c0cdbe36fefae1ad46 Mon Sep 17 00:00:00 2001 From: "Aode (lion)" Date: Sat, 4 Sep 2021 12:42:40 -0500 Subject: [PATCH] Fewer streams --- src/exiftool.rs | 20 ---- src/ffmpeg.rs | 56 ----------- src/magick.rs | 133 ++------------------------ src/main.rs | 56 +++-------- src/stream.rs | 211 +----------------------------------------- src/upload_manager.rs | 125 ++++++++++++------------- src/validate.rs | 49 +--------- 7 files changed, 84 insertions(+), 566 deletions(-) diff --git a/src/exiftool.rs b/src/exiftool.rs index 6794406..8d784e3 100644 --- a/src/exiftool.rs +++ b/src/exiftool.rs @@ -7,23 +7,3 @@ pub(crate) fn clear_metadata_bytes_read(input: Bytes) -> std::io::Result std::io::Result { - let process = Process::spawn(Command::new("exiftool").args(["-all=", "-", "-out", "-"]))?; - - Ok(process.write_read(input).unwrap()) -} - -pub(crate) fn clear_metadata_stream( - input: S, -) -> std::io::Result>> -where - S: futures::stream::Stream> + Unpin + 'static, - E: From + 'static, -{ - let process = Process::spawn(Command::new("exiftool").args(["-all=", "-", "-out", "-"]))?; - - Ok(Box::pin(process.sink_stream(input).unwrap())) -} diff --git a/src/ffmpeg.rs b/src/ffmpeg.rs index 8f701d4..fd1d915 100644 --- a/src/ffmpeg.rs +++ b/src/ffmpeg.rs @@ -83,62 +83,6 @@ pub(crate) fn to_mp4_bytes( Ok(process.bytes_read(input).unwrap()) } -pub(crate) fn to_mp4_write_read( - input: impl AsyncRead + Unpin + 'static, - input_format: InputFormat, -) -> std::io::Result { - let process = Process::spawn(Command::new("ffmpeg").args([ - "-f", - input_format.as_format(), - "-i", - "pipe:", - "-movflags", - "faststart+frag_keyframe+empty_moov", - "-pix_fmt", - "yuv420p", - "-vf", - "scale=trunc(iw/2)*2:trunc(ih/2)*2", - "-an", - "-codec", - "h264", - "-f", - "mp4", - "pipe:", - ]))?; - - Ok(process.write_read(input).unwrap()) -} - -pub(crate) fn to_mp4_stream( - input: S, - input_format: InputFormat, -) -> std::io::Result>> -where - S: futures::stream::Stream> + Unpin + 'static, - E: From + 'static, -{ - let process = Process::spawn(Command::new("ffmpeg").args([ - "-f", - input_format.as_format(), - "-i", - "pipe:", - "-movflags", - "faststart+frag_keyframe+empty_moov", - "-pix_fmt", - "yuv420p", - "-vf", - "scale=trunc(iw/2)*2:trunc(ih/2)*2", - "-an", - "-codec", - "h264", - "-f", - "mp4", - "pipe:", - ]))?; - - Ok(Box::pin(process.sink_stream(input).unwrap())) -} - pub(crate) async fn thumbnail( from: P1, to: P2, diff --git a/src/magick.rs b/src/magick.rs index 131dd5c..99fe8d6 100644 --- a/src/magick.rs +++ b/src/magick.rs @@ -46,17 +46,7 @@ pub(crate) fn clear_metadata_bytes_read(input: Bytes) -> std::io::Result std::io::Result { - let process = Process::spawn(Command::new("magick").args(["convert", "-", "-strip", "-"]))?; - - Ok(process.write_read(input).unwrap()) -} - -pub(crate) async fn details_write_read( - input: impl AsyncRead + Unpin + 'static, -) -> Result { +pub(crate) async fn details_bytes(input: Bytes) -> Result { let process = Process::spawn(Command::new("magick").args([ "identify", "-ping", @@ -65,41 +55,15 @@ pub(crate) async fn details_write_read( "-", ]))?; - let mut reader = process.write_read(input).unwrap(); + let mut reader = process.bytes_read(input).unwrap(); let mut bytes = Vec::new(); - reader.read_to_end(&mut bytes).await?; - let s = String::from_utf8_lossy(&bytes); + parse_details(s) } -pub(crate) fn convert_write_read( - input: impl AsyncRead + Unpin + 'static, - format: Format, -) -> std::io::Result { - let process = Process::spawn(Command::new("magick").args([ - "convert", - "-", - format!("{}:-", format.to_magick_format()).as_str(), - ]))?; - - Ok(process.write_read(input).unwrap()) -} - -pub(crate) fn clear_metadata_stream( - input: S, -) -> std::io::Result>> -where - S: futures::stream::Stream> + Unpin + 'static, - E: From + 'static, -{ - let process = Process::spawn(Command::new("magick").args(["convert", "-", "-strip", "-"]))?; - - Ok(Box::pin(process.sink_stream(input).unwrap())) -} - pub(crate) fn convert_bytes_read( input: Bytes, format: Format, @@ -113,56 +77,6 @@ pub(crate) fn convert_bytes_read( Ok(process.bytes_read(input).unwrap()) } -pub(crate) fn convert_stream( - input: S, - format: Format, -) -> std::io::Result>> -where - S: futures::stream::Stream> + Unpin + 'static, - E: From + 'static, -{ - let process = Process::spawn(Command::new("magick").args([ - "convert", - "-", - format!("{}:-", format.to_magick_format()).as_str(), - ]))?; - - Ok(Box::pin(process.sink_stream(input).unwrap())) -} - -pub(crate) async fn details_stream(input: S) -> Result -where - S: futures::stream::Stream> + Unpin, - E1: From, - E2: From + From + From, -{ - use futures::stream::StreamExt; - - let permit = semaphore().acquire().await.map_err(MagickError::from)?; - - let mut process = Process::spawn(Command::new("magick").args([ - "identify", - "-ping", - "-format", - "%w %h | %m\n", - "-", - ]))?; - - process.take_sink().unwrap().send(input).await?; - let mut stream = process.take_stream().unwrap(); - - let mut buf = actix_web::web::BytesMut::new(); - while let Some(res) = stream.next().await { - let bytes = res?; - buf.extend_from_slice(&bytes); - } - - drop(permit); - - let s = String::from_utf8_lossy(&buf); - Ok(parse_details(s)?) -} - pub(crate) async fn details

(file: P) -> Result where P: AsRef, @@ -250,35 +164,6 @@ pub(crate) async fn input_type_bytes(mut input: Bytes) -> Result(input: S) -> Result -where - S: futures::stream::Stream> + Unpin, - E1: From, - E2: From + From + From, -{ - use futures::stream::StreamExt; - - let permit = semaphore().acquire().await.map_err(MagickError::from)?; - - let mut process = - Process::spawn(Command::new("magick").args(["identify", "-ping", "-format", "%m\n", "-"]))?; - - process.take_sink().unwrap().send(input).await?; - let mut stream = process.take_stream().unwrap(); - - let mut buf = actix_web::web::BytesMut::new(); - while let Some(res) = stream.next().await { - let bytes = res?; - buf.extend_from_slice(&bytes); - } - - drop(permit); - - let s = String::from_utf8_lossy(&buf); - - Ok(parse_input_type(s)?) -} - fn parse_input_type(s: std::borrow::Cow<'_, str>) -> Result { let mut lines = s.lines(); let first = lines.next(); @@ -298,15 +183,11 @@ fn parse_input_type(s: std::borrow::Cow<'_, str>) -> Result( - input: S, +pub(crate) fn process_image_write_read( + input: impl AsyncRead + Unpin + 'static, args: Vec, format: Format, -) -> std::io::Result>> -where - S: futures::stream::Stream> + Unpin + 'static, - E: From + 'static, -{ +) -> std::io::Result { let process = Process::spawn( Command::new("magick") .args([&"convert", &"-"]) @@ -314,7 +195,7 @@ where .arg(format!("{}:-", format.to_magick_format())), )?; - Ok(Box::pin(process.sink_stream(input).unwrap())) + Ok(process.write_read(input).unwrap()) } impl From for MagickError { diff --git a/src/main.rs b/src/main.rs index 45f3c19..f9d7271 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,6 +10,7 @@ use futures::stream::{Stream, TryStreamExt}; use once_cell::sync::Lazy; use std::{collections::HashSet, path::PathBuf, pin::Pin, time::SystemTime}; use structopt::StructOpt; +use tokio::io::AsyncReadExt; use tracing::{debug, error, info, instrument, Span}; use tracing_subscriber::EnvFilter; @@ -402,58 +403,27 @@ async fn process( } } - let stream = Box::pin(async_stream::stream! { - use futures::stream::StreamExt; + let file = tokio::fs::File::open(original_path.clone()).await?; - let mut s = actix_fs::read_to_stream(original_path.clone()) - .await? - .faster(); + let mut processed_reader = + crate::magick::process_image_write_read(file, thumbnail_args, format)?; - while let Some(res) = s.next().await { - yield res.map_err(UploadError::from); - } - }); + let mut vec = Vec::new(); + processed_reader.read_to_end(&mut vec).await?; + let bytes = web::Bytes::from(vec); - let processed_stream = crate::magick::process_image_stream(stream, thumbnail_args, format)?; - - let (base_stream, copied_stream) = crate::stream::try_duplicate(processed_stream, 1024); - - let (details, base_stream) = if let Some(details) = details { - ( - details, - Box::pin(base_stream) - as Pin< - Box< - dyn futures::stream::Stream< - Item = Result, - >, - >, - >, - ) + let details = if let Some(details) = details { + details } else { - let (base_stream, copied2) = crate::stream::try_duplicate(Box::pin(base_stream), 1024); - let details = Details::from_stream(Box::pin(base_stream)).await?; - ( - details, - Box::pin(copied2) - as Pin< - Box< - dyn futures::stream::Stream< - Item = Result, - >, - >, - >, - ) + Details::from_bytes(bytes.clone()).await? }; let span = tracing::Span::current(); let details2 = details.clone(); + let bytes2 = bytes.clone(); actix_rt::spawn(async move { let entered = span.enter(); - if let Err(e) = - upload_manager::safe_save_stream(thumbnail_path.clone(), Box::pin(copied_stream)) - .await - { + if let Err(e) = safe_save_file(thumbnail_path.clone(), bytes2).await { tracing::warn!("Error saving thumbnail: {}", e); return; } @@ -472,7 +442,7 @@ async fn process( return Ok(srv_response( HttpResponse::Ok(), - base_stream, + futures::stream::once(futures::future::ready(Ok(bytes) as Result<_, UploadError>)), details.content_type(), 7 * DAYS, details.system_time(), diff --git a/src/stream.rs b/src/stream.rs index 185a2fb..99efdb3 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,12 +1,11 @@ use actix_web::web::Bytes; -use futures::stream::{LocalBoxStream, Stream, StreamExt}; +use futures::stream::{LocalBoxStream, Stream}; use std::{ future::Future, pin::Pin, task::{Context, Poll}, }; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt, ReadBuf}; -use tokio_stream::wrappers::ReceiverStream; +use tokio::io::{AsyncRead, AsyncWriteExt, ReadBuf}; pub(crate) struct ReadAdapter { inner: S, @@ -22,59 +21,10 @@ pub(crate) struct ProcessRead { err_closed: bool, } -pub(crate) struct ProcessSink { - stdin: tokio::process::ChildStdin, -} - -pub(crate) struct ProcessStream { - stream: LocalBoxStream<'static, std::io::Result>, -} - pub(crate) struct ProcessSinkStream { stream: LocalBoxStream<'static, Result>, } -pub(crate) struct TryDuplicateStream { - inner: ReceiverStream>, -} - -#[derive(Debug)] -pub(crate) struct StringError(String); - -impl ReadAdapter { - pub(crate) fn new_unsync( - mut stream: S, - ) -> ReadAdapter>> - where - S: Stream> + Unpin + 'static, - E: std::fmt::Display, - { - let (tx, rx) = tokio::sync::mpsc::channel(1); - - actix_rt::spawn(async move { - while let Some(res) = stream.next().await { - if tx - .send(res.map_err(|e| StringError(e.to_string()))) - .await - .is_err() - { - break; - } - } - }); - - ReadAdapter::new_sync(ReceiverStream::new(rx)) - } - - fn new_sync(stream: S) -> Self - where - S: Stream> + Unpin, - E: Into>, - { - ReadAdapter { inner: stream } - } -} - impl Process { fn new(child: tokio::process::Child) -> Self { Process { child } @@ -87,14 +37,6 @@ impl Process { .map(Process::new) } - pub(crate) fn take_sink(&mut self) -> Option { - self.child.stdin.take().map(ProcessSink::new) - } - - pub(crate) fn take_stream(&mut self) -> Option { - self.child.stdout.take().map(ProcessStream::new) - } - pub(crate) fn bytes_read(mut self, mut input: Bytes) -> Option { let mut stdin = self.child.stdin.take()?; let stdout = self.child.stdout.take()?; @@ -135,131 +77,6 @@ impl Process { err_closed: false, })) } - - pub(crate) fn sink_stream(mut self, input_stream: S) -> Option> - where - S: Stream> + Unpin + 'static, - E: From + 'static, - { - let mut stdin = self.take_sink()?; - let mut stdout = self.take_stream()?; - - let (tx, mut rx) = tokio::sync::mpsc::channel(1); - - actix_rt::spawn(async move { - if let Err(e) = stdin.send(input_stream).await { - let _ = tx.send(e).await; - } - }); - - Some(ProcessSinkStream { - stream: Box::pin(async_stream::stream! { - loop { - tokio::select! { - opt = rx.recv() => { - if let Some(e) = opt { - yield Err(e); - break; - } - } - res = stdout.next() => { - match res { - Some(Ok(bytes)) => yield Ok(bytes), - Some(Err(e)) => { - yield Err(e.into()); - break; - } - None => break, - } - } - } - } - - drop(stdout); - match self.child.wait().await { - Ok(status) if status.success() => return, - Ok(_) => yield Err(std::io::Error::from(std::io::ErrorKind::Other).into()), - Err(e) => yield Err(e.into()), - } - }), - }) - } -} - -impl ProcessSink { - fn new(stdin: tokio::process::ChildStdin) -> Self { - ProcessSink { stdin } - } - - pub(crate) async fn send(&mut self, mut stream: S) -> Result<(), E> - where - S: Stream> + Unpin, - E: From, - { - while let Some(res) = stream.next().await { - let mut bytes = res?; - - self.stdin.write_all_buf(&mut bytes).await?; - } - - Ok(()) - } -} - -impl ProcessStream { - fn new(mut stdout: tokio::process::ChildStdout) -> ProcessStream { - let s = async_stream::stream! { - loop { - let mut buf = actix_web::web::BytesMut::with_capacity(65_536); - - match stdout.read_buf(&mut buf).await { - Ok(len) if len == 0 => { - break; - } - Ok(_) => { - yield Ok(buf.freeze()); - } - Err(e) => { - yield Err(e); - break; - } - } - } - }; - - ProcessStream { - stream: Box::pin(s), - } - } -} - -pub(crate) fn try_duplicate( - mut stream: S, - buffer: usize, -) -> (impl Stream>, TryDuplicateStream) -where - S: Stream> + Unpin, - T: Clone, -{ - let (tx, rx) = tokio::sync::mpsc::channel(buffer); - let s = async_stream::stream! { - while let Some(value) = stream.next().await { - match value { - Ok(t) => { - let _ = tx.send(Ok(t.clone())).await; - yield Ok(t); - } - Err(e) => yield Err(e), - } - } - }; - - ( - s, - TryDuplicateStream { - inner: ReceiverStream::new(rx), - }, - ) } impl AsyncRead for ReadAdapter @@ -312,14 +129,6 @@ where } } -impl Stream for ProcessStream { - type Item = std::io::Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.stream).poll_next(cx) - } -} - impl Stream for ProcessSinkStream { type Item = Result; @@ -327,19 +136,3 @@ impl Stream for ProcessSinkStream { Pin::new(&mut self.stream).poll_next(cx) } } - -impl Stream for TryDuplicateStream { - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.inner).poll_next(cx) - } -} - -impl std::fmt::Display for StringError { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "{}", self.0) - } -} - -impl std::error::Error for StringError {} diff --git a/src/upload_manager.rs b/src/upload_manager.rs index 53ada6c..f7a66fd 100644 --- a/src/upload_manager.rs +++ b/src/upload_manager.rs @@ -33,12 +33,22 @@ pub struct UploadManager { inner: Arc, } -pub struct Hasher { +pub struct Hasher { inner: I, - hasher: sha2::Sha256, + hasher: D, } -impl Hasher { +impl Hasher +where + D: Digest + Send + 'static, +{ + fn new(reader: I, digest: D) -> Self { + Hasher { + inner: reader, + hasher: digest, + } + } + async fn finalize_reset(self) -> Result { let mut hasher = self.hasher; let hash = web::block(move || Hash::new(hasher.finalize_reset().to_vec())).await?; @@ -46,9 +56,10 @@ impl Hasher { } } -impl AsyncRead for Hasher +impl AsyncRead for Hasher where I: AsyncRead + Unpin, + D: Digest + Unpin, { fn poll_read( mut self: Pin<&mut Self>, @@ -134,25 +145,8 @@ pub(crate) struct Details { } impl Details { - pub(crate) async fn from_stream(stream: S) -> Result - where - S: futures::stream::Stream> + Unpin + 'static, - E: From + 'static, - UploadError: From, - { - let details = crate::magick::details_stream::(stream).await?; - - Ok(Details::now( - details.width, - details.height, - details.mime_type, - )) - } - - pub(crate) async fn from_async_read( - input: impl AsyncRead + Unpin + 'static, - ) -> Result { - let details = crate::magick::details_write_read(input).await?; + pub(crate) async fn from_bytes(input: web::Bytes) -> Result { + let details = crate::magick::details_bytes(input).await?; Ok(Details::now( details.width, @@ -550,25 +544,24 @@ impl UploadManager { UploadError: From, E: Unpin + 'static, { - let mapped_err_stream = Box::pin(async_stream::stream! { - use futures::stream::StreamExt; + let mut bytes_mut = actix_web::web::BytesMut::new(); - while let Some(res) = stream.next().await { - yield res.map_err(UploadError::from); - } - }); + debug!("Reading stream to memory"); + while let Some(res) = stream.next().await { + let bytes = res?; + bytes_mut.extend_from_slice(&bytes); + } - let (content_type, validated_stream) = - crate::validate::validate_image_stream(mapped_err_stream, self.inner.format.clone()) + debug!("Validating bytes"); + let (content_type, validated_reader) = + crate::validate::validate_image_bytes(bytes_mut.freeze(), self.inner.format.clone()) .await?; - let (s1, s2) = crate::stream::try_duplicate(validated_stream, 1024); + let mut hasher_reader = Hasher::new(validated_reader, self.inner.hasher.clone()); let tmpfile = crate::tmp_file(); - let (hash, _) = tokio::try_join!( - self.hash_stream::<_, UploadError>(Box::pin(s1)), - safe_save_stream::(tmpfile.clone(), Box::pin(s2)) - )?; + safe_save_reader(tmpfile.clone(), &mut hasher_reader).await?; + let hash = hasher_reader.finalize_reset().await?; debug!("Storing alias"); self.add_existing_alias(&hash, &alias).await?; @@ -599,10 +592,7 @@ impl UploadManager { crate::validate::validate_image_bytes(bytes_mut.freeze(), self.inner.format.clone()) .await?; - let mut hasher_reader = Hasher { - inner: validated_reader, - hasher: self.inner.hasher.clone(), - }; + let mut hasher_reader = Hasher::new(validated_reader, self.inner.hasher.clone()); let tmpfile = crate::tmp_file(); safe_save_reader(tmpfile.clone(), &mut hasher_reader).await?; @@ -717,30 +707,6 @@ impl UploadManager { Ok(()) } - // produce a sh256sum of the uploaded file - async fn hash_stream(&self, mut stream: S) -> Result - where - S: futures::stream::Stream> + Unpin, - UploadError: From, - { - let mut hasher = self.inner.hasher.clone(); - - while let Some(res) = stream.next().await { - let bytes = res?; - hasher = web::block(move || { - hasher.update(&bytes); - Ok(hasher) as Result<_, UploadError> - }) - .await??; - } - - let hash = - web::block(move || Ok(hasher.finalize_reset().to_vec()) as Result<_, UploadError>) - .await??; - - Ok(Hash::new(hash)) - } - // check for an already-uploaded image with this hash, returning the path to the target file #[instrument(skip(self, hash, content_type))] async fn check_duplicate( @@ -1023,3 +989,34 @@ fn variant_details_key(hash: &[u8], path: &str) -> Vec { key.extend(b"details"); key } + +#[cfg(test)] +mod test { + use super::Hasher; + use sha2::{Digest, Sha256}; + use std::io::Read; + + #[test] + fn hasher_works() { + let hash = actix_rt::System::new() + .block_on(async move { + let file1 = tokio::fs::File::open("./client-examples/earth.gif").await?; + + let mut hasher = Hasher::new(file1, Sha256::new()); + + tokio::io::copy(&mut hasher, &mut tokio::io::sink()).await?; + + hasher.finalize_reset().await + }) + .unwrap(); + + let mut file = std::fs::File::open("./client-examples/earth.gif").unwrap(); + let mut vec = Vec::new(); + file.read_to_end(&mut vec).unwrap(); + let mut hasher = Sha256::new(); + hasher.update(vec); + let correct_hash = hasher.finalize_reset().to_vec(); + + assert_eq!(hash.inner, correct_hash); + } +} diff --git a/src/validate.rs b/src/validate.rs index c6ea45e..9c0896b 100644 --- a/src/validate.rs +++ b/src/validate.rs @@ -23,7 +23,7 @@ pub(crate) async fn validate_image_bytes( )), (_, ValidInputType::Mp4) => Ok(( video_mp4(), - Box::new(crate::ffmpeg::to_mp4_bytes(bytes, InputFormat::Gif)?), + Box::new(crate::ffmpeg::to_mp4_bytes(bytes, InputFormat::Mp4)?), )), (Some(Format::Jpeg) | None, ValidInputType::Jpeg) => Ok(( mime::IMAGE_JPEG, @@ -43,50 +43,3 @@ pub(crate) async fn validate_image_bytes( )), } } - -pub(crate) async fn validate_image_stream( - stream: S, - prescribed_format: Option, -) -> Result< - ( - mime::Mime, - futures::stream::LocalBoxStream<'static, Result>, - ), - UploadError, -> -where - S: futures::stream::Stream> + Unpin + 'static, -{ - let (base_stream, copied_stream) = crate::stream::try_duplicate(stream, 1024); - - let input_type = - crate::magick::input_type_stream::<_, UploadError, UploadError>(Box::pin(base_stream)) - .await?; - - match (prescribed_format, input_type) { - (_, ValidInputType::Gif) => Ok(( - video_mp4(), - crate::ffmpeg::to_mp4_stream(copied_stream, InputFormat::Gif)?, - )), - (_, ValidInputType::Mp4) => Ok(( - video_mp4(), - crate::ffmpeg::to_mp4_stream(copied_stream, InputFormat::Mp4)?, - )), - (Some(Format::Jpeg) | None, ValidInputType::Jpeg) => Ok(( - mime::IMAGE_JPEG, - crate::exiftool::clear_metadata_stream(copied_stream)?, - )), - (Some(Format::Png) | None, ValidInputType::Png) => Ok(( - mime::IMAGE_PNG, - crate::exiftool::clear_metadata_stream(copied_stream)?, - )), - (Some(Format::Webp) | None, ValidInputType::Webp) => Ok(( - image_webp(), - crate::magick::clear_metadata_stream(copied_stream)?, - )), - (Some(format), _) => Ok(( - format.to_mime(), - crate::magick::convert_stream(copied_stream, format)?, - )), - } -}