From 132e395e5c3f7503b0349b0480cbd15f3f01b1c6 Mon Sep 17 00:00:00 2001 From: "Aode (Lion)" Date: Sat, 2 Apr 2022 17:40:04 -0500 Subject: [PATCH] Multiple items: - Reduce duplicate work in generate job - Use hash instead of identifier for unique processing - Move motion ident generation behind concurrent processor lock --- src/concurrent_processor.rs | 15 +++--- src/generate.rs | 93 +++++++++++++++++++++++++++++++++++++ src/ingest.rs | 6 +-- src/main.rs | 76 ++++++------------------------ src/queue.rs | 2 +- src/queue/process.rs | 28 +++++++++-- 6 files changed, 139 insertions(+), 81 deletions(-) create mode 100644 src/generate.rs diff --git a/src/concurrent_processor.rs b/src/concurrent_processor.rs index 3db0e6b..92a8cf5 100644 --- a/src/concurrent_processor.rs +++ b/src/concurrent_processor.rs @@ -1,7 +1,6 @@ use crate::{ details::Details, error::{Error, UploadError}, - store::Identifier, }; use actix_web::web; use dashmap::{mapref::entry::Entry, DashMap}; @@ -42,10 +41,8 @@ impl CancelSafeProcessor where F: Future>, { - pub(super) fn new(identifier: I, path: PathBuf, fut: F) -> Result { - let id_bytes = identifier.to_bytes()?; - - let key = (id_bytes, path.clone()); + pub(super) fn new(hash: &[u8], path: PathBuf, fut: F) -> Self { + let key = (hash.to_vec(), path.clone()); let entry = PROCESS_MAP.entry(key.clone()); @@ -54,7 +51,7 @@ where vacant.insert(Vec::new()); let span = tracing::info_span!( "Processing image", - identifier = &tracing::field::debug(&identifier), + hash = &tracing::field::debug(&hash), path = &tracing::field::debug(&path), completed = &tracing::field::Empty, ); @@ -65,21 +62,21 @@ where occupied.get_mut().push(tx); let span = tracing::info_span!( "Waiting for processed image", - identifier = &tracing::field::debug(&identifier), + hash = &tracing::field::debug(&hash), path = &tracing::field::debug(&path), ); (Some(rx), span) } }; - Ok(CancelSafeProcessor { + CancelSafeProcessor { cancel_token: CancelToken { span, key, receiver, }, fut, - }) + } } } diff --git a/src/generate.rs b/src/generate.rs new file mode 100644 index 0000000..bffac6f --- /dev/null +++ b/src/generate.rs @@ -0,0 +1,93 @@ +use crate::{ + concurrent_processor::CancelSafeProcessor, + config::ImageFormat, + details::Details, + error::Error, + ffmpeg::{InputFormat, ThumbnailFormat}, + repo::{Alias, FullRepo}, + store::Store, +}; +use actix_web::web::Bytes; +use std::path::PathBuf; +use tokio::io::AsyncReadExt; + +pub(crate) async fn generate( + repo: &R, + store: &S, + format: ImageFormat, + alias: Alias, + thumbnail_path: PathBuf, + thumbnail_args: Vec, + hash: R::Bytes, +) -> Result<(Details, Bytes), Error> { + let process_fut = process( + repo, + store, + format, + alias, + thumbnail_path.clone(), + thumbnail_args, + hash.clone(), + ); + + let (details, bytes) = + CancelSafeProcessor::new(hash.as_ref(), thumbnail_path, process_fut).await?; + + Ok((details, bytes)) +} + +async fn process( + repo: &R, + store: &S, + format: ImageFormat, + alias: Alias, + thumbnail_path: PathBuf, + thumbnail_args: Vec, + hash: R::Bytes, +) -> Result<(Details, Bytes), Error> { + let permit = crate::PROCESS_SEMAPHORE.acquire().await?; + + let identifier = if let Some(identifier) = repo + .still_identifier_from_alias::(&alias) + .await? + { + identifier + } else { + let identifier = repo.identifier(hash.clone()).await?; + let mut reader = crate::ffmpeg::thumbnail( + store.clone(), + identifier, + InputFormat::Mp4, + ThumbnailFormat::Jpeg, + ) + .await?; + let motion_identifier = store.save_async_read(&mut reader).await?; + + repo.relate_motion_identifier(hash.clone(), &motion_identifier) + .await?; + + motion_identifier + }; + + let mut processed_reader = + crate::magick::process_image_store_read(store.clone(), identifier, thumbnail_args, format)?; + + let mut vec = Vec::new(); + processed_reader.read_to_end(&mut vec).await?; + let bytes = Bytes::from(vec); + + drop(permit); + + let details = Details::from_bytes(bytes.clone(), format.as_hint()).await?; + + let identifier = store.save_bytes(bytes.clone()).await?; + repo.relate_details(&identifier, &details).await?; + repo.relate_variant_identifier( + hash, + thumbnail_path.to_string_lossy().to_string(), + &identifier, + ) + .await?; + + Ok((details, bytes)) as Result<(Details, Bytes), Error> +} diff --git a/src/ingest.rs b/src/ingest.rs index bae7bb6..613bac3 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -7,16 +7,12 @@ use crate::{ }; use actix_web::web::{Bytes, BytesMut}; use futures_util::{Stream, StreamExt}; -use once_cell::sync::Lazy; use sha2::{Digest, Sha256}; -use tokio::sync::Semaphore; use tracing::debug; mod hasher; use hasher::Hasher; -static PROCESS_SEMAPHORE: Lazy = Lazy::new(|| Semaphore::new(num_cpus::get())); - pub(crate) struct Session where R: FullRepo + 'static, @@ -39,7 +35,7 @@ where R: FullRepo + 'static, S: Store, { - let permit = PROCESS_SEMAPHORE.acquire().await; + let permit = crate::PROCESS_SEMAPHORE.acquire().await; let mut bytes_mut = BytesMut::new(); diff --git a/src/main.rs b/src/main.rs index 15f635a..ee3fd54 100644 --- a/src/main.rs +++ b/src/main.rs @@ -16,7 +16,7 @@ use std::{ sync::atomic::{AtomicU64, Ordering}, time::{Duration, SystemTime}, }; -use tokio::{io::AsyncReadExt, sync::Semaphore}; +use tokio::sync::Semaphore; use tracing::{debug, info, instrument}; use tracing_actix_web::TracingLogger; use tracing_awc::Tracing; @@ -30,6 +30,7 @@ mod error; mod exiftool; mod ffmpeg; mod file; +mod generate; mod ingest; mod init_tracing; mod magick; @@ -47,12 +48,10 @@ mod tmp_file; mod validate; use self::{ - concurrent_processor::CancelSafeProcessor, config::{Configuration, ImageFormat, Operation}, details::Details, either::Either, error::{Error, UploadError}, - ffmpeg::{InputFormat, ThumbnailFormat}, ingest::Session, init_tracing::init_tracing, magick::details_hint, @@ -94,6 +93,7 @@ async fn upload( .into_iter() .filter_map(|i| i.file()) .collect::>(); + for image in &images { if let Some(alias) = image.result.alias() { info!("Uploaded {} as {:?}", image.filename, alias); @@ -295,66 +295,16 @@ async fn process( return ranged_file_resp(&**store, identifier, range, details).await; } - let identifier = if let Some(identifier) = repo - .still_identifier_from_alias::(&alias) - .await? - { - identifier - } else { - let identifier = repo.identifier(hash.clone()).await?; - let permit = PROCESS_SEMAPHORE.acquire().await; - let mut reader = crate::ffmpeg::thumbnail( - (**store).clone(), - identifier, - InputFormat::Mp4, - ThumbnailFormat::Jpeg, - ) - .await?; - let motion_identifier = store.save_async_read(&mut reader).await?; - drop(permit); - - repo.relate_motion_identifier(hash.clone(), &motion_identifier) - .await?; - - motion_identifier - }; - - let thumbnail_path2 = thumbnail_path.clone(); - let identifier2 = identifier.clone(); - let process_fut = async { - let thumbnail_path = thumbnail_path2; - - let permit = PROCESS_SEMAPHORE.acquire().await?; - - let mut processed_reader = crate::magick::process_image_store_read( - (**store).clone(), - identifier2, - thumbnail_args, - format, - )?; - - let mut vec = Vec::new(); - processed_reader.read_to_end(&mut vec).await?; - let bytes = web::Bytes::from(vec); - - drop(permit); - - let details = Details::from_bytes(bytes.clone(), format.as_hint()).await?; - - let identifier = store.save_bytes(bytes.clone()).await?; - repo.relate_details(&identifier, &details).await?; - repo.relate_variant_identifier( - hash, - thumbnail_path.to_string_lossy().to_string(), - &identifier, - ) - .await?; - - Ok((details, bytes)) as Result<(Details, web::Bytes), Error> - }; - - let (details, bytes) = - CancelSafeProcessor::new(identifier, thumbnail_path.clone(), process_fut)?.await?; + let (details, bytes) = generate::generate( + &**repo, + &**store, + format, + alias, + thumbnail_path, + thumbnail_args, + hash, + ) + .await?; let (builder, stream) = if let Some(web::Header(range_header)) = range { if let Some(range) = range::single_bytes_range(&range_header) { diff --git a/src/queue.rs b/src/queue.rs index 8398f23..6c404eb 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -114,7 +114,7 @@ pub(crate) async fn process_cleanup(repo: R, store: S, wo process_jobs(&repo, &store, worker_id, cleanup::perform).await } -pub(crate) async fn process_images( +pub(crate) async fn process_images( repo: R, store: S, worker_id: String, diff --git a/src/queue/process.rs b/src/queue/process.rs index 76e4c8d..24f7e30 100644 --- a/src/queue/process.rs +++ b/src/queue/process.rs @@ -17,7 +17,7 @@ pub(super) fn perform<'a, R, S>( ) -> LocalBoxFuture<'a, Result<(), Error>> where R: FullRepo + 'static, - S: Store, + S: Store + 'static, { Box::pin(async move { match serde_json::from_slice(job) { @@ -114,7 +114,7 @@ where Ok(()) } -async fn generate( +async fn generate( repo: &R, store: &S, target_format: ImageFormat, @@ -122,5 +122,27 @@ async fn generate( process_path: PathBuf, process_args: Vec, ) -> Result<(), Error> { - unimplemented!("do this") + let hash = repo.hash(&source).await?; + + let path_string = process_path.to_string_lossy().to_string(); + let identifier_opt = repo + .variant_identifier::(hash.clone(), path_string) + .await?; + + if identifier_opt.is_some() { + return Ok(()); + } + + crate::generate::generate( + repo, + store, + target_format, + source, + process_path, + process_args, + hash, + ) + .await?; + + Ok(()) }