From 26a24010279b004e3c4a78a126e8e0f5bf778bfe Mon Sep 17 00:00:00 2001 From: "Aode (lion)" Date: Sat, 23 Oct 2021 14:14:12 -0500 Subject: [PATCH] Use tmp files for mp4s --- src/ffmpeg.rs | 87 ++++++++++++++++++++++------- src/{store/file_store => }/file.rs | 74 +++++++++++++++++++++++- src/magick.rs | 90 +++++++++++++++++++++++------- src/main.rs | 4 ++ src/process.rs | 41 ++++++++++++++ src/store/file_store.rs | 4 +- src/tmp_file.rs | 58 +++++++++++++++++++ src/upload_manager/mod.rs | 5 +- src/validate.rs | 14 ++--- 9 files changed, 323 insertions(+), 54 deletions(-) rename src/{store/file_store => }/file.rs (85%) create mode 100644 src/tmp_file.rs diff --git a/src/ffmpeg.rs b/src/ffmpeg.rs index 72da431..ec226ac 100644 --- a/src/ffmpeg.rs +++ b/src/ffmpeg.rs @@ -1,4 +1,8 @@ -use crate::{error::Error, process::Process, store::Store}; +use crate::{ + error::{Error, UploadError}, + process::Process, + store::Store, +}; use actix_web::web::Bytes; use tokio::io::AsyncRead; use tracing::instrument; @@ -16,10 +20,10 @@ pub(crate) enum ThumbnailFormat { } impl InputFormat { - fn as_format(&self) -> &'static str { + fn to_ext(&self) -> &'static str { match self { - InputFormat::Gif => "gif_pipe", - InputFormat::Mp4 => "mp4", + InputFormat::Gif => ".gif", + InputFormat::Mp4 => ".mp4", } } } @@ -32,6 +36,12 @@ impl ThumbnailFormat { } } + fn to_ext(&self) -> &'static str { + match self { + ThumbnailFormat::Jpeg => ".jpeg", + } + } + fn as_format(&self) -> &'static str { match self { ThumbnailFormat::Jpeg => "singlejpeg", @@ -40,19 +50,27 @@ impl ThumbnailFormat { } } -pub(crate) fn to_mp4_bytes( +pub(crate) async fn to_mp4_bytes( input: Bytes, input_format: InputFormat, -) -> std::io::Result { +) -> Result { + let input_file = crate::tmp_file::tmp_file(Some(input_format.to_ext())); + let input_file_str = input_file.to_str().ok_or(UploadError::Path)?; + crate::store::file_store::safe_create_parent(&input_file).await?; + + let output_file = crate::tmp_file::tmp_file(Some(".mp4")); + let output_file_str = output_file.to_str().ok_or(UploadError::Path)?; + crate::store::file_store::safe_create_parent(&output_file).await?; + + let mut tmp_one = crate::file::File::create(&input_file).await?; + tmp_one.write_from_bytes(input).await?; + tmp_one.close().await?; + let process = Process::run( "ffmpeg", &[ - "-f", - input_format.as_format(), "-i", - "pipe:", - "-movflags", - "faststart+frag_keyframe+empty_moov", + &input_file_str, "-pix_fmt", "yuv420p", "-vf", @@ -62,11 +80,19 @@ pub(crate) fn to_mp4_bytes( "h264", "-f", "mp4", - "pipe:", + &output_file_str, ], )?; - Ok(process.bytes_read(input).unwrap()) + process.wait().await?; + tokio::fs::remove_file(input_file).await?; + + let tmp_two = crate::file::File::open(&output_file).await?; + let stream = tmp_two.read_to_stream(None, None).await?; + let reader = tokio_util::io::StreamReader::new(stream); + let clean_reader = crate::tmp_file::cleanup_tmpfile(reader, output_file); + + Ok(Box::pin(clean_reader)) } #[instrument(name = "Create video thumbnail")] @@ -75,23 +101,46 @@ pub(crate) async fn thumbnail( from: S::Identifier, input_format: InputFormat, format: ThumbnailFormat, -) -> Result { +) -> Result +where + Error: From, +{ + let input_file = crate::tmp_file::tmp_file(Some(input_format.to_ext())); + let input_file_str = input_file.to_str().ok_or(UploadError::Path)?; + crate::store::file_store::safe_create_parent(&input_file).await?; + + let output_file = crate::tmp_file::tmp_file(Some(format.to_ext())); + let output_file_str = output_file.to_str().ok_or(UploadError::Path)?; + crate::store::file_store::safe_create_parent(&output_file).await?; + + let mut tmp_one = crate::file::File::create(&input_file).await?; + tmp_one + .write_from_stream(store.to_stream(&from, None, None).await?) + .await?; + tmp_one.close().await?; + let process = Process::run( "ffmpeg", &[ - "-f", - input_format.as_format(), "-i", - "pipe:", + &input_file_str, "-vframes", "1", "-codec", format.as_codec(), "-f", format.as_format(), - "pipe:", + &output_file_str, ], )?; - Ok(process.store_read(store, from).unwrap()) + process.wait().await?; + tokio::fs::remove_file(input_file).await?; + + let tmp_two = crate::file::File::open(&output_file).await?; + let stream = tmp_two.read_to_stream(None, None).await?; + let reader = tokio_util::io::StreamReader::new(stream); + let clean_reader = crate::tmp_file::cleanup_tmpfile(reader, output_file); + + Ok(Box::pin(clean_reader)) } diff --git a/src/store/file_store/file.rs b/src/file.rs similarity index 85% rename from src/store/file_store/file.rs rename to src/file.rs index cf5accf..a51344b 100644 --- a/src/store/file_store/file.rs +++ b/src/file.rs @@ -8,8 +8,8 @@ pub(crate) use tokio_file::File; mod tokio_file { use crate::{store::file_store::FileError, Either}; use actix_web::web::{Bytes, BytesMut}; - use futures_util::stream::Stream; - use std::{io::SeekFrom, path::Path}; + use futures_util::stream::{Stream, StreamExt}; + use std::{io::SeekFrom, path::Path, pin::Pin}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt}; use tokio_util::codec::{BytesCodec, FramedRead}; @@ -35,6 +35,22 @@ mod tokio_file { Ok(()) } + pub(crate) async fn write_from_stream(&mut self, mut stream: S) -> std::io::Result<()> + where + S: Stream>, + { + // SAFETY: pinned stream shadows original stream so it cannot be moved + let mut stream = unsafe { Pin::new_unchecked(&mut stream) }; + + while let Some(res) = stream.next().await { + let mut bytes = res?; + + self.inner.write_all_buf(&mut bytes).await?; + } + + Ok(()) + } + pub(crate) async fn write_from_async_read( &mut self, mut reader: R, @@ -46,6 +62,10 @@ mod tokio_file { Ok(()) } + pub(crate) async fn close(self) -> std::io::Result<()> { + Ok(()) + } + pub(crate) async fn read_to_async_write(&mut self, writer: &mut W) -> std::io::Result<()> where W: AsyncWrite + Unpin + ?Sized, @@ -112,7 +132,7 @@ mod tokio_file { mod io_uring { use crate::store::file_store::FileError; use actix_web::web::Bytes; - use futures_util::stream::Stream; + use futures_util::stream::{Stream, StreamExt}; use std::{ convert::TryInto, fs::Metadata, @@ -182,6 +202,50 @@ mod io_uring { Ok(()) } + pub(crate) async fn write_from_stream(&mut self, mut stream: S) -> std::io::Result<()> + where + S: Stream>, + { + // SAFETY: pinned stream shadows original stream so it cannot be moved + let mut stream = unsafe { Pin::new_unchecked(&mut stream) }; + let mut cursor: u64 = 0; + + while let Some(res) = stream.next().await { + let bytes = res?; + let mut buf = bytes.to_vec(); + + let len = buf.len(); + let mut position = 0; + + loop { + if position == len { + break; + } + + let position_u64: u64 = position.try_into().unwrap(); + let (res, slice) = self + .write_at(buf.slice(position..len), cursor + position_u64) + .await; + + let n = res?; + if n == 0 { + return Err(std::io::ErrorKind::UnexpectedEof.into()); + } + + position += n; + + buf = slice.into_inner(); + } + + let position: u64 = position.try_into().unwrap(); + cursor += position; + } + + self.inner.sync_all().await?; + + Ok(()) + } + pub(crate) async fn write_from_async_read( &mut self, mut reader: R, @@ -232,6 +296,10 @@ mod io_uring { Ok(()) } + pub(crate) async fn close(self) -> std::io::Result<()> { + self.inner.close().await + } + pub(crate) async fn read_to_async_write(&mut self, writer: &mut W) -> std::io::Result<()> where W: AsyncWrite + Unpin + ?Sized, diff --git a/src/magick.rs b/src/magick.rs index e368644..e015f3f 100644 --- a/src/magick.rs +++ b/src/magick.rs @@ -57,6 +57,10 @@ impl ValidInputType { } } + fn is_mp4(&self) -> bool { + matches!(self, Self::Mp4) + } + pub(crate) fn from_format(format: Format) -> Self { match format { Format::Jpeg => ValidInputType::Jpeg, @@ -79,11 +83,40 @@ pub(crate) fn clear_metadata_bytes_read(input: Bytes) -> std::io::Result std::io::Result { + let process = Process::run( + "magick", + &[ + "convert", + "-", + "-strip", + format!("{}:-", format.to_magick_format()).as_str(), + ], + )?; + + Ok(process.bytes_read(input).unwrap()) +} + #[instrument(name = "Getting details from input bytes", skip(input))] pub(crate) async fn details_bytes( input: Bytes, hint: Option, ) -> Result { + if hint.as_ref().map(|h| h.is_mp4()).unwrap_or(false) { + let input_file = crate::tmp_file::tmp_file(Some(".mp4")); + let input_file_str = input_file.to_str().ok_or(UploadError::Path)?; + crate::store::file_store::safe_create_parent(&input_file).await?; + + let mut tmp_one = crate::file::File::create(&input_file).await?; + tmp_one.write_from_bytes(input).await?; + tmp_one.close().await?; + + return details_file(input_file_str).await; + } + let last_arg = if let Some(expected_format) = hint { format!("{}:-", expected_format.to_str()) } else { @@ -104,29 +137,29 @@ pub(crate) async fn details_bytes( parse_details(s) } -pub(crate) fn convert_bytes_read( - input: Bytes, - format: Format, -) -> std::io::Result { - let process = Process::run( - "magick", - &[ - "convert", - "-", - "-strip", - format!("{}:-", format.to_magick_format()).as_str(), - ], - )?; - - Ok(process.bytes_read(input).unwrap()) -} - pub(crate) async fn details_store( store: S, identifier: S::Identifier, - expected_format: Option, -) -> Result { - let last_arg = if let Some(expected_format) = expected_format { + hint: Option, +) -> Result +where + Error: From, +{ + if hint.as_ref().map(|h| h.is_mp4()).unwrap_or(false) { + let input_file = crate::tmp_file::tmp_file(Some(".mp4")); + let input_file_str = input_file.to_str().ok_or(UploadError::Path)?; + crate::store::file_store::safe_create_parent(&input_file).await?; + + let mut tmp_one = crate::file::File::create(&input_file).await?; + tmp_one + .write_from_stream(store.to_stream(&identifier, None, None).await?) + .await?; + tmp_one.close().await?; + + return details_file(input_file_str).await; + } + + let last_arg = if let Some(expected_format) = hint { format!("{}:-", expected_format.to_str()) } else { "-".to_owned() @@ -147,6 +180,23 @@ pub(crate) async fn details_store( parse_details(s) } +pub(crate) async fn details_file(path_str: &str) -> Result { + let process = Process::run( + "magick", + &["identify", "-ping", "-format", "%w %h | %m\n", &path_str], + )?; + + let mut reader = process.read().unwrap(); + + let mut output = Vec::new(); + reader.read_to_end(&mut output).await?; + tokio::fs::remove_file(path_str).await?; + + let s = String::from_utf8_lossy(&output); + + parse_details(s) +} + fn parse_details(s: std::borrow::Cow<'_, str>) -> Result { let mut lines = s.lines(); let first = lines.next().ok_or(UploadError::UnsupportedFormat)?; diff --git a/src/main.rs b/src/main.rs index 875a485..71e7ebf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -28,6 +28,7 @@ mod either; mod error; mod exiftool; mod ffmpeg; +mod file; mod init_tracing; mod magick; mod map_error; @@ -37,6 +38,7 @@ mod process; mod processor; mod range; mod store; +mod tmp_file; mod upload_manager; mod validate; @@ -821,6 +823,8 @@ where .run() .await?; + crate::tmp_file::remove_tmp_dir().await?; + Ok(()) } diff --git a/src/process.rs b/src/process.rs index e616f87..057d809 100644 --- a/src/process.rs +++ b/src/process.rs @@ -63,6 +63,14 @@ impl Process { cmd.spawn().map(|child| Process { child, span }) } + pub(crate) async fn wait(mut self) -> std::io::Result<()> { + let status = self.child.wait().await?; + if !status.success() { + return Err(std::io::Error::new(std::io::ErrorKind::Other, &StatusError)); + } + Ok(()) + } + pub(crate) fn bytes_read(mut self, mut input: Bytes) -> Option { let mut stdin = self.child.stdin.take()?; let stdout = self.child.stdout.take()?; @@ -103,6 +111,39 @@ impl Process { }) } + pub(crate) fn read(mut self) -> Option { + let stdout = self.child.stdout.take()?; + + let (tx, rx) = channel(); + + let span = self.spawn_span(); + let mut child = self.child; + let handle = actix_rt::spawn( + async move { + match child.wait().await { + Ok(status) => { + if !status.success() { + let _ = tx + .send(std::io::Error::new(std::io::ErrorKind::Other, &StatusError)); + } + } + Err(e) => { + let _ = tx.send(e); + } + } + } + .instrument(span), + ); + + Some(ProcessRead { + inner: stdout, + span: self.span, + err_recv: rx, + err_closed: false, + handle: DropHandle { inner: handle }, + }) + } + pub(crate) fn store_read( mut self, store: S, diff --git a/src/store/file_store.rs b/src/store/file_store.rs index e13740b..7f5ba4b 100644 --- a/src/store/file_store.rs +++ b/src/store/file_store.rs @@ -1,4 +1,4 @@ -use crate::store::Store; +use crate::{file::File, store::Store}; use actix_web::web::Bytes; use futures_util::stream::Stream; use std::{ @@ -10,10 +10,8 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tracing::{debug, error, instrument}; use uuid::Uuid; -mod file; mod file_id; mod restructure; -use file::File; pub(crate) use file_id::FileId; // - Settings Tree diff --git a/src/tmp_file.rs b/src/tmp_file.rs new file mode 100644 index 0000000..8c99d41 --- /dev/null +++ b/src/tmp_file.rs @@ -0,0 +1,58 @@ +use once_cell::sync::Lazy; +use std::path::PathBuf; +use tokio::io::AsyncRead; +use uuid::Uuid; + +static TMP_DIR: Lazy = Lazy::new(|| { + let dir = std::env::temp_dir().join(Uuid::new_v4().to_string()); + std::fs::create_dir(&dir).unwrap(); + dir +}); + +struct TmpFile(PathBuf); + +impl Drop for TmpFile { + fn drop(&mut self) { + actix_rt::spawn(tokio::fs::remove_file(self.0.clone())); + } +} + +pin_project_lite::pin_project! { + pub(crate) struct TmpFileCleanup { + #[pin] + inner: R, + + file: TmpFile, + } +} + +pub(crate) fn tmp_file(ext: Option<&str>) -> PathBuf { + if let Some(ext) = ext { + TMP_DIR.join(format!("{}{}", Uuid::new_v4(), ext)) + } else { + TMP_DIR.join(Uuid::new_v4().to_string()) + } +} + +pub(crate) async fn remove_tmp_dir() -> std::io::Result<()> { + tokio::fs::remove_dir_all(&*TMP_DIR).await +} + +pub(crate) fn cleanup_tmpfile(reader: R, file: PathBuf) -> TmpFileCleanup { + TmpFileCleanup { + inner: reader, + file: TmpFile(file), + } +} + +impl AsyncRead for TmpFileCleanup { + fn poll_read( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + let this = self.as_mut().project(); + + this.inner.poll_read(cx, buf) + } +} diff --git a/src/upload_manager/mod.rs b/src/upload_manager/mod.rs index b541e3a..24757bc 100644 --- a/src/upload_manager/mod.rs +++ b/src/upload_manager/mod.rs @@ -585,7 +585,10 @@ impl Details { store: S, identifier: S::Identifier, expected_format: Option, - ) -> Result { + ) -> Result + where + Error: From, + { let details = crate::magick::details_store(store, identifier, expected_format).await?; Ok(Details::now( diff --git a/src/validate.rs b/src/validate.rs index fe77aad..f04c750 100644 --- a/src/validate.rs +++ b/src/validate.rs @@ -47,17 +47,15 @@ pub(crate) async fn validate_image_bytes( match (prescribed_format, input_type) { (_, ValidInputType::Gif) => Ok(( ValidInputType::Mp4, - Either::right(Either::left(crate::ffmpeg::to_mp4_bytes( - bytes, - InputFormat::Gif, - )?)), + Either::right(Either::left( + crate::ffmpeg::to_mp4_bytes(bytes, InputFormat::Gif).await?, + )), )), (_, ValidInputType::Mp4) => Ok(( ValidInputType::Mp4, - Either::right(Either::left(crate::ffmpeg::to_mp4_bytes( - bytes, - InputFormat::Mp4, - )?)), + Either::right(Either::left( + crate::ffmpeg::to_mp4_bytes(bytes, InputFormat::Mp4).await?, + )), )), (Some(Format::Jpeg) | None, ValidInputType::Jpeg) => Ok(( ValidInputType::Jpeg,