From 74885f2932d88bd6ce5ab8f2e77943e55b354e52 Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 31 Mar 2024 16:00:23 -0500 Subject: [PATCH] Share notification map between sled, postgres --- src/generate.rs | 51 +++++----- src/lib.rs | 50 ++++------ src/processor.rs | 4 +- src/queue.rs | 7 +- src/queue/process.rs | 11 +-- src/repo.rs | 27 ++---- src/repo/notification_map.rs | 94 ++++++++++++++++++ src/repo/postgres.rs | 180 +++++++---------------------------- src/repo/sled.rs | 72 +++++++++++--- 9 files changed, 245 insertions(+), 251 deletions(-) create mode 100644 src/repo/notification_map.rs diff --git a/src/generate.rs b/src/generate.rs index 91feba1..2fb3cda 100644 --- a/src/generate.rs +++ b/src/generate.rs @@ -13,7 +13,6 @@ use crate::{ use std::{ future::Future, - path::PathBuf, sync::Arc, time::{Duration, Instant}, }; @@ -52,7 +51,7 @@ impl Drop for MetricsGuard { pub(crate) async fn generate( state: &State, format: InputProcessableFormat, - thumbnail_path: PathBuf, + variant: String, thumbnail_args: Vec, original_details: &Details, hash: Hash, @@ -66,12 +65,10 @@ pub(crate) async fn generate( Ok((original_details.clone(), identifier)) } else { - let variant = thumbnail_path.to_string_lossy().to_string(); - let mut attempts = 0; let tup = loop { - if attempts > 4 { - todo!("return error"); + if attempts > 2 { + return Err(UploadError::ProcessTimeout.into()); } match state @@ -95,35 +92,35 @@ pub(crate) async fn generate( .with_poll_timer("heartbeat-future") .await; + state + .repo + .notify_variant(hash.clone(), variant.clone()) + .await?; + match res { Ok(Ok(tuple)) => break tuple, - Ok(Err(e)) | Err(e) => { - state - .repo - .fail_variant(hash.clone(), variant.clone()) - .await?; - - return Err(e); - } + Ok(Err(e)) | Err(e) => return Err(e), } } - Err(_) => { - match state + Err(mut entry) => { + let notified = entry.notified_timeout(Duration::from_secs(20)); + + if let Some(identifier) = state .repo - .await_variant(hash.clone(), variant.clone()) + .variant_identifier(hash.clone(), variant.clone()) .await? { - Some(identifier) => { - let details = - crate::ensure_details_identifier(state, &identifier).await?; - - break (details, identifier); - } - None => { - attempts += 1; - continue; - } + drop(notified); + let details = crate::ensure_details_identifier(state, &identifier).await?; + break (details, identifier); } + + match notified.await { + Ok(()) => tracing::debug!("notified"), + Err(_) => tracing::warn!("timeout"), + } + + attempts += 1; } } }; diff --git a/src/lib.rs b/src/lib.rs index fd89448..4cf13bb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -56,7 +56,6 @@ use state::State; use std::{ marker::PhantomData, path::Path, - path::PathBuf, rc::Rc, sync::{Arc, OnceLock}, time::{Duration, SystemTime}, @@ -774,7 +773,7 @@ fn prepare_process( config: &Configuration, operations: Vec<(String, String)>, ext: &str, -) -> Result<(InputProcessableFormat, PathBuf, Vec), Error> { +) -> Result<(InputProcessableFormat, String, Vec), Error> { let operations = operations .into_iter() .filter(|(k, _)| config.media.filters.contains(&k.to_lowercase())) @@ -784,10 +783,9 @@ fn prepare_process( .parse::() .map_err(|_| UploadError::UnsupportedProcessExtension)?; - let (thumbnail_path, thumbnail_args) = - self::processor::build_chain(&operations, &format.to_string())?; + let (variant, thumbnail_args) = self::processor::build_chain(&operations, &format.to_string())?; - Ok((format, thumbnail_path, thumbnail_args)) + Ok((format, variant, thumbnail_args)) } #[tracing::instrument(name = "Fetching derived details", skip(state))] @@ -798,7 +796,7 @@ async fn process_details( ) -> Result { let alias = alias_from_query(source.into(), &state).await?; - let (_, thumbnail_path, _) = prepare_process(&state.config, operations, ext.as_str())?; + let (_, variant, _) = prepare_process(&state.config, operations, ext.as_str())?; let hash = state .repo @@ -806,18 +804,16 @@ async fn process_details( .await? .ok_or(UploadError::MissingAlias)?; - let thumbnail_string = thumbnail_path.to_string_lossy().to_string(); - if !state.config.server.read_only { state .repo - .accessed_variant(hash.clone(), thumbnail_string.clone()) + .accessed_variant(hash.clone(), variant.clone()) .await?; } let identifier = state .repo - .variant_identifier(hash, thumbnail_string) + .variant_identifier(hash, variant) .await? .ok_or(UploadError::MissingAlias)?; @@ -856,11 +852,9 @@ async fn process( ) -> Result { let alias = proxy_alias_from_query(source.into(), &state).await?; - let (format, thumbnail_path, thumbnail_args) = + let (format, variant, thumbnail_args) = prepare_process(&state.config, operations, ext.as_str())?; - let path_string = thumbnail_path.to_string_lossy().to_string(); - let (hash, alias, not_found) = if let Some(hash) = state.repo.hash(&alias).await? { (hash, alias, false) } else { @@ -874,13 +868,13 @@ async fn process( if !state.config.server.read_only { state .repo - .accessed_variant(hash.clone(), path_string.clone()) + .accessed_variant(hash.clone(), variant.clone()) .await?; } let identifier_opt = state .repo - .variant_identifier(hash.clone(), path_string) + .variant_identifier(hash.clone(), variant.clone()) .await?; let (details, identifier) = if let Some(identifier) = identifier_opt { @@ -897,7 +891,7 @@ async fn process( generate::generate( &state, format, - thumbnail_path, + variant, thumbnail_args, &original_details, hash, @@ -933,9 +927,8 @@ async fn process_head( } }; - let (_, thumbnail_path, _) = prepare_process(&state.config, operations, ext.as_str())?; + let (_, variant, _) = prepare_process(&state.config, operations, ext.as_str())?; - let path_string = thumbnail_path.to_string_lossy().to_string(); let Some(hash) = state.repo.hash(&alias).await? else { // Invalid alias return Ok(HttpResponse::NotFound().finish()); @@ -944,14 +937,11 @@ async fn process_head( if !state.config.server.read_only { state .repo - .accessed_variant(hash.clone(), path_string.clone()) + .accessed_variant(hash.clone(), variant.clone()) .await?; } - let identifier_opt = state - .repo - .variant_identifier(hash.clone(), path_string) - .await?; + let identifier_opt = state.repo.variant_identifier(hash.clone(), variant).await?; if let Some(identifier) = identifier_opt { let details = ensure_details_identifier(&state, &identifier).await?; @@ -987,10 +977,9 @@ async fn process_backgrounded( } }; - let (target_format, process_path, process_args) = + let (target_format, variant, process_args) = prepare_process(&state.config, operations, ext.as_str())?; - let path_string = process_path.to_string_lossy().to_string(); let Some(hash) = state.repo.hash(&source).await? else { // Invalid alias return Ok(HttpResponse::BadRequest().finish()); @@ -998,7 +987,7 @@ async fn process_backgrounded( let identifier_opt = state .repo - .variant_identifier(hash.clone(), path_string) + .variant_identifier(hash.clone(), variant.clone()) .await?; if identifier_opt.is_some() { @@ -1009,14 +998,7 @@ async fn process_backgrounded( return Err(UploadError::ReadOnly.into()); } - queue_generate( - &state.repo, - target_format, - source, - process_path, - process_args, - ) - .await?; + queue_generate(&state.repo, target_format, source, variant, process_args).await?; Ok(HttpResponse::Accepted().finish()) } diff --git a/src/processor.rs b/src/processor.rs index dec45e8..d7e3121 100644 --- a/src/processor.rs +++ b/src/processor.rs @@ -91,7 +91,7 @@ impl ResizeKind { pub(crate) fn build_chain( args: &[(String, String)], ext: &str, -) -> Result<(PathBuf, Vec), Error> { +) -> Result<(String, Vec), Error> { fn parse(key: &str, value: &str) -> Result, Error> { if key == P::NAME { return Ok(Some(P::parse(key, value).ok_or(UploadError::ParsePath)?)); @@ -122,7 +122,7 @@ pub(crate) fn build_chain( path.push(ext); - Ok((path, args)) + Ok((path.to_string_lossy().to_string(), args)) } impl Processor for Identity { diff --git a/src/queue.rs b/src/queue.rs index e36e6c9..f20b457 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -11,7 +11,6 @@ use crate::{ use std::{ ops::Deref, - path::PathBuf, sync::Arc, time::{Duration, Instant}, }; @@ -62,7 +61,7 @@ enum Process { Generate { target_format: InputProcessableFormat, source: Serde, - process_path: PathBuf, + process_path: String, process_args: Vec, }, } @@ -177,13 +176,13 @@ pub(crate) async fn queue_generate( repo: &ArcRepo, target_format: InputProcessableFormat, source: Alias, - process_path: PathBuf, + variant: String, process_args: Vec, ) -> Result<(), Error> { let job = serde_json::to_value(Process::Generate { target_format, source: Serde::new(source), - process_path, + process_path: variant, process_args, }) .map_err(UploadError::PushJob)?; diff --git a/src/queue/process.rs b/src/queue/process.rs index 2c62758..7c74a7a 100644 --- a/src/queue/process.rs +++ b/src/queue/process.rs @@ -13,7 +13,7 @@ use crate::{ store::Store, UploadQuery, }; -use std::{path::PathBuf, sync::Arc}; +use std::sync::Arc; use super::{JobContext, JobFuture, JobResult}; @@ -172,12 +172,12 @@ where Ok(()) } -#[tracing::instrument(skip(state, process_path, process_args))] +#[tracing::instrument(skip(state, variant, process_args))] async fn generate( state: &State, target_format: InputProcessableFormat, source: Alias, - process_path: PathBuf, + variant: String, process_args: Vec, ) -> JobResult { let hash = state @@ -188,10 +188,9 @@ async fn generate( .ok_or(UploadError::MissingAlias) .abort()?; - let path_string = process_path.to_string_lossy().to_string(); let identifier_opt = state .repo - .variant_identifier(hash.clone(), path_string) + .variant_identifier(hash.clone(), variant.clone()) .await .retry()?; @@ -205,7 +204,7 @@ async fn generate( crate::generate::generate( state, target_format, - process_path, + variant, process_args, &original_details, hash, diff --git a/src/repo.rs b/src/repo.rs index 81af5ec..44905a3 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -3,6 +3,7 @@ mod delete_token; mod hash; mod metrics; mod migrate; +mod notification_map; use crate::{ config, @@ -24,6 +25,8 @@ pub(crate) use delete_token::DeleteToken; pub(crate) use hash::Hash; pub(crate) use migrate::{migrate_04, migrate_repo}; +use self::notification_map::NotificationEntry; + pub(crate) type ArcRepo = Arc; #[derive(Clone, Debug)] @@ -744,17 +747,11 @@ pub(crate) trait VariantRepo: BaseRepo { &self, hash: Hash, variant: String, - ) -> Result, RepoError>; + ) -> Result, RepoError>; async fn variant_heartbeat(&self, hash: Hash, variant: String) -> Result<(), RepoError>; - async fn fail_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError>; - - async fn await_variant( - &self, - hash: Hash, - variant: String, - ) -> Result>, RepoError>; + async fn notify_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError>; async fn relate_variant_identifier( &self, @@ -783,7 +780,7 @@ where &self, hash: Hash, variant: String, - ) -> Result, RepoError> { + ) -> Result, RepoError> { T::claim_variant_processing_rights(self, hash, variant).await } @@ -791,16 +788,8 @@ where T::variant_heartbeat(self, hash, variant).await } - async fn fail_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError> { - T::fail_variant(self, hash, variant).await - } - - async fn await_variant( - &self, - hash: Hash, - variant: String, - ) -> Result>, RepoError> { - T::await_variant(self, hash, variant).await + async fn notify_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError> { + T::notify_variant(self, hash, variant).await } async fn relate_variant_identifier( diff --git a/src/repo/notification_map.rs b/src/repo/notification_map.rs new file mode 100644 index 0000000..5e2b0d0 --- /dev/null +++ b/src/repo/notification_map.rs @@ -0,0 +1,94 @@ +use dashmap::DashMap; +use std::{ + future::Future, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Weak, + }, + time::Duration, +}; +use tokio::sync::Notify; + +use crate::future::WithTimeout; + +type Map = Arc, Weak>>; + +#[derive(Clone)] +pub(super) struct NotificationMap { + map: Map, +} + +pub(crate) struct NotificationEntry { + inner: Arc, +} + +struct NotificationEntryInner { + key: Arc, + map: Map, + notify: Notify, + armed: AtomicBool, +} + +impl NotificationMap { + pub(super) fn new() -> Self { + Self { + map: Arc::new(DashMap::new()), + } + } + + pub(super) fn register_interest(&self, key: Arc) -> NotificationEntry { + let new_entry = Arc::new(NotificationEntryInner { + key: key.clone(), + map: self.map.clone(), + notify: crate::sync::bare_notify(), + armed: AtomicBool::new(false), + }); + + let mut key_entry = self + .map + .entry(key) + .or_insert_with(|| Arc::downgrade(&new_entry)); + + let upgraded_entry = key_entry.value().upgrade(); + + let inner = if let Some(entry) = upgraded_entry { + entry + } else { + *key_entry.value_mut() = Arc::downgrade(&new_entry); + new_entry + }; + + inner.armed.store(true, Ordering::Release); + + NotificationEntry { inner } + } + + pub(super) fn notify(&self, key: &str) { + if let Some(notifier) = self.map.get(key).and_then(|v| v.upgrade()) { + notifier.notify.notify_waiters(); + } + } +} + +impl NotificationEntry { + pub(crate) fn notified_timeout( + &mut self, + duration: Duration, + ) -> impl Future> + '_ { + self.inner.notify.notified().with_timeout(duration) + } +} + +impl Default for NotificationMap { + fn default() -> Self { + Self::new() + } +} + +impl Drop for NotificationEntryInner { + fn drop(&mut self) { + if self.armed.load(Ordering::Acquire) { + self.map.remove(&self.key); + } + } +} diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs index 466d2bf..7db0a58 100644 --- a/src/repo/postgres.rs +++ b/src/repo/postgres.rs @@ -21,7 +21,6 @@ use diesel_async::{ bb8::{Pool, PooledConnection, RunError}, AsyncDieselConnectionManager, ManagerConfig, PoolError, }, - scoped_futures::ScopedFutureExt, AsyncConnection, AsyncPgConnection, RunQueryDsl, }; use futures_core::Stream; @@ -45,6 +44,7 @@ use self::job_status::JobStatus; use super::{ metrics::{PopMetricsGuard, PushMetricsGuard, WaitMetricsGuard}, + notification_map::{NotificationEntry, NotificationMap}, Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, DetailsRepo, FullRepo, Hash, HashAlreadyExists, HashPage, HashRepo, JobId, JobResult, OrderedHash, ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo, @@ -64,32 +64,7 @@ struct Inner { notifier_pool: Pool, queue_notifications: DashMap>, upload_notifications: DashMap>, - keyed_notifications: DashMap, Weak>, -} - -struct NotificationEntry { - key: Arc, - inner: Arc, - notify: Notify, -} - -impl Drop for NotificationEntry { - fn drop(&mut self) { - self.inner.keyed_notifications.remove(self.key.as_ref()); - } -} - -struct KeyListener { - entry: Arc, -} - -impl KeyListener { - fn notified_timeout( - &self, - timeout: Duration, - ) -> impl Future> + '_ { - self.entry.notify.notified().with_timeout(timeout) - } + keyed_notifications: NotificationMap, } struct UploadInterest { @@ -363,7 +338,7 @@ impl PostgresRepo { notifier_pool, queue_notifications: DashMap::new(), upload_notifications: DashMap::new(), - keyed_notifications: DashMap::new(), + keyed_notifications: NotificationMap::new(), }); let handle = crate::sync::abort_on_drop(crate::sync::spawn_sendable( @@ -451,29 +426,8 @@ impl PostgresRepo { Ok(()) } - fn listen_on_key(&self, key: Arc) -> KeyListener { - let new_entry = Arc::new(NotificationEntry { - key: key.clone(), - inner: Arc::clone(&self.inner), - notify: crate::sync::bare_notify(), - }); - - let mut entry = self - .inner - .keyed_notifications - .entry(key) - .or_insert_with(|| Arc::downgrade(&new_entry)); - - let upgraded = entry.value().upgrade(); - - let entry = if let Some(existing_entry) = upgraded { - existing_entry - } else { - *entry.value_mut() = Arc::downgrade(&new_entry); - new_entry - }; - - KeyListener { entry } + fn listen_on_key(&self, key: Arc) -> NotificationEntry { + self.inner.keyed_notifications.register_interest(key) } async fn register_interest(&self) -> Result<(), PostgresError> { @@ -658,14 +612,7 @@ impl<'a> UploadNotifierState<'a> { impl<'a> KeyedNotifierState<'a> { fn handle(&self, key: &str) { - if let Some(notification_entry) = self - .inner - .keyed_notifications - .get(key) - .and_then(|weak| weak.upgrade()) - { - notification_entry.notify.notify_waiters(); - } + self.inner.keyed_notifications.notify(key); } } @@ -1150,20 +1097,23 @@ impl VariantRepo for PostgresRepo { &self, hash: Hash, variant: String, - ) -> Result, RepoError> { + ) -> Result, RepoError> { + let key = Arc::from(format!("{}{variant}", hash.to_base64())); + let entry = self.listen_on_key(Arc::clone(&key)); + + self.register_interest().await?; + if self .variant_identifier(hash.clone(), variant.clone()) .await? .is_some() { - return Ok(Err(VariantAlreadyExists)); + return Ok(Err(entry)); } - let key = format!("{}{variant}", hash.to_base64()); - match self.insert_keyed_notifier(&key).await? { Ok(()) => Ok(Ok(())), - Err(AlreadyInserted) => Ok(Err(VariantAlreadyExists)), + Err(AlreadyInserted) => Ok(Err(entry)), } } @@ -1177,40 +1127,12 @@ impl VariantRepo for PostgresRepo { } #[tracing::instrument(level = "trace", skip(self))] - async fn fail_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError> { + async fn notify_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError> { let key = format!("{}{variant}", hash.to_base64()); self.clear_keyed_notifier(key).await.map_err(Into::into) } - #[tracing::instrument(level = "debug", skip(self))] - async fn await_variant( - &self, - hash: Hash, - variant: String, - ) -> Result>, RepoError> { - let key = Arc::from(format!("{}{variant}", hash.to_base64())); - - let listener = self.listen_on_key(key); - let notified = listener.notified_timeout(Duration::from_secs(5)); - - self.register_interest().await?; - - if let Some(identifier) = self - .variant_identifier(hash.clone(), variant.clone()) - .await? - { - return Ok(Some(identifier)); - } - - match notified.await { - Ok(()) => tracing::debug!("notified"), - Err(_) => tracing::trace!("timeout"), - } - - self.variant_identifier(hash, variant).await - } - #[tracing::instrument(level = "debug", skip(self))] async fn relate_variant_identifier( &self, @@ -1218,60 +1140,30 @@ impl VariantRepo for PostgresRepo { input_variant: String, input_identifier: &Arc, ) -> Result, RepoError> { + use schema::variants::dsl::*; + let mut conn = self.get_connection().await?; - conn.transaction(|conn| { - async move { - let res = async { - use schema::variants::dsl::*; + let res = diesel::insert_into(variants) + .values(( + hash.eq(&input_hash), + variant.eq(&input_variant), + identifier.eq(input_identifier.to_string()), + )) + .execute(&mut conn) + .with_metrics(crate::init_metrics::POSTGRES_VARIANTS_RELATE_VARIANT_IDENTIFIER) + .with_timeout(Duration::from_secs(5)) + .await + .map_err(|_| PostgresError::DbTimeout)?; - diesel::insert_into(variants) - .values(( - hash.eq(&input_hash), - variant.eq(&input_variant), - identifier.eq(input_identifier.to_string()), - )) - .execute(conn) - .with_metrics( - crate::init_metrics::POSTGRES_VARIANTS_RELATE_VARIANT_IDENTIFIER, - ) - .with_timeout(Duration::from_secs(5)) - .await - .map_err(|_| PostgresError::DbTimeout) - } - .await; - - let notification_res = async { - use schema::keyed_notifications::dsl::*; - - let input_key = format!("{}{input_variant}", input_hash.to_base64()); - diesel::delete(keyed_notifications) - .filter(key.eq(input_key)) - .execute(conn) - .with_timeout(Duration::from_secs(5)) - .await - .map_err(|_| PostgresError::DbTimeout) - } - .await; - - match notification_res? { - Ok(_) => {} - Err(e) => tracing::warn!("Failed to clear notifier: {e}"), - } - - match res? { - Ok(_) => Ok(Ok(())), - Err(diesel::result::Error::DatabaseError( - diesel::result::DatabaseErrorKind::UniqueViolation, - _, - )) => Ok(Err(VariantAlreadyExists)), - Err(e) => Err(PostgresError::Diesel(e)), - } - } - .scope_boxed() - }) - .await - .map_err(PostgresError::into) + match res { + Ok(_) => Ok(Ok(())), + Err(diesel::result::Error::DatabaseError( + diesel::result::DatabaseErrorKind::UniqueViolation, + _, + )) => Ok(Err(VariantAlreadyExists)), + Err(e) => Err(PostgresError::Diesel(e).into()), + } } #[tracing::instrument(level = "debug", skip(self))] diff --git a/src/repo/sled.rs b/src/repo/sled.rs index 659d23d..ac3730f 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -5,6 +5,7 @@ use crate::{ serde_str::Serde, stream::{from_iterator, LocalBoxStream}, }; +use dashmap::DashMap; use sled::{transaction::TransactionError, Db, IVec, Transactional, Tree}; use std::{ collections::HashMap, @@ -22,6 +23,7 @@ use uuid::Uuid; use super::{ hash::Hash, metrics::{PopMetricsGuard, PushMetricsGuard, WaitMetricsGuard}, + notification_map::{NotificationEntry, NotificationMap}, Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, Details, DetailsRepo, FullRepo, HashAlreadyExists, HashPage, HashRepo, JobId, JobResult, OrderedHash, ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo, @@ -113,6 +115,8 @@ pub(crate) struct SledRepo { migration_identifiers: Tree, cache_capacity: u64, export_path: PathBuf, + variant_process_map: DashMap<(Hash, String), time::OffsetDateTime>, + notifications: NotificationMap, db: Db, } @@ -156,6 +160,8 @@ impl SledRepo { migration_identifiers: db.open_tree("pict-rs-migration-identifiers-tree")?, cache_capacity, export_path, + variant_process_map: DashMap::new(), + notifications: NotificationMap::new(), db, }) } @@ -1453,27 +1459,61 @@ impl VariantRepo for SledRepo { &self, hash: Hash, variant: String, - ) -> Result, RepoError> { - todo!() + ) -> Result, RepoError> { + let key = (hash.clone(), variant.clone()); + let now = time::OffsetDateTime::now_utc(); + let entry = self + .notifications + .register_interest(Arc::from(format!("{}{variant}", hash.to_base64()))); + + match self.variant_process_map.entry(key.clone()) { + dashmap::mapref::entry::Entry::Occupied(mut occupied_entry) => { + if occupied_entry + .get() + .saturating_add(time::Duration::minutes(2)) + > now + { + return Ok(Err(entry)); + } + + occupied_entry.insert(now); + } + dashmap::mapref::entry::Entry::Vacant(vacant_entry) => { + vacant_entry.insert(now); + } + } + + if self.variant_identifier(hash, variant).await?.is_some() { + self.variant_process_map.remove(&key); + return Ok(Err(entry)); + } + + Ok(Ok(())) } #[tracing::instrument(level = "trace", skip(self))] async fn variant_heartbeat(&self, hash: Hash, variant: String) -> Result<(), RepoError> { - todo!() + let key = (hash, variant); + let now = time::OffsetDateTime::now_utc(); + + if let dashmap::mapref::entry::Entry::Occupied(mut occupied_entry) = + self.variant_process_map.entry(key) + { + occupied_entry.insert(now); + } + + Ok(()) } #[tracing::instrument(level = "trace", skip(self))] - async fn fail_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError> { - todo!() - } + async fn notify_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError> { + let key = (hash.clone(), variant.clone()); + self.variant_process_map.remove(&key); - #[tracing::instrument(level = "trace", skip(self))] - async fn await_variant( - &self, - hash: Hash, - variant: String, - ) -> Result>, RepoError> { - todo!() + let key = format!("{}{variant}", hash.to_base64()); + self.notifications.notify(&key); + + Ok(()) } #[tracing::instrument(level = "trace", skip(self))] @@ -1490,7 +1530,7 @@ impl VariantRepo for SledRepo { let hash_variant_identifiers = self.hash_variant_identifiers.clone(); - crate::sync::spawn_blocking("sled-io", move || { + let out = crate::sync::spawn_blocking("sled-io", move || { hash_variant_identifiers .compare_and_swap(key, Option::<&[u8]>::None, Some(value.as_bytes())) .map(|res| res.map_err(|_| VariantAlreadyExists)) @@ -1498,7 +1538,9 @@ impl VariantRepo for SledRepo { .await .map_err(|_| RepoError::Canceled)? .map_err(SledError::from) - .map_err(RepoError::from) + .map_err(RepoError::from)?; + + Ok(out) } #[tracing::instrument(level = "trace", skip(self))]