diff --git a/Cargo.lock b/Cargo.lock index 880d557..fcbd116 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1019,6 +1019,7 @@ dependencies = [ "time 0.3.2", "tokio", "tokio-stream", + "tokio-util", "tracing", "tracing-futures", "tracing-subscriber", diff --git a/Cargo.toml b/Cargo.toml index 5005980..1ba13d8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,8 +31,9 @@ sled = { version = "0.34.6" } structopt = "0.3.14" thiserror = "1.0" time = { version = "0.3.0", features = ["serde"] } -tokio = { version = "1", default-features = false, features = ["io-util", "macros", "process", "sync"] } +tokio = { version = "1", default-features = false, features = ["fs", "io-util", "macros", "process", "sync"] } tokio-stream = { version = "0.1", default-features = false } +tokio-util = { version = "0.6", default-features = false, features = ["codec"] } tracing = "0.1.15" tracing-futures = "0.2.4" tracing-subscriber = { version = "0.2.5", features = ["fmt", "tracing-log"] } diff --git a/src/exiftool.rs b/src/exiftool.rs index f57a266..6794406 100644 --- a/src/exiftool.rs +++ b/src/exiftool.rs @@ -1,3 +1,21 @@ +use crate::stream::Process; +use actix_web::web::Bytes; +use tokio::{io::AsyncRead, process::Command}; + +pub(crate) fn clear_metadata_bytes_read(input: Bytes) -> std::io::Result { + let process = Process::spawn(Command::new("exiftool").args(["-all=", "-", "-out", "-"]))?; + + Ok(process.bytes_read(input).unwrap()) +} + +pub(crate) fn clear_metadata_write_read( + input: impl AsyncRead + Unpin + 'static, +) -> std::io::Result { + let process = Process::spawn(Command::new("exiftool").args(["-all=", "-", "-out", "-"]))?; + + Ok(process.write_read(input).unwrap()) +} + pub(crate) fn clear_metadata_stream( input: S, ) -> std::io::Result>> @@ -5,9 +23,7 @@ where S: futures::stream::Stream> + Unpin + 'static, E: From + 'static, { - let process = crate::stream::Process::spawn( - tokio::process::Command::new("exiftool").args(["-all=", "-", "-out", "-"]), - )?; + let process = Process::spawn(Command::new("exiftool").args(["-all=", "-", "-out", "-"]))?; Ok(Box::pin(process.sink_stream(input).unwrap())) } diff --git a/src/ffmpeg.rs b/src/ffmpeg.rs index 2358781..8f701d4 100644 --- a/src/ffmpeg.rs +++ b/src/ffmpeg.rs @@ -1,3 +1,7 @@ +use crate::stream::Process; +use actix_web::web::Bytes; +use tokio::{io::AsyncRead, process::Command}; + #[derive(Debug, thiserror::Error)] pub(crate) enum VideoError { #[error("Failed to interface with transcode process")] @@ -53,15 +57,67 @@ fn semaphore() -> &'static tokio::sync::Semaphore { .get_or_init(|| tokio::sync::Semaphore::new(num_cpus::get().saturating_sub(1).max(1))) } +pub(crate) fn to_mp4_bytes( + input: Bytes, + input_format: InputFormat, +) -> std::io::Result { + let process = Process::spawn(Command::new("ffmpeg").args([ + "-f", + input_format.as_format(), + "-i", + "pipe:", + "-movflags", + "faststart+frag_keyframe+empty_moov", + "-pix_fmt", + "yuv420p", + "-vf", + "scale=trunc(iw/2)*2:trunc(ih/2)*2", + "-an", + "-codec", + "h264", + "-f", + "mp4", + "pipe:", + ]))?; + + Ok(process.bytes_read(input).unwrap()) +} + +pub(crate) fn to_mp4_write_read( + input: impl AsyncRead + Unpin + 'static, + input_format: InputFormat, +) -> std::io::Result { + let process = Process::spawn(Command::new("ffmpeg").args([ + "-f", + input_format.as_format(), + "-i", + "pipe:", + "-movflags", + "faststart+frag_keyframe+empty_moov", + "-pix_fmt", + "yuv420p", + "-vf", + "scale=trunc(iw/2)*2:trunc(ih/2)*2", + "-an", + "-codec", + "h264", + "-f", + "mp4", + "pipe:", + ]))?; + + Ok(process.write_read(input).unwrap()) +} + pub(crate) fn to_mp4_stream( input: S, input_format: InputFormat, -) -> std::io::Result>> +) -> std::io::Result>> where - S: futures::stream::Stream> + Unpin + 'static, + S: futures::stream::Stream> + Unpin + 'static, E: From + 'static, { - let process = crate::stream::Process::spawn(tokio::process::Command::new("ffmpeg").args([ + let process = Process::spawn(Command::new("ffmpeg").args([ "-f", input_format.as_format(), "-i", @@ -94,7 +150,7 @@ where { let permit = semaphore().acquire().await?; - let mut child = tokio::process::Command::new("ffmpeg") + let mut child = Command::new("ffmpeg") .arg(&"-i") .arg(&from.as_ref()) .args([ diff --git a/src/magick.rs b/src/magick.rs index de9324a..131dd5c 100644 --- a/src/magick.rs +++ b/src/magick.rs @@ -1,3 +1,11 @@ +use crate::{config::Format, stream::Process}; +use actix_web::web::Bytes; +use std::process::Stdio; +use tokio::{ + io::{AsyncRead, AsyncReadExt, AsyncWriteExt}, + process::Command, +}; + #[derive(Debug, thiserror::Error)] pub(crate) enum MagickError { #[error("{0}")] @@ -32,29 +40,88 @@ fn semaphore() -> &'static tokio::sync::Semaphore { .get_or_init(|| tokio::sync::Semaphore::new(num_cpus::get().saturating_sub(1).max(1))) } +pub(crate) fn clear_metadata_bytes_read(input: Bytes) -> std::io::Result { + let process = Process::spawn(Command::new("magick").args(["convert", "-", "-strip", "-"]))?; + + Ok(process.bytes_read(input).unwrap()) +} + +pub(crate) fn clear_metadata_write_read( + input: impl AsyncRead + Unpin + 'static, +) -> std::io::Result { + let process = Process::spawn(Command::new("magick").args(["convert", "-", "-strip", "-"]))?; + + Ok(process.write_read(input).unwrap()) +} + +pub(crate) async fn details_write_read( + input: impl AsyncRead + Unpin + 'static, +) -> Result { + let process = Process::spawn(Command::new("magick").args([ + "identify", + "-ping", + "-format", + "%w %h | %m\n", + "-", + ]))?; + + let mut reader = process.write_read(input).unwrap(); + + let mut bytes = Vec::new(); + + reader.read_to_end(&mut bytes).await?; + + let s = String::from_utf8_lossy(&bytes); + parse_details(s) +} + +pub(crate) fn convert_write_read( + input: impl AsyncRead + Unpin + 'static, + format: Format, +) -> std::io::Result { + let process = Process::spawn(Command::new("magick").args([ + "convert", + "-", + format!("{}:-", format.to_magick_format()).as_str(), + ]))?; + + Ok(process.write_read(input).unwrap()) +} + pub(crate) fn clear_metadata_stream( input: S, -) -> std::io::Result>> +) -> std::io::Result>> where - S: futures::stream::Stream> + Unpin + 'static, + S: futures::stream::Stream> + Unpin + 'static, E: From + 'static, { - let process = crate::stream::Process::spawn( - tokio::process::Command::new("magick").args(["convert", "-", "-strip", "-"]), - )?; + let process = Process::spawn(Command::new("magick").args(["convert", "-", "-strip", "-"]))?; Ok(Box::pin(process.sink_stream(input).unwrap())) } +pub(crate) fn convert_bytes_read( + input: Bytes, + format: Format, +) -> std::io::Result { + let process = Process::spawn(Command::new("magick").args([ + "convert", + "-", + format!("{}:-", format.to_magick_format()).as_str(), + ]))?; + + Ok(process.bytes_read(input).unwrap()) +} + pub(crate) fn convert_stream( input: S, - format: crate::config::Format, -) -> std::io::Result>> + format: Format, +) -> std::io::Result>> where - S: futures::stream::Stream> + Unpin + 'static, + S: futures::stream::Stream> + Unpin + 'static, E: From + 'static, { - let process = crate::stream::Process::spawn(tokio::process::Command::new("magick").args([ + let process = Process::spawn(Command::new("magick").args([ "convert", "-", format!("{}:-", format.to_magick_format()).as_str(), @@ -65,7 +132,7 @@ where pub(crate) async fn details_stream(input: S) -> Result where - S: futures::stream::Stream> + Unpin, + S: futures::stream::Stream> + Unpin, E1: From, E2: From + From + From, { @@ -73,14 +140,13 @@ where let permit = semaphore().acquire().await.map_err(MagickError::from)?; - let mut process = - crate::stream::Process::spawn(tokio::process::Command::new("magick").args([ - "identify", - "-ping", - "-format", - "%w %h | %m\n", - "-", - ]))?; + let mut process = Process::spawn(Command::new("magick").args([ + "identify", + "-ping", + "-format", + "%w %h | %m\n", + "-", + ]))?; process.take_sink().unwrap().send(input).await?; let mut stream = process.take_stream().unwrap(); @@ -103,7 +169,7 @@ where { let permit = semaphore().acquire().await?; - let output = tokio::process::Command::new("magick") + let output = Command::new("magick") .args([&"identify", &"-ping", &"-format", &"%w %h | %m\n"]) .arg(&file.as_ref()) .output() @@ -159,9 +225,34 @@ fn parse_details(s: std::borrow::Cow<'_, str>) -> Result { }) } +pub(crate) async fn input_type_bytes(mut input: Bytes) -> Result { + let permit = semaphore().acquire().await.map_err(MagickError::from)?; + + let mut child = Command::new("magick") + .args(["identify", "-ping", "-format", "%m\n", "-"]) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .spawn()?; + + let mut stdin = child.stdin.take().unwrap(); + let mut stdout = child.stdout.take().unwrap(); + + stdin.write_all_buf(&mut input).await?; + + let mut vec = Vec::new(); + stdout.read_to_end(&mut vec).await?; + + drop(stdin); + child.wait().await?; + drop(permit); + + let s = String::from_utf8_lossy(&vec); + parse_input_type(s) +} + pub(crate) async fn input_type_stream(input: S) -> Result where - S: futures::stream::Stream> + Unpin, + S: futures::stream::Stream> + Unpin, E1: From, E2: From + From + From, { @@ -169,9 +260,8 @@ where let permit = semaphore().acquire().await.map_err(MagickError::from)?; - let mut process = crate::stream::Process::spawn( - tokio::process::Command::new("magick").args(["identify", "-ping", "-format", "%m\n", "-"]), - )?; + let mut process = + Process::spawn(Command::new("magick").args(["identify", "-ping", "-format", "%m\n", "-"]))?; process.take_sink().unwrap().send(input).await?; let mut stream = process.take_stream().unwrap(); @@ -211,14 +301,14 @@ fn parse_input_type(s: std::borrow::Cow<'_, str>) -> Result( input: S, args: Vec, - format: crate::config::Format, -) -> std::io::Result>> + format: Format, +) -> std::io::Result>> where - S: futures::stream::Stream> + Unpin + 'static, + S: futures::stream::Stream> + Unpin + 'static, E: From + 'static, { - let process = crate::stream::Process::spawn( - tokio::process::Command::new("magick") + let process = Process::spawn( + Command::new("magick") .args([&"convert", &"-"]) .args(args) .arg(format!("{}:-", format.to_magick_format())), diff --git a/src/stream.rs b/src/stream.rs index 5afc86f..185a2fb 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,15 +1,27 @@ use actix_web::web::Bytes; use futures::stream::{LocalBoxStream, Stream, StreamExt}; use std::{ + future::Future, pin::Pin, task::{Context, Poll}, }; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt, ReadBuf}; +use tokio_stream::wrappers::ReceiverStream; + +pub(crate) struct ReadAdapter { + inner: S, +} pub(crate) struct Process { child: tokio::process::Child, } +pub(crate) struct ProcessRead { + inner: I, + err_recv: tokio::sync::oneshot::Receiver, + err_closed: bool, +} + pub(crate) struct ProcessSink { stdin: tokio::process::ChildStdin, } @@ -23,7 +35,44 @@ pub(crate) struct ProcessSinkStream { } pub(crate) struct TryDuplicateStream { - inner: tokio_stream::wrappers::ReceiverStream>, + inner: ReceiverStream>, +} + +#[derive(Debug)] +pub(crate) struct StringError(String); + +impl ReadAdapter { + pub(crate) fn new_unsync( + mut stream: S, + ) -> ReadAdapter>> + where + S: Stream> + Unpin + 'static, + E: std::fmt::Display, + { + let (tx, rx) = tokio::sync::mpsc::channel(1); + + actix_rt::spawn(async move { + while let Some(res) = stream.next().await { + if tx + .send(res.map_err(|e| StringError(e.to_string()))) + .await + .is_err() + { + break; + } + } + }); + + ReadAdapter::new_sync(ReceiverStream::new(rx)) + } + + fn new_sync(stream: S) -> Self + where + S: Stream> + Unpin, + E: Into>, + { + ReadAdapter { inner: stream } + } } impl Process { @@ -46,6 +95,47 @@ impl Process { self.child.stdout.take().map(ProcessStream::new) } + pub(crate) fn bytes_read(mut self, mut input: Bytes) -> Option { + let mut stdin = self.child.stdin.take()?; + let stdout = self.child.stdout.take()?; + + let (tx, rx) = tokio::sync::oneshot::channel(); + + actix_rt::spawn(async move { + if let Err(e) = stdin.write_all_buf(&mut input).await { + let _ = tx.send(e); + } + }); + + Some(Box::pin(ProcessRead { + inner: stdout, + err_recv: rx, + err_closed: false, + })) + } + + pub(crate) fn write_read( + mut self, + mut input_reader: impl AsyncRead + Unpin + 'static, + ) -> Option { + let mut stdin = self.child.stdin.take()?; + let stdout = self.child.stdout.take()?; + + let (tx, rx) = tokio::sync::oneshot::channel(); + + actix_rt::spawn(async move { + if let Err(e) = tokio::io::copy(&mut input_reader, &mut stdin).await { + let _ = tx.send(e); + } + }); + + Some(Box::pin(ProcessRead { + inner: stdout, + err_recv: rx, + err_closed: false, + })) + } + pub(crate) fn sink_stream(mut self, input_stream: S) -> Option> where S: Stream> + Unpin + 'static, @@ -167,11 +257,61 @@ where ( s, TryDuplicateStream { - inner: tokio_stream::wrappers::ReceiverStream::new(rx), + inner: ReceiverStream::new(rx), }, ) } +impl AsyncRead for ReadAdapter +where + S: Stream> + Unpin, + E: Into>, +{ + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + match Pin::new(&mut self.inner).poll_next(cx) { + Poll::Ready(Some(Ok(bytes))) => { + buf.put_slice(&bytes); + Poll::Ready(Ok(())) + } + Poll::Ready(None) => Poll::Ready(Ok(())), + Poll::Ready(Some(Err(e))) => { + Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::Other, e))) + } + Poll::Pending => Poll::Pending, + } + } +} + +impl AsyncRead for ProcessRead +where + I: AsyncRead + Unpin, +{ + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + if !self.err_closed { + if let Poll::Ready(res) = Pin::new(&mut self.err_recv).poll(cx) { + self.err_closed = true; + if let Ok(err) = res { + return Poll::Ready(Err(err)); + } + } + } + + if let Poll::Ready(res) = Pin::new(&mut self.inner).poll_read(cx, buf) { + return Poll::Ready(res); + } + + Poll::Pending + } +} + impl Stream for ProcessStream { type Item = std::io::Result; @@ -195,3 +335,11 @@ impl Stream for TryDuplicateStream { Pin::new(&mut self.inner).poll_next(cx) } } + +impl std::fmt::Display for StringError { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +impl std::error::Error for StringError {} diff --git a/src/upload_manager.rs b/src/upload_manager.rs index e4ebea6..53ada6c 100644 --- a/src/upload_manager.rs +++ b/src/upload_manager.rs @@ -7,7 +7,13 @@ use crate::{ use actix_web::web; use futures::stream::{Stream, StreamExt, TryStreamExt}; use sha2::Digest; -use std::{path::PathBuf, pin::Pin, sync::Arc}; +use std::{ + path::PathBuf, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; +use tokio::io::{AsyncRead, ReadBuf}; use tracing::{debug, error, info, instrument, warn, Span}; // TREE STRUCTURE @@ -27,6 +33,38 @@ pub struct UploadManager { inner: Arc, } +pub struct Hasher { + inner: I, + hasher: sha2::Sha256, +} + +impl Hasher { + async fn finalize_reset(self) -> Result { + let mut hasher = self.hasher; + let hash = web::block(move || Hash::new(hasher.finalize_reset().to_vec())).await?; + Ok(hash) + } +} + +impl AsyncRead for Hasher +where + I: AsyncRead + Unpin, +{ + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let before_len = buf.filled().len(); + let poll_res = Pin::new(&mut self.inner).poll_read(cx, buf); + let after_len = buf.filled().len(); + if after_len > before_len { + self.hasher.update(&buf.filled()[before_len..after_len]); + } + poll_res + } +} + struct UploadManagerInner { format: Option, hasher: sha2::Sha256, @@ -111,6 +149,18 @@ impl Details { )) } + pub(crate) async fn from_async_read( + input: impl AsyncRead + Unpin + 'static, + ) -> Result { + let details = crate::magick::details_write_read(input).await?; + + Ok(Details::now( + details.width, + details.height, + details.mime_type, + )) + } + pub(crate) async fn from_path

(path: P) -> Result where P: AsRef, @@ -535,27 +585,28 @@ impl UploadManager { pub(crate) async fn upload(&self, mut stream: UploadStream) -> Result where UploadError: From, - E: Unpin + 'static, { - let mapped_err_stream = Box::pin(async_stream::stream! { - use futures::stream::StreamExt; + let mut bytes_mut = actix_web::web::BytesMut::new(); - while let Some(res) = stream.next().await { - yield res.map_err(UploadError::from); - } - }); + debug!("Reading stream to memory"); + while let Some(res) = stream.next().await { + let bytes = res?; + bytes_mut.extend_from_slice(&bytes); + } - let (content_type, validated_stream) = - crate::validate::validate_image_stream(mapped_err_stream, self.inner.format.clone()) + debug!("Validating bytes"); + let (content_type, validated_reader) = + crate::validate::validate_image_bytes(bytes_mut.freeze(), self.inner.format.clone()) .await?; - let (s1, s2) = crate::stream::try_duplicate(validated_stream, 1024); + let mut hasher_reader = Hasher { + inner: validated_reader, + hasher: self.inner.hasher.clone(), + }; let tmpfile = crate::tmp_file(); - let (hash, _) = tokio::try_join!( - self.hash_stream::<_, UploadError>(Box::pin(s1)), - safe_save_stream::(tmpfile.clone(), Box::pin(s2)) - )?; + safe_save_reader(tmpfile.clone(), &mut hasher_reader).await?; + let hash = hasher_reader.finalize_reset().await?; debug!("Adding alias"); let alias = self.add_alias(&hash, content_type.clone()).await?; @@ -880,6 +931,34 @@ impl UploadManager { } } +#[instrument(skip(input))] +pub(crate) async fn safe_save_reader( + to: PathBuf, + input: &mut (impl AsyncRead + Unpin), +) -> Result<(), UploadError> { + if let Some(path) = to.parent() { + debug!("Creating directory {:?}", path); + actix_fs::create_dir_all(path.to_owned()).await?; + } + + debug!("Checking if {:?} already exists", to); + if let Err(e) = actix_fs::metadata(to.clone()).await { + if e.kind() != Some(std::io::ErrorKind::NotFound) { + return Err(e.into()); + } + } else { + return Err(UploadError::FileExists); + } + + debug!("Writing stream to {:?}", to); + + let mut file = tokio::fs::File::create(to).await?; + + tokio::io::copy(input, &mut file).await?; + + Ok(()) +} + #[instrument(skip(stream))] pub(crate) async fn safe_save_stream( to: PathBuf, diff --git a/src/validate.rs b/src/validate.rs index d97b0b0..c6ea45e 100644 --- a/src/validate.rs +++ b/src/validate.rs @@ -1,4 +1,6 @@ use crate::{config::Format, error::UploadError, ffmpeg::InputFormat, magick::ValidInputType}; +use actix_web::web::Bytes; +use tokio::io::AsyncRead; pub(crate) fn image_webp() -> mime::Mime { "image/webp".parse().unwrap() @@ -8,6 +10,40 @@ pub(crate) fn video_mp4() -> mime::Mime { "video/mp4".parse().unwrap() } +pub(crate) async fn validate_image_bytes( + bytes: Bytes, + prescribed_format: Option, +) -> Result<(mime::Mime, Box), UploadError> { + let input_type = crate::magick::input_type_bytes(bytes.clone()).await?; + + match (prescribed_format, input_type) { + (_, ValidInputType::Gif) => Ok(( + video_mp4(), + Box::new(crate::ffmpeg::to_mp4_bytes(bytes, InputFormat::Gif)?), + )), + (_, ValidInputType::Mp4) => Ok(( + video_mp4(), + Box::new(crate::ffmpeg::to_mp4_bytes(bytes, InputFormat::Gif)?), + )), + (Some(Format::Jpeg) | None, ValidInputType::Jpeg) => Ok(( + mime::IMAGE_JPEG, + Box::new(crate::exiftool::clear_metadata_bytes_read(bytes)?), + )), + (Some(Format::Png) | None, ValidInputType::Png) => Ok(( + mime::IMAGE_PNG, + Box::new(crate::exiftool::clear_metadata_bytes_read(bytes)?), + )), + (Some(Format::Webp) | None, ValidInputType::Webp) => Ok(( + image_webp(), + Box::new(crate::magick::clear_metadata_bytes_read(bytes)?), + )), + (Some(format), _) => Ok(( + format.to_mime(), + Box::new(crate::magick::convert_bytes_read(bytes, format)?), + )), + } +} + pub(crate) async fn validate_image_stream( stream: S, prescribed_format: Option,