From c1d4e3b87ebf3e8d8a08d552bfad68e9b67648f5 Mon Sep 17 00:00:00 2001 From: "Aode (Lion)" Date: Mon, 30 Aug 2021 21:19:47 -0500 Subject: [PATCH] Start work on piping bytes around from memory instead of going to disk and back --- Cargo.lock | 13 +++ Cargo.toml | 3 +- src/error.rs | 6 +- src/exiv2.rs | 46 -------- src/ffmpeg.rs | 90 ++++++++-------- src/magick.rs | 163 ++++++++++++++++------------ src/main.rs | 91 ++++++++++++---- src/stream.rs | 246 ++++++++++++++++++++++++++++++++++++++++++ src/upload_manager.rs | 105 +++++++++++------- src/validate.rs | 87 ++++++++------- 10 files changed, 583 insertions(+), 267 deletions(-) delete mode 100644 src/exiv2.rs create mode 100644 src/stream.rs diff --git a/Cargo.lock b/Cargo.lock index c8f3e17..31e3cb0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1018,6 +1018,7 @@ dependencies = [ "thiserror", "time 0.3.2", "tokio", + "tokio-stream", "tracing", "tracing-futures", "tracing-subscriber", @@ -1600,9 +1601,21 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", + "tokio-macros", "winapi", ] +[[package]] +name = "tokio-macros" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54473be61f4ebe4efd09cec9bd5d16fa51d70ea0192213d754d2d500457db110" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tokio-stream" version = "0.1.7" diff --git a/Cargo.toml b/Cargo.toml index 0b6bc65..5c26cd5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,7 +31,8 @@ sled = { version = "0.34.6" } structopt = "0.3.14" thiserror = "1.0" time = { version = "0.3.0", features = ["serde"] } -tokio = { version = "1", default-features = false, features = ["io-util", "process", "sync"] } +tokio = { version = "1", default-features = false, features = ["io-util", "macros", "process", "sync"] } +tokio-stream = { version = "0.1", default-features = false } tracing = "0.1.15" tracing-futures = "0.2.4" tracing-subscriber = { version = "0.2.5", features = ["fmt", "tracing-log"] } diff --git a/src/error.rs b/src/error.rs index 6bcab68..7b80d52 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,4 +1,4 @@ -use crate::{exiv2::Exvi2Error, ffmpeg::VideoError, magick::MagickError}; +use crate::{ffmpeg::VideoError, magick::MagickError}; use actix_web::{http::StatusCode, HttpResponse, ResponseError}; #[derive(Debug, thiserror::Error)] @@ -69,9 +69,6 @@ pub(crate) enum UploadError { #[error("{0}")] VideoError(#[from] VideoError), - #[error("{0}")] - Exvi2Error(#[from] Exvi2Error), - #[error("{0}")] MagickError(#[from] MagickError), } @@ -107,7 +104,6 @@ impl ResponseError for UploadError { fn status_code(&self) -> StatusCode { match self { UploadError::VideoError(_) - | UploadError::Exvi2Error(_) | UploadError::MagickError(_) | UploadError::DuplicateAlias | UploadError::NoFiles diff --git a/src/exiv2.rs b/src/exiv2.rs deleted file mode 100644 index 59be3b7..0000000 --- a/src/exiv2.rs +++ /dev/null @@ -1,46 +0,0 @@ -#[derive(Debug, thiserror::Error)] -pub(crate) enum Exvi2Error { - #[error("Failed to interface with exiv2")] - IO(#[from] std::io::Error), - - #[error("Identify semaphore is closed")] - Closed, - - #[error("Exiv2 command failed")] - Status, -} - -static MAX_READS: once_cell::sync::OnceCell = - once_cell::sync::OnceCell::new(); - -fn semaphore() -> &'static tokio::sync::Semaphore { - MAX_READS.get_or_init(|| tokio::sync::Semaphore::new(num_cpus::get() * 5)) -} - -pub(crate) async fn clear_metadata

(file: P) -> Result<(), Exvi2Error> -where - P: AsRef, -{ - let permit = semaphore().acquire().await?; - - let status = tokio::process::Command::new("exiv2") - .arg(&"rm") - .arg(&file.as_ref()) - .spawn()? - .wait() - .await?; - - drop(permit); - - if !status.success() { - return Err(Exvi2Error::Status); - } - - Ok(()) -} - -impl From for Exvi2Error { - fn from(_: tokio::sync::AcquireError) -> Exvi2Error { - Exvi2Error::Closed - } -} diff --git a/src/ffmpeg.rs b/src/ffmpeg.rs index d080c2b..2358781 100644 --- a/src/ffmpeg.rs +++ b/src/ffmpeg.rs @@ -10,23 +10,37 @@ pub(crate) enum VideoError { Closed, } +pub(crate) enum InputFormat { + Gif, + Mp4, +} + pub(crate) enum ThumbnailFormat { Jpeg, - Webp, + // Webp, +} + +impl InputFormat { + fn as_format(&self) -> &'static str { + match self { + InputFormat::Gif => "gif_pipe", + InputFormat::Mp4 => "mp4", + } + } } impl ThumbnailFormat { fn as_codec(&self) -> &'static str { match self { ThumbnailFormat::Jpeg => "mjpeg", - ThumbnailFormat::Webp => "webp", + // ThumbnailFormat::Webp => "webp", } } fn as_format(&self) -> &'static str { match self { ThumbnailFormat::Jpeg => "singlejpeg", - ThumbnailFormat::Webp => "webp", + // ThumbnailFormat::Webp => "webp", } } } @@ -39,40 +53,34 @@ fn semaphore() -> &'static tokio::sync::Semaphore { .get_or_init(|| tokio::sync::Semaphore::new(num_cpus::get().saturating_sub(1).max(1))) } -pub(crate) async fn to_mp4(from: P1, to: P2) -> Result<(), VideoError> +pub(crate) fn to_mp4_stream( + input: S, + input_format: InputFormat, +) -> std::io::Result>> where - P1: AsRef, - P2: AsRef, + S: futures::stream::Stream> + Unpin + 'static, + E: From + 'static, { - let permit = semaphore().acquire().await?; + let process = crate::stream::Process::spawn(tokio::process::Command::new("ffmpeg").args([ + "-f", + input_format.as_format(), + "-i", + "pipe:", + "-movflags", + "faststart+frag_keyframe+empty_moov", + "-pix_fmt", + "yuv420p", + "-vf", + "scale=trunc(iw/2)*2:trunc(ih/2)*2", + "-an", + "-codec", + "h264", + "-f", + "mp4", + "pipe:", + ]))?; - let mut child = tokio::process::Command::new("ffmpeg") - .arg(&"-i") - .arg(&from.as_ref()) - .args([ - &"-movflags", - &"faststart", - &"-pix_fmt", - &"yuv420p", - &"-vf", - &"scale=trunc(iw/2)*2:trunc(ih/2)*2", - &"-an", - &"-codec", - &"h264", - &"-f", - &"mp4", - ]) - .arg(&to.as_ref()) - .spawn()?; - - let status = child.wait().await?; - drop(permit); - - if !status.success() { - return Err(VideoError::Status); - } - - Ok(()) + Ok(Box::pin(process.sink_stream(input).unwrap())) } pub(crate) async fn thumbnail( @@ -90,14 +98,12 @@ where .arg(&"-i") .arg(&from.as_ref()) .args([ - &"-ss", - &"00:00:01.000", - &"-vframes", - &"1", - &"-codec", - &format.as_codec(), - &"-f", - &format.as_format(), + "-vframes", + "1", + "-codec", + format.as_codec(), + "-f", + format.as_format(), ]) .arg(&to.as_ref()) .spawn()?; diff --git a/src/magick.rs b/src/magick.rs index c3a4baa..de9324a 100644 --- a/src/magick.rs +++ b/src/magick.rs @@ -3,9 +3,6 @@ pub(crate) enum MagickError { #[error("{0}")] IO(#[from] std::io::Error), - #[error("Magick command failed")] - Status, - #[error("Magick semaphore is closed")] Closed, @@ -31,42 +28,73 @@ static MAX_CONVERSIONS: once_cell::sync::OnceCell = once_cell::sync::OnceCell::new(); fn semaphore() -> &'static tokio::sync::Semaphore { - MAX_CONVERSIONS.get_or_init(|| tokio::sync::Semaphore::new(num_cpus::get().max(1) * 5)) + MAX_CONVERSIONS + .get_or_init(|| tokio::sync::Semaphore::new(num_cpus::get().saturating_sub(1).max(1))) } -pub(crate) async fn convert_file( - from: P1, - to: P2, - format: crate::config::Format, -) -> Result<(), MagickError> +pub(crate) fn clear_metadata_stream( + input: S, +) -> std::io::Result>> where - P1: AsRef, - P2: AsRef, + S: futures::stream::Stream> + Unpin + 'static, + E: From + 'static, { - let format = format!("{}:", format.to_magick_format()); + let process = crate::stream::Process::spawn( + tokio::process::Command::new("magick").args(["convert", "-", "-strip", "-"]), + )?; - let mut output_file = std::ffi::OsString::from(format); - output_file.push(to.as_ref()); + Ok(Box::pin(process.sink_stream(input).unwrap())) +} - tracing::debug!("Outfile: {:?}", output_file); +pub(crate) fn convert_stream( + input: S, + format: crate::config::Format, +) -> std::io::Result>> +where + S: futures::stream::Stream> + Unpin + 'static, + E: From + 'static, +{ + let process = crate::stream::Process::spawn(tokio::process::Command::new("magick").args([ + "convert", + "-", + format!("{}:-", format.to_magick_format()).as_str(), + ]))?; - let permit = semaphore().acquire().await?; + Ok(Box::pin(process.sink_stream(input).unwrap())) +} - let status = tokio::process::Command::new("magick") - .arg("convert") - .arg(&from.as_ref()) - .arg(&output_file) - .spawn()? - .wait() - .await?; +pub(crate) async fn details_stream(input: S) -> Result +where + S: futures::stream::Stream> + Unpin, + E1: From, + E2: From + From + From, +{ + use futures::stream::StreamExt; + + let permit = semaphore().acquire().await.map_err(MagickError::from)?; + + let mut process = + crate::stream::Process::spawn(tokio::process::Command::new("magick").args([ + "identify", + "-ping", + "-format", + "%w %h | %m\n", + "-", + ]))?; + + process.take_sink().unwrap().send(input).await?; + let mut stream = process.take_stream().unwrap(); + + let mut buf = actix_web::web::BytesMut::new(); + while let Some(res) = stream.next().await { + let bytes = res?; + buf.extend_from_slice(&bytes); + } drop(permit); - if !status.success() { - return Err(MagickError::Status); - } - - Ok(()) + let s = String::from_utf8_lossy(&buf); + Ok(parse_details(s)?) } pub(crate) async fn details

(file: P) -> Result @@ -85,6 +113,10 @@ where let s = String::from_utf8_lossy(&output.stdout); + parse_details(s) +} + +fn parse_details(s: std::borrow::Cow<'_, str>) -> Result { let mut lines = s.lines(); let first = lines.next().ok_or_else(|| MagickError::Format)?; @@ -127,22 +159,37 @@ where }) } -pub(crate) async fn input_type

(file: &P) -> Result +pub(crate) async fn input_type_stream(input: S) -> Result where - P: AsRef, + S: futures::stream::Stream> + Unpin, + E1: From, + E2: From + From + From, { - let permit = semaphore().acquire().await?; + use futures::stream::StreamExt; - let output = tokio::process::Command::new("magick") - .args([&"identify", &"-ping", &"-format", &"%m\n"]) - .arg(&file.as_ref()) - .output() - .await?; + let permit = semaphore().acquire().await.map_err(MagickError::from)?; + + let mut process = crate::stream::Process::spawn( + tokio::process::Command::new("magick").args(["identify", "-ping", "-format", "%m\n", "-"]), + )?; + + process.take_sink().unwrap().send(input).await?; + let mut stream = process.take_stream().unwrap(); + + let mut buf = actix_web::web::BytesMut::new(); + while let Some(res) = stream.next().await { + let bytes = res?; + buf.extend_from_slice(&bytes); + } drop(permit); - let s = String::from_utf8_lossy(&output.stdout); + let s = String::from_utf8_lossy(&buf); + Ok(parse_input_type(s)?) +} + +fn parse_input_type(s: std::borrow::Cow<'_, str>) -> Result { let mut lines = s.lines(); let first = lines.next(); @@ -161,41 +208,23 @@ where } } -pub(crate) async fn process_image( - input: P1, - output: P2, +pub(crate) fn process_image_stream( + input: S, args: Vec, format: crate::config::Format, -) -> Result<(), MagickError> +) -> std::io::Result>> where - P1: AsRef, - P2: AsRef, + S: futures::stream::Stream> + Unpin + 'static, + E: From + 'static, { - let format = format!("{}:", format.to_magick_format()); + let process = crate::stream::Process::spawn( + tokio::process::Command::new("magick") + .args([&"convert", &"-"]) + .args(args) + .arg(format!("{}:-", format.to_magick_format())), + )?; - let mut output_file = std::ffi::OsString::from(format); - output_file.push(output.as_ref()); - - tracing::debug!("Outfile: {:?}", output_file); - - let permit = semaphore().acquire().await?; - - let status = tokio::process::Command::new("magick") - .arg(&"convert") - .arg(&input.as_ref()) - .args(args) - .arg(output_file) - .spawn()? - .wait() - .await?; - - drop(permit); - - if !status.success() { - return Err(MagickError::Status); - } - - Ok(()) + Ok(Box::pin(process.sink_stream(input).unwrap())) } impl From for MagickError { diff --git a/src/main.rs b/src/main.rs index 4f483ae..f9bcbbf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,13 +15,13 @@ use tracing_subscriber::EnvFilter; mod config; mod error; -mod exiv2; mod ffmpeg; mod magick; mod middleware; mod migrate; mod processor; mod range; +mod stream; mod upload_manager; mod validate; @@ -397,30 +397,81 @@ async fn process( } } - safe_create_parent(&thumbnail_path).await?; + let stream = Box::pin(async_stream::stream! { + use futures::stream::StreamExt; - // apply chain to the provided image - let dest_file = tmp_file(); - let orig_file = tmp_file(); - actix_fs::copy(original_path, orig_file.clone()).await?; - magick::process_image(&orig_file, &dest_file, thumbnail_args, format).await?; - actix_fs::remove_file(orig_file).await?; - safe_move_file(dest_file, thumbnail_path.clone()).await?; + let mut s = actix_fs::read_to_stream(original_path.clone()) + .await? + .faster(); - let details = if let Some(details) = details { - details + while let Some(res) = s.next().await { + yield res.map_err(UploadError::from); + } + }); + + let processed_stream = crate::magick::process_image_stream(stream, thumbnail_args, format)?; + + let (base_stream, copied_stream) = crate::stream::try_duplicate(processed_stream, 1024); + + let (details, base_stream) = if let Some(details) = details { + ( + details, + Box::pin(base_stream) + as Pin< + Box< + dyn futures::stream::Stream< + Item = Result, + >, + >, + >, + ) } else { - let details = Details::from_path(&thumbnail_path).await?; - manager - .store_variant_details(thumbnail_path.clone(), name.clone(), &details) - .await?; - manager - .store_variant(thumbnail_path.clone(), name.clone()) - .await?; - details + let (base_stream, copied2) = crate::stream::try_duplicate(Box::pin(base_stream), 1024); + let details = Details::from_stream(Box::pin(base_stream)).await?; + ( + details, + Box::pin(copied2) + as Pin< + Box< + dyn futures::stream::Stream< + Item = Result, + >, + >, + >, + ) }; - return ranged_file_resp(thumbnail_path, range, details).await; + let span = tracing::Span::current(); + let details2 = details.clone(); + actix_rt::spawn(async move { + let entered = span.enter(); + if let Err(e) = + upload_manager::safe_save_stream(thumbnail_path.clone(), Box::pin(copied_stream)) + .await + { + tracing::warn!("Error saving thumbnail: {}", e); + return; + } + if let Err(e) = manager + .store_variant_details(thumbnail_path.clone(), name.clone(), &details2) + .await + { + tracing::warn!("Error saving variant details: {}", e); + return; + } + if let Err(e) = manager.store_variant(thumbnail_path, name.clone()).await { + tracing::warn!("Error saving variant info: {}", e); + } + drop(entered); + }); + + return Ok(srv_response( + HttpResponse::Ok(), + base_stream, + details.content_type(), + 7 * DAYS, + details.system_time(), + )); } let details = if let Some(details) = details { diff --git a/src/stream.rs b/src/stream.rs new file mode 100644 index 0000000..3240226 --- /dev/null +++ b/src/stream.rs @@ -0,0 +1,246 @@ +use actix_web::web::Bytes; +use futures::{ + future::FutureExt, + stream::{LocalBoxStream, Stream, StreamExt}, +}; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; + +pub(crate) struct Process { + child: tokio::process::Child, +} + +pub(crate) struct ProcessSink { + stdin: tokio::process::ChildStdin, +} + +pub(crate) struct ProcessStream { + stream: LocalBoxStream<'static, std::io::Result>, +} + +pub(crate) struct ProcessSinkStream { + stream: LocalBoxStream<'static, Result>, +} + +pub(crate) struct TryDuplicateStream { + inner: tokio_stream::wrappers::ReceiverStream>, +} + +impl Process { + fn new(child: tokio::process::Child) -> Self { + Process { child } + } + + pub(crate) fn spawn(cmd: &mut tokio::process::Command) -> std::io::Result { + cmd.stdin(std::process::Stdio::piped()) + .stdout(std::process::Stdio::piped()) + .spawn() + .map(Process::new) + } + + pub(crate) fn take_sink(&mut self) -> Option { + self.child.stdin.take().map(ProcessSink::new) + } + + pub(crate) fn take_stream(&mut self) -> Option { + self.child.stdout.take().map(ProcessStream::new) + } + + pub(crate) fn sink_stream(mut self, mut input_stream: S) -> Option> + where + S: Stream> + Unpin + 'static, + E: From + 'static, + { + let mut stdin = self.child.stdin.take(); + let mut stdout = self.take_stream()?; + + let s = async_stream::stream! { + let mut wait = Box::pin(self.child.wait().fuse()); + + loop { + tokio::select! { + res = input_stream.next() => { + match res { + Some(Ok(mut bytes)) => { + if let Some(stdin) = stdin.as_mut() { + let mut fut = Box::pin(stdin.write_all_buf(&mut bytes)); + + loop { + tokio::select! { + res = &mut fut => { + if let Err(e) = res { + yield Err(e.into()); + } + break; + } + res = stdout.next() => { + match res { + Some(Ok(bytes)) => yield Ok(bytes), + Some(Err(e)) => { + yield Err(e.into()); + break; + } + None => break, + } + } + res = &mut wait => { + match res { + Ok(status) if !status.success() => { + yield Err(std::io::Error::from(std::io::ErrorKind::Other).into()); + break; + }, + Err(e) => { + yield Err(e.into()); + break; + } + _ => (), + } + } + } + } + } + }, + Some(Err(e)) => { + yield Err(e); + break; + } + None => { + stdin.take(); + }, + } + } + res = stdout.next() => { + match res { + Some(Ok(bytes)) => yield Ok(bytes), + Some(Err(e)) => { + yield Err(e.into()); + break; + } + None => break, + } + } + res = &mut wait => { + match res { + Ok(status) if !status.success() => { + yield Err(std::io::Error::from(std::io::ErrorKind::Other).into()); + break; + }, + Err(e) => { + yield Err(e.into()); + break; + } + _ => (), + } + } + } + } + }; + + Some(ProcessSinkStream { + stream: Box::pin(s), + }) + } +} + +impl ProcessSink { + fn new(stdin: tokio::process::ChildStdin) -> Self { + ProcessSink { stdin } + } + + pub(crate) async fn send(&mut self, mut stream: S) -> Result<(), E> + where + S: Stream> + Unpin, + E: From, + { + while let Some(res) = stream.next().await { + let mut bytes = res?; + + self.stdin.write_all_buf(&mut bytes).await?; + } + + Ok(()) + } +} + +impl ProcessStream { + fn new(mut stdout: tokio::process::ChildStdout) -> ProcessStream { + let s = async_stream::stream! { + loop { + let mut buf = actix_web::web::BytesMut::with_capacity(65_536); + + match stdout.read_buf(&mut buf).await { + Ok(len) if len == 0 => { + break; + } + Ok(_) => { + yield Ok(buf.freeze()); + } + Err(e) => { + yield Err(e); + break; + } + } + } + }; + + ProcessStream { + stream: Box::pin(s), + } + } +} + +pub(crate) fn try_duplicate( + mut stream: S, + buffer: usize, +) -> (impl Stream>, TryDuplicateStream) +where + S: Stream> + Unpin, + T: Clone, +{ + let (tx, rx) = tokio::sync::mpsc::channel(buffer); + let s = async_stream::stream! { + while let Some(value) = stream.next().await { + match value { + Ok(t) => { + let _ = tx.send(Ok(t.clone())).await; + yield Ok(t); + } + Err(e) => yield Err(e), + } + } + }; + + ( + s, + TryDuplicateStream { + inner: tokio_stream::wrappers::ReceiverStream::new(rx), + }, + ) +} + +impl Stream for ProcessStream { + type Item = std::io::Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.stream).poll_next(cx) + } +} + +impl Stream for ProcessSinkStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.stream).poll_next(cx) + } +} + +impl Stream for TryDuplicateStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_next(cx) + } +} diff --git a/src/upload_manager.rs b/src/upload_manager.rs index dfa21d4..e4ebea6 100644 --- a/src/upload_manager.rs +++ b/src/upload_manager.rs @@ -3,7 +3,6 @@ use crate::{ error::UploadError, migrate::{alias_id_key, alias_key, alias_key_bounds, variant_key_bounds, LatestDb}, to_ext, - validate::validate_image, }; use actix_web::web; use futures::stream::{Stream, StreamExt, TryStreamExt}; @@ -97,6 +96,21 @@ pub(crate) struct Details { } impl Details { + pub(crate) async fn from_stream(stream: S) -> Result + where + S: futures::stream::Stream> + Unpin + 'static, + E: From + 'static, + UploadError: From, + { + let details = crate::magick::details_stream::(stream).await?; + + Ok(Details::now( + details.width, + details.height, + details.mime_type, + )) + } + pub(crate) async fn from_path

(path: P) -> Result where P: AsRef, @@ -480,30 +494,31 @@ impl UploadManager { alias: String, content_type: mime::Mime, validate: bool, - stream: UploadStream, + mut stream: UploadStream, ) -> Result where UploadError: From, - E: Unpin, + E: Unpin + 'static, { - // -- READ IN BYTES FROM CLIENT -- - debug!("Reading stream"); + let mapped_err_stream = Box::pin(async_stream::stream! { + use futures::stream::StreamExt; + + while let Some(res) = stream.next().await { + yield res.map_err(UploadError::from); + } + }); + + let (content_type, validated_stream) = + crate::validate::validate_image_stream(mapped_err_stream, self.inner.format.clone()) + .await?; + + let (s1, s2) = crate::stream::try_duplicate(validated_stream, 1024); + let tmpfile = crate::tmp_file(); - safe_save_stream(tmpfile.clone(), stream).await?; - - let content_type = if validate { - debug!("Validating image"); - let format = self.inner.format.clone(); - validate_image(tmpfile.clone(), format).await? - } else { - content_type - }; - - // -- DUPLICATE CHECKS -- - - // Cloning bytes is fine because it's actually a pointer - debug!("Hashing bytes"); - let hash = self.hash(tmpfile.clone()).await?; + let (hash, _) = tokio::try_join!( + self.hash_stream::<_, UploadError>(Box::pin(s1)), + safe_save_stream::(tmpfile.clone(), Box::pin(s2)) + )?; debug!("Storing alias"); self.add_existing_alias(&hash, &alias).await?; @@ -517,26 +532,30 @@ impl UploadManager { /// Upload the file, discarding bytes if it's already present, or saving if it's new #[instrument(skip(self, stream))] - pub(crate) async fn upload(&self, stream: UploadStream) -> Result + pub(crate) async fn upload(&self, mut stream: UploadStream) -> Result where UploadError: From, - E: Unpin, + E: Unpin + 'static, { - // -- READ IN BYTES FROM CLIENT -- - debug!("Reading stream"); + let mapped_err_stream = Box::pin(async_stream::stream! { + use futures::stream::StreamExt; + + while let Some(res) = stream.next().await { + yield res.map_err(UploadError::from); + } + }); + + let (content_type, validated_stream) = + crate::validate::validate_image_stream(mapped_err_stream, self.inner.format.clone()) + .await?; + + let (s1, s2) = crate::stream::try_duplicate(validated_stream, 1024); + let tmpfile = crate::tmp_file(); - safe_save_stream(tmpfile.clone(), stream).await?; - - // -- VALIDATE IMAGE -- - debug!("Validating image"); - let format = self.inner.format.clone(); - let content_type = validate_image(tmpfile.clone(), format).await?; - - // -- DUPLICATE CHECKS -- - - // Cloning bytes is fine because it's actually a pointer - debug!("Hashing bytes"); - let hash = self.hash(tmpfile.clone()).await?; + let (hash, _) = tokio::try_join!( + self.hash_stream::<_, UploadError>(Box::pin(s1)), + safe_save_stream::(tmpfile.clone(), Box::pin(s2)) + )?; debug!("Adding alias"); let alias = self.add_alias(&hash, content_type.clone()).await?; @@ -648,12 +667,13 @@ impl UploadManager { } // produce a sh256sum of the uploaded file - async fn hash(&self, tmpfile: PathBuf) -> Result { + async fn hash_stream(&self, mut stream: S) -> Result + where + S: futures::stream::Stream> + Unpin, + UploadError: From, + { let mut hasher = self.inner.hasher.clone(); - let file = actix_fs::file::open(tmpfile).await?; - let mut stream = Box::pin(actix_fs::file::read_to_stream(file).await?.faster()); - while let Some(res) = stream.next().await { let bytes = res?; hasher = web::block(move || { @@ -861,7 +881,10 @@ impl UploadManager { } #[instrument(skip(stream))] -async fn safe_save_stream(to: PathBuf, stream: UploadStream) -> Result<(), UploadError> +pub(crate) async fn safe_save_stream( + to: PathBuf, + stream: UploadStream, +) -> Result<(), UploadError> where UploadError: From, E: Unpin, diff --git a/src/validate.rs b/src/validate.rs index f374401..6a18dc2 100644 --- a/src/validate.rs +++ b/src/validate.rs @@ -1,4 +1,4 @@ -use crate::{config::Format, error::UploadError, magick::ValidInputType, tmp_file}; +use crate::{config::Format, error::UploadError, ffmpeg::InputFormat, magick::ValidInputType}; pub(crate) fn image_webp() -> mime::Mime { "image/webp".parse().unwrap() @@ -8,52 +8,49 @@ pub(crate) fn video_mp4() -> mime::Mime { "video/mp4".parse().unwrap() } -// import & export image using the image crate -#[tracing::instrument] -pub(crate) async fn validate_image( - tmpfile: std::path::PathBuf, +pub(crate) async fn validate_image_stream( + stream: S, prescribed_format: Option, -) -> Result { - let input_type = crate::magick::input_type(&tmpfile).await?; +) -> Result< + ( + mime::Mime, + futures::stream::LocalBoxStream<'static, Result>, + ), + UploadError, +> +where + S: futures::stream::Stream> + Unpin + 'static, +{ + let (base_stream, copied_stream) = crate::stream::try_duplicate(stream, 1024); + + let input_type = + crate::magick::input_type_stream::<_, UploadError, UploadError>(Box::pin(base_stream)) + .await?; match (prescribed_format, input_type) { - (_, ValidInputType::Gif) | (_, ValidInputType::Mp4) => { - let newfile = tmp_file(); - crate::safe_create_parent(&newfile).await?; - crate::ffmpeg::to_mp4(&tmpfile, &newfile).await?; - actix_fs::rename(newfile, tmpfile).await?; - - Ok(video_mp4()) - } - (Some(Format::Jpeg), ValidInputType::Jpeg) | (None, ValidInputType::Jpeg) => { - tracing::debug!("Clearing metadata"); - crate::exiv2::clear_metadata(&tmpfile).await?; - tracing::debug!("Validated"); - - Ok(mime::IMAGE_JPEG) - } - (Some(Format::Png), ValidInputType::Png) | (None, ValidInputType::Png) => { - tracing::debug!("Clearing metadata"); - crate::exiv2::clear_metadata(&tmpfile).await?; - tracing::debug!("Validated"); - - Ok(mime::IMAGE_PNG) - } - (Some(Format::Webp), ValidInputType::Webp) | (None, ValidInputType::Webp) => { - tracing::debug!("Clearing metadata"); - crate::exiv2::clear_metadata(&tmpfile).await?; - tracing::debug!("Validated"); - - Ok(image_webp()) - } - (Some(format), _) => { - let newfile = tmp_file(); - crate::safe_create_parent(&newfile).await?; - crate::magick::convert_file(&tmpfile, &newfile, format.clone()).await?; - - actix_fs::rename(newfile, tmpfile).await?; - - Ok(format.to_mime()) - } + (_, ValidInputType::Gif) => Ok(( + video_mp4(), + crate::ffmpeg::to_mp4_stream(copied_stream, InputFormat::Gif)?, + )), + (_, ValidInputType::Mp4) => Ok(( + video_mp4(), + crate::ffmpeg::to_mp4_stream(copied_stream, InputFormat::Mp4)?, + )), + (Some(Format::Jpeg) | None, ValidInputType::Jpeg) => Ok(( + mime::IMAGE_JPEG, + crate::magick::clear_metadata_stream(copied_stream)?, + )), + (Some(Format::Png) | None, ValidInputType::Png) => Ok(( + mime::IMAGE_PNG, + crate::magick::clear_metadata_stream(copied_stream)?, + )), + (Some(Format::Webp) | None, ValidInputType::Webp) => Ok(( + image_webp(), + crate::magick::clear_metadata_stream(copied_stream)?, + )), + (Some(format), _) => Ok(( + format.to_mime(), + crate::magick::convert_stream(copied_stream, format)?, + )), } }