From ac48003f4518da6e68e62b05ade636527260da3a Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 22 Jul 2023 11:15:30 -0500 Subject: [PATCH] Extract ProcessMap out of static --- src/concurrent_processor.rs | 132 +++++++++++++++++++++--------------- src/generate.rs | 8 ++- src/lib.rs | 25 +++++-- src/queue.rs | 66 +++++++++++++++++- src/queue/process.rs | 6 ++ 5 files changed, 175 insertions(+), 62 deletions(-) diff --git a/src/concurrent_processor.rs b/src/concurrent_processor.rs index 52c4b62..ca54d7d 100644 --- a/src/concurrent_processor.rs +++ b/src/concurrent_processor.rs @@ -5,11 +5,11 @@ use crate::{ use actix_web::web; use dashmap::{mapref::entry::Entry, DashMap}; use flume::{r#async::RecvFut, Receiver, Sender}; -use once_cell::sync::Lazy; use std::{ future::Future, path::PathBuf, pin::Pin, + sync::Arc, task::{Context, Poll}, }; use tracing::Span; @@ -18,14 +18,81 @@ type OutcomeReceiver = Receiver<(Details, web::Bytes)>; type ProcessMapKey = (Vec, PathBuf); -type ProcessMap = DashMap; +type ProcessMapInner = DashMap; -static PROCESS_MAP: Lazy = Lazy::new(DashMap::new); +#[derive(Debug, Default, Clone)] +pub(crate) struct ProcessMap { + process_map: Arc, +} + +impl ProcessMap { + pub(super) fn new() -> Self { + Self::default() + } + + pub(super) async fn process( + &self, + hash: &[u8], + path: PathBuf, + fut: Fut, + ) -> Result<(Details, web::Bytes), Error> + where + Fut: Future>, + { + let key = (hash.to_vec(), path.clone()); + + let (sender, receiver) = flume::bounded(1); + + let entry = self.process_map.entry(key.clone()); + + let (state, span) = match entry { + Entry::Vacant(vacant) => { + vacant.insert(receiver); + + let span = tracing::info_span!( + "Processing image", + hash = &tracing::field::debug(&hex::encode(hash)), + path = &tracing::field::debug(&path), + completed = &tracing::field::Empty, + ); + + (CancelState::Sender { sender }, span) + } + Entry::Occupied(receiver) => { + let span = tracing::info_span!( + "Waiting for processed image", + hash = &tracing::field::debug(&hex::encode(hash)), + path = &tracing::field::debug(&path), + ); + + let receiver = receiver.get().clone().into_recv_async(); + + (CancelState::Receiver { receiver }, span) + } + }; + + CancelSafeProcessor { + cancel_token: CancelToken { + span, + key, + state, + process_map: self.clone(), + }, + fut, + } + .await + } + + fn remove(&self, key: &ProcessMapKey) -> Option { + self.process_map.remove(key).map(|(_, v)| v) + } +} struct CancelToken { span: Span, key: ProcessMapKey, state: CancelState, + process_map: ProcessMap, } enum CancelState { @@ -44,7 +111,7 @@ impl CancelState { } pin_project_lite::pin_project! { - pub(super) struct CancelSafeProcessor { + struct CancelSafeProcessor { cancel_token: CancelToken, #[pin] @@ -52,50 +119,6 @@ pin_project_lite::pin_project! { } } -impl CancelSafeProcessor -where - F: Future>, -{ - pub(super) fn new(hash: &[u8], path: PathBuf, fut: F) -> Self { - let key = (hash.to_vec(), path.clone()); - - let (sender, receiver) = flume::bounded(1); - - let entry = PROCESS_MAP.entry(key.clone()); - - let (state, span) = match entry { - Entry::Vacant(vacant) => { - vacant.insert(receiver); - let span = tracing::info_span!( - "Processing image", - hash = &tracing::field::debug(&hex::encode(hash)), - path = &tracing::field::debug(&path), - completed = &tracing::field::Empty, - ); - (CancelState::Sender { sender }, span) - } - Entry::Occupied(receiver) => { - let span = tracing::info_span!( - "Waiting for processed image", - hash = &tracing::field::debug(&hex::encode(hash)), - path = &tracing::field::debug(&path), - ); - ( - CancelState::Receiver { - receiver: receiver.get().clone().into_recv_async(), - }, - span, - ) - } - }; - - CancelSafeProcessor { - cancel_token: CancelToken { span, key, state }, - fut, - } - } -} - impl Future for CancelSafeProcessor where F: Future>, @@ -106,20 +129,23 @@ where let this = self.as_mut().project(); let span = &this.cancel_token.span; + let process_map = &this.cancel_token.process_map; let state = &mut this.cancel_token.state; let key = &this.cancel_token.key; let fut = this.fut; span.in_scope(|| match state { - CancelState::Sender { sender } => fut.poll(cx).map(|res| { - PROCESS_MAP.remove(key); + CancelState::Sender { sender } => { + let res = std::task::ready!(fut.poll(cx)); + + process_map.remove(key); if let Ok(tup) = &res { let _ = sender.try_send(tup.clone()); } - res - }), + Poll::Ready(res) + } CancelState::Receiver { ref mut receiver } => Pin::new(receiver) .poll(cx) .map(|res| res.map_err(|_| UploadError::Canceled.into())), @@ -130,7 +156,7 @@ where impl Drop for CancelToken { fn drop(&mut self) { if self.state.is_sender() { - let completed = PROCESS_MAP.remove(&self.key).is_none(); + let completed = self.process_map.remove(&self.key).is_none(); self.span.record("completed", completed); } } diff --git a/src/generate.rs b/src/generate.rs index 4980911..2771c6b 100644 --- a/src/generate.rs +++ b/src/generate.rs @@ -1,5 +1,5 @@ use crate::{ - concurrent_processor::CancelSafeProcessor, + concurrent_processor::ProcessMap, details::Details, error::{Error, UploadError}, ffmpeg::ThumbnailFormat, @@ -17,6 +17,7 @@ use tracing::Instrument; pub(crate) async fn generate( repo: &R, store: &S, + process_map: &ProcessMap, format: InputProcessableFormat, alias: Alias, thumbnail_path: PathBuf, @@ -39,8 +40,9 @@ pub(crate) async fn generate( hash.clone(), ); - let (details, bytes) = - CancelSafeProcessor::new(hash.as_ref(), thumbnail_path, process_fut).await?; + let (details, bytes) = process_map + .process(hash.as_ref(), thumbnail_path, process_fut) + .await?; Ok((details, bytes)) } diff --git a/src/lib.rs b/src/lib.rs index dbf6a2c..6255568 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -33,6 +33,7 @@ use actix_web::{ http::header::{CacheControl, CacheDirective, LastModified, Range, ACCEPT_RANGES}, web, App, HttpRequest, HttpResponse, HttpResponseBuilder, HttpServer, }; +use concurrent_processor::ProcessMap; use formats::InputProcessableFormat; use futures_util::{ stream::{empty, once}, @@ -661,6 +662,7 @@ async fn process( ext: web::Path, repo: web::Data, store: web::Data, + process_map: web::Data, ) -> Result { let (format, alias, thumbnail_path, thumbnail_args) = prepare_process(query, ext.as_str())?; @@ -723,6 +725,7 @@ async fn process( let (details, bytes) = generate::generate( &repo, &store, + &process_map, format, alias, thumbnail_path, @@ -1285,7 +1288,7 @@ fn configure_endpoints< ); } -fn spawn_workers(repo: R, store: S) +fn spawn_workers(repo: R, store: S, process_map: ProcessMap) where R: FullRepo + 'static, S: Store + 'static, @@ -1297,8 +1300,14 @@ where next_worker_id(), )) }); - tracing::trace_span!(parent: None, "Spawn task") - .in_scope(|| actix_rt::spawn(queue::process_images(repo, store, next_worker_id()))); + tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { + actix_rt::spawn(queue::process_images( + repo, + store, + process_map, + next_worker_id(), + )) + }); } async fn launch_file_store( @@ -1307,6 +1316,8 @@ async fn launch_file_store std::io::Result<()> { + let process_map = ProcessMap::new(); + HttpServer::new(move || { let client = client.clone(); @@ -1314,11 +1325,12 @@ async fn launch_file_store std::io::Result<()> { + let process_map = ProcessMap::new(); + HttpServer::new(move || { let client = client.clone(); @@ -1342,11 +1356,12 @@ async fn launch_object_store< let repo = repo.clone(); let extra_config = extra_config.clone(); - spawn_workers(repo.clone(), store.clone()); + spawn_workers(repo.clone(), store.clone(), process_map.clone()); App::new() .wrap(TracingLogger::default()) .wrap(Deadline) + .app_data(web::Data::new(process_map.clone())) .configure(move |sc| configure_endpoints(sc, repo, store, client, extra_config)) }) .bind(CONFIG.server.address)? diff --git a/src/queue.rs b/src/queue.rs index 6605645..8afee8d 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -1,4 +1,5 @@ use crate::{ + concurrent_processor::ProcessMap, error::Error, formats::InputProcessableFormat, repo::{ @@ -161,9 +162,18 @@ pub(crate) async fn process_cleanup(repo: R, store: S, wo pub(crate) async fn process_images( repo: R, store: S, + process_map: ProcessMap, worker_id: String, ) { - process_jobs(&repo, &store, worker_id, PROCESS_QUEUE, process::perform).await + process_image_jobs( + &repo, + &store, + &process_map, + worker_id, + PROCESS_QUEUE, + process::perform, + ) + .await } type LocalBoxFuture<'a, T> = Pin + 'a>>; @@ -216,3 +226,57 @@ where .await?; } } + +async fn process_image_jobs( + repo: &R, + store: &S, + process_map: &ProcessMap, + worker_id: String, + queue: &'static str, + callback: F, +) where + R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo, + R::Bytes: Clone, + S: Store, + for<'a> F: + Fn(&'a R, &'a S, &'a ProcessMap, &'a [u8]) -> LocalBoxFuture<'a, Result<(), Error>> + Copy, +{ + loop { + let res = + image_job_loop(repo, store, process_map, worker_id.clone(), queue, callback).await; + + if let Err(e) = res { + tracing::warn!("Error processing jobs: {}", format!("{e}")); + tracing::warn!("{}", format!("{e:?}")); + continue; + } + + break; + } +} + +async fn image_job_loop( + repo: &R, + store: &S, + process_map: &ProcessMap, + worker_id: String, + queue: &'static str, + callback: F, +) -> Result<(), Error> +where + R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo, + R::Bytes: Clone, + S: Store, + for<'a> F: + Fn(&'a R, &'a S, &'a ProcessMap, &'a [u8]) -> LocalBoxFuture<'a, Result<(), Error>> + Copy, +{ + loop { + let bytes = repo.pop(queue, worker_id.as_bytes().to_vec()).await?; + + let span = tracing::info_span!("Running Job", worker_id = ?worker_id); + + span.in_scope(|| (callback)(repo, store, process_map, bytes.as_ref())) + .instrument(span) + .await?; + } +} diff --git a/src/queue/process.rs b/src/queue/process.rs index 60c729c..fa43d68 100644 --- a/src/queue/process.rs +++ b/src/queue/process.rs @@ -1,4 +1,5 @@ use crate::{ + concurrent_processor::ProcessMap, error::{Error, UploadError}, formats::InputProcessableFormat, ingest::Session, @@ -14,6 +15,7 @@ use std::path::PathBuf; pub(super) fn perform<'a, R, S>( repo: &'a R, store: &'a S, + process_map: &'a ProcessMap, job: &'a [u8], ) -> LocalBoxFuture<'a, Result<(), Error>> where @@ -47,6 +49,7 @@ where generate( repo, store, + process_map, target_format, Serde::into_inner(source), process_path, @@ -126,10 +129,12 @@ where Ok(()) } +#[allow(clippy::too_many_arguments)] #[tracing::instrument(skip_all)] async fn generate( repo: &R, store: &S, + process_map: &ProcessMap, target_format: InputProcessableFormat, source: Alias, process_path: PathBuf, @@ -155,6 +160,7 @@ async fn generate( crate::generate::generate( repo, store, + process_map, target_format, source, process_path,