use crate::{ details::Details, error::{Error, UploadError}, repo::Hash, }; use actix_web::web; use dashmap::{mapref::entry::Entry, DashMap}; use flume::{r#async::RecvFut, Receiver, Sender}; use std::{ future::Future, path::PathBuf, pin::Pin, sync::Arc, task::{Context, Poll}, }; use tracing::Span; type OutcomeReceiver = Receiver<(Details, web::Bytes)>; type ProcessMapKey = (Hash, PathBuf); type ProcessMapInner = DashMap; #[derive(Debug, Default, Clone)] pub(crate) struct ProcessMap { process_map: Arc, } impl ProcessMap { pub(super) fn new() -> Self { Self::default() } pub(super) async fn process( &self, hash: Hash, path: PathBuf, fut: Fut, ) -> Result<(Details, web::Bytes), Error> where Fut: Future>, { let key = (hash.clone(), path.clone()); let (sender, receiver) = flume::bounded(1); let entry = self.process_map.entry(key.clone()); let (state, span) = match entry { Entry::Vacant(vacant) => { vacant.insert(receiver); let span = tracing::info_span!( "Processing image", hash = ?hash, path = ?path, completed = &tracing::field::Empty, ); metrics::counter!("pict-rs.process-map.inserted").increment(1); (CancelState::Sender { sender }, span) } Entry::Occupied(receiver) => { let span = tracing::info_span!( "Waiting for processed image", hash = ?hash, path = ?path, ); let receiver = receiver.get().clone().into_recv_async(); (CancelState::Receiver { receiver }, span) } }; CancelSafeProcessor { cancel_token: CancelToken { span, key, state, process_map: self.clone(), }, fut, } .await } fn remove(&self, key: &ProcessMapKey) -> Option { self.process_map.remove(key).map(|(_, v)| v) } } struct CancelToken { span: Span, key: ProcessMapKey, state: CancelState, process_map: ProcessMap, } enum CancelState { Sender { sender: Sender<(Details, web::Bytes)>, }, Receiver { receiver: RecvFut<'static, (Details, web::Bytes)>, }, } impl CancelState { const fn is_sender(&self) -> bool { matches!(self, Self::Sender { .. }) } } pin_project_lite::pin_project! { struct CancelSafeProcessor { cancel_token: CancelToken, #[pin] fut: F, } } impl Future for CancelSafeProcessor where F: Future>, { type Output = Result<(Details, web::Bytes), Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.as_mut().project(); let span = &this.cancel_token.span; let process_map = &this.cancel_token.process_map; let state = &mut this.cancel_token.state; let key = &this.cancel_token.key; let fut = this.fut; span.in_scope(|| match state { CancelState::Sender { sender } => { let res = std::task::ready!(fut.poll(cx)); if process_map.remove(key).is_some() { metrics::counter!("pict-rs.process-map.removed").increment(1); } if let Ok(tup) = &res { let _ = sender.try_send(tup.clone()); } Poll::Ready(res) } CancelState::Receiver { ref mut receiver } => Pin::new(receiver) .poll(cx) .map(|res| res.map_err(|_| UploadError::Canceled.into())), }) } } impl Drop for CancelToken { fn drop(&mut self) { if self.state.is_sender() { let completed = self.process_map.remove(&self.key).is_none(); self.span.record("completed", completed); if !completed { metrics::counter!("pict-rs.process-map.removed").increment(1); } } } }