From 80c83eb491c0915bbcea39b6e15f77169ae10b6a Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 25 Sep 2022 20:39:09 -0500 Subject: [PATCH] Add ffprobe for details inspection - vastly improve video detection speed --- src/config/primitives.rs | 4 +- src/details.rs | 36 ++++++-- src/ffmpeg.rs | 187 ++++++++++++++++++++++++++++++--------- src/magick.rs | 19 ++-- src/process.rs | 11 ++- src/queue/process.rs | 6 +- src/validate.rs | 14 +-- 7 files changed, 206 insertions(+), 71 deletions(-) diff --git a/src/config/primitives.rs b/src/config/primitives.rs index 9a0cb6e..a0e3795 100644 --- a/src/config/primitives.rs +++ b/src/config/primitives.rs @@ -110,8 +110,8 @@ pub(crate) enum Store { } impl ImageFormat { - pub(crate) fn as_hint(self) -> Option { - Some(ValidInputType::from_format(self)) + pub(crate) fn as_hint(self) -> ValidInputType { + ValidInputType::from_format(self) } pub(crate) fn as_magick_format(self) -> &'static str { diff --git a/src/details.rs b/src/details.rs index d4ab6e1..91a696c 100644 --- a/src/details.rs +++ b/src/details.rs @@ -24,11 +24,18 @@ impl Details { } #[tracing::instrument("Details from bytes", skip(input))] - pub(crate) async fn from_bytes( - input: web::Bytes, - hint: Option, - ) -> Result { - let details = crate::magick::details_bytes(input, hint).await?; + pub(crate) async fn from_bytes(input: web::Bytes, hint: ValidInputType) -> Result { + let details = if hint.is_video() { + crate::ffmpeg::details_bytes(input.clone()).await? + } else { + None + }; + + let details = if let Some(details) = details { + details + } else { + crate::magick::details_bytes(input, Some(hint)).await? + }; Ok(Details::now( details.width, @@ -44,7 +51,17 @@ impl Details { identifier: S::Identifier, expected_format: Option, ) -> Result { - let details = crate::magick::details_store(store, identifier, expected_format).await?; + let details = if expected_format.map(|t| t.is_video()).unwrap_or(true) { + crate::ffmpeg::details_store(&store, &identifier).await? + } else { + None + }; + + let details = if let Some(details) = details { + details + } else { + crate::magick::details_store(store, identifier, expected_format).await? + }; Ok(Details::now( details.width, @@ -54,7 +71,12 @@ impl Details { )) } - pub(crate) fn now(width: usize, height: usize, content_type: mime::Mime, frames: Option) -> Self { + pub(crate) fn now( + width: usize, + height: usize, + content_type: mime::Mime, + frames: Option, + ) -> Self { Details { width, height, diff --git a/src/ffmpeg.rs b/src/ffmpeg.rs index ca70c4e..35a1e4b 100644 --- a/src/ffmpeg.rs +++ b/src/ffmpeg.rs @@ -1,8 +1,8 @@ use crate::{ error::{Error, UploadError}, + magick::{Details, ValidInputType}, process::Process, store::Store, - magick::ValidInputType, }; use actix_web::web::Bytes; use tokio::io::{AsyncRead, AsyncReadExt}; @@ -30,11 +30,11 @@ impl InputFormat { } } - pub(crate) fn to_valid_input_type(self) -> ValidInputType { + fn to_mime(self) -> mime::Mime { match self { - Self::Gif => ValidInputType::Gif, - Self::Mp4 => ValidInputType::Mp4, - Self::Webm => ValidInputType::Webm, + Self::Gif => mime::IMAGE_GIF, + Self::Mp4 => crate::magick::video_mp4(), + Self::Webm => crate::magick::video_webm(), } } } @@ -67,9 +67,53 @@ const FORMAT_MAPPINGS: &[(&str, InputFormat)] = &[ ("webm", InputFormat::Webm), ]; -pub(crate) async fn input_type_bytes( - input: Bytes, -) -> Result, Error> { +pub(crate) async fn input_type_bytes(input: Bytes) -> Result, Error> { + if let Some(details) = details_bytes(input).await? { + return Ok(Some(details.validate_input()?)); + } + + Ok(None) +} + +pub(crate) async fn details_store( + store: &S, + identifier: &S::Identifier, +) -> Result, Error> { + let input_file = crate::tmp_file::tmp_file(None); + let input_file_str = input_file.to_str().ok_or(UploadError::Path)?; + crate::store::file_store::safe_create_parent(&input_file).await?; + + let mut tmp_one = crate::file::File::create(&input_file).await?; + tmp_one + .write_from_stream(store.to_stream(&identifier, None, None).await?) + .await?; + tmp_one.close().await?; + + let process = Process::run( + "ffprobe", + &[ + "-v", + "quiet", + "-select_streams", + "v:0", + "-count_frames", + "-show_entries", + "stream=width,height,nb_read_frames:format=format_name", + "-of", + "default=noprint_wrappers=1:nokey=1", + input_file_str, + ], + )?; + + let mut output = Vec::new(); + process.read().read_to_end(&mut output).await?; + let output = String::from_utf8_lossy(&output); + tokio::fs::remove_file(input_file_str).await?; + + parse_details(output) +} + +pub(crate) async fn details_bytes(input: Bytes) -> Result, Error> { let input_file = crate::tmp_file::tmp_file(None); let input_file_str = input_file.to_str().ok_or(UploadError::Path)?; crate::store::file_store::safe_create_parent(&input_file).await?; @@ -78,31 +122,82 @@ pub(crate) async fn input_type_bytes( tmp_one.write_from_bytes(input).await?; tmp_one.close().await?; - let process = Process::run("ffprobe", &[ + let process = Process::run( + "ffprobe", + &[ "-v", "quiet", + "-select_streams", + "v:0", + "-count_frames", "-show_entries", - "format=format_name", + "stream=width,height,nb_read_frames:format=format_name", "-of", "default=noprint_wrappers=1:nokey=1", input_file_str, - ])?; + ], + )?; let mut output = Vec::new(); process.read().read_to_end(&mut output).await?; - let formats = String::from_utf8_lossy(&output); + let output = String::from_utf8_lossy(&output); + tokio::fs::remove_file(input_file_str).await?; - tracing::info!("FORMATS: {}", formats); + parse_details(output) +} + +fn parse_details(output: std::borrow::Cow<'_, str>) -> Result, Error> { + tracing::info!("OUTPUT: {}", output); + + let mut lines = output.lines(); + + let width = match lines.next() { + Some(line) => line, + None => return Ok(None), + }; + + let height = match lines.next() { + Some(line) => line, + None => return Ok(None), + }; + + let frames = match lines.next() { + Some(line) => line, + None => return Ok(None), + }; + + let formats = match lines.next() { + Some(line) => line, + None => return Ok(None), + }; for (k, v) in FORMAT_MAPPINGS { if formats.contains(k) { - return Ok(Some(*v)) + return Ok(Some(parse_details_inner(width, height, frames, *v)?)); } } Ok(None) } +fn parse_details_inner( + width: &str, + height: &str, + frames: &str, + format: InputFormat, +) -> Result { + let width = width.parse().map_err(|_| UploadError::UnsupportedFormat)?; + let height = height.parse().map_err(|_| UploadError::UnsupportedFormat)?; + let frames = frames.parse().map_err(|_| UploadError::UnsupportedFormat)?; + + Ok(Details { + mime_type: format.to_mime(), + width, + height, + frames: Some(frames), + }) +} + #[tracing::instrument(name = "Convert to Mp4", skip(input))] pub(crate) async fn to_mp4_bytes( input: Bytes, @@ -122,36 +217,42 @@ pub(crate) async fn to_mp4_bytes( tmp_one.close().await?; let process = if permit_audio { - Process::run("ffmpeg", &[ - "-i", - input_file_str, - "-pix_fmt", - "yuv420p", - "-vf", - "scale=trunc(iw/2)*2:trunc(ih/2)*2", - "-c:a", - "aac", - "-c:v", - "h264", - "-f", - "mp4", - output_file_str, - ])? + Process::run( + "ffmpeg", + &[ + "-i", + input_file_str, + "-pix_fmt", + "yuv420p", + "-vf", + "scale=trunc(iw/2)*2:trunc(ih/2)*2", + "-c:a", + "aac", + "-c:v", + "h264", + "-f", + "mp4", + output_file_str, + ], + )? } else { - Process::run("ffmpeg", &[ - "-i", - input_file_str, - "-pix_fmt", - "yuv420p", - "-vf", - "scale=trunc(iw/2)*2:trunc(ih/2)*2", - "-an", - "-c:v", - "h264", - "-f", - "mp4", - output_file_str, - ])? + Process::run( + "ffmpeg", + &[ + "-i", + input_file_str, + "-pix_fmt", + "yuv420p", + "-vf", + "scale=trunc(iw/2)*2:trunc(ih/2)*2", + "-an", + "-c:v", + "h264", + "-f", + "mp4", + output_file_str, + ], + )? }; process.wait().await?; diff --git a/src/magick.rs b/src/magick.rs index ca37b22..6f0bc1a 100644 --- a/src/magick.rs +++ b/src/magick.rs @@ -27,11 +27,11 @@ fn image_webp() -> mime::Mime { "image/webp".parse().unwrap() } -fn video_mp4() -> mime::Mime { +pub(crate) fn video_mp4() -> mime::Mime { "video/mp4".parse().unwrap() } -fn video_webm() -> mime::Mime { +pub(crate) fn video_webm() -> mime::Mime { "video/webm".parse().unwrap() } @@ -68,6 +68,13 @@ impl ValidInputType { } } + pub(crate) fn is_video(self) -> bool { + match self { + Self::Mp4 | Self::Webm | Self::Gif => true, + _ => false, + } + } + fn video_hint(self) -> Option<&'static str> { match self { Self::Mp4 => Some(".mp4"), @@ -275,11 +282,7 @@ fn parse_details(s: std::borrow::Cow<'_, str>) -> Result { mime_type, width, height, - frames: if frames > 1 { - Some(frames) - } else { - None - }, + frames: if frames > 1 { Some(frames) } else { None }, }) } @@ -322,7 +325,7 @@ pub(crate) fn process_image_async_read( impl Details { #[instrument(name = "Validating input type")] - fn validate_input(&self) -> Result { + pub(crate) fn validate_input(&self) -> Result { if self.width > crate::CONFIG.media.max_width || self.height > crate::CONFIG.media.max_height || self.width * self.height > crate::CONFIG.media.max_area diff --git a/src/process.rs b/src/process.rs index 1a875d7..e817514 100644 --- a/src/process.rs +++ b/src/process.rs @@ -4,12 +4,12 @@ use actix_web::web::Bytes; use std::{ future::Future, pin::Pin, - process::{Stdio}, + process::Stdio, task::{Context, Poll}, }; use tokio::{ io::{AsyncRead, AsyncWriteExt, ReadBuf}, - process::{Child, Command, ChildStdin}, + process::{Child, ChildStdin, Command}, sync::oneshot::{channel, Receiver}, }; use tracing::{Instrument, Span}; @@ -83,7 +83,11 @@ impl Process { self, mut async_read: A, ) -> impl AsyncRead + Unpin { - self.read_fn(move |mut stdin| async move { tokio::io::copy(&mut async_read, &mut stdin).await.map(|_| ()) }) + self.read_fn(move |mut stdin| async move { + tokio::io::copy(&mut async_read, &mut stdin) + .await + .map(|_| ()) + }) } #[tracing::instrument] @@ -147,7 +151,6 @@ impl Process { err_closed: false, handle: DropHandle { inner: handle }, } - } } diff --git a/src/queue/process.rs b/src/queue/process.rs index 64cbfba..1ae9877 100644 --- a/src/queue/process.rs +++ b/src/queue/process.rs @@ -113,7 +113,11 @@ where result } Err(e) => { - tracing::warn!("Failed to ingest {}, {}", format!("{}", e), format!("{:?}", e)); + tracing::warn!( + "Failed to ingest {}, {}", + format!("{}", e), + format!("{:?}", e) + ); UploadResult::Failure { message: e.to_string(), diff --git a/src/validate.rs b/src/validate.rs index b51a9ca..91ecce4 100644 --- a/src/validate.rs +++ b/src/validate.rs @@ -44,11 +44,12 @@ pub(crate) async fn validate_image_bytes( enable_full_video: bool, validate: bool, ) -> Result<(ValidInputType, impl AsyncRead + Unpin), Error> { - let input_type = if let Some(input_type) = crate::ffmpeg::input_type_bytes(bytes.clone()).await? { - input_type.to_valid_input_type() - } else { - crate::magick::input_type_bytes(bytes.clone()).await? - }; + let input_type = + if let Some(input_type) = crate::ffmpeg::input_type_bytes(bytes.clone()).await? { + input_type + } else { + crate::magick::input_type_bytes(bytes.clone()).await? + }; if !validate { return Ok((input_type, Either::left(UnvalidatedBytes::new(bytes)))); @@ -84,7 +85,8 @@ pub(crate) async fn validate_image_bytes( Ok(( ValidInputType::Mp4, Either::right(Either::left( - crate::ffmpeg::to_mp4_bytes(bytes, InputFormat::Webm, enable_full_video).await?, + crate::ffmpeg::to_mp4_bytes(bytes, InputFormat::Webm, enable_full_video) + .await?, )), )) }