From fe1132aec1614d99971aedf98d44ed6818d4edcd Mon Sep 17 00:00:00 2001
From: asonix
Date: Wed, 24 Jan 2024 17:14:31 -0600
Subject: [PATCH] Enable at-most-once queueing for some cleanup jobs

---
 src/queue.rs                                  | 48 +++++++++++----
 src/repo.rs                                   | 16 ++++-
 src/repo/postgres.rs                          | 27 +++++++--
 .../migrations/V0011__add_unique_name_jobs.rs | 15 +++++
 src/repo/postgres/schema.rs                   |  1 +
 src/repo/sled.rs                              | 60 ++++++++++++-----
 6 files changed, 132 insertions(+), 35 deletions(-)
 create mode 100644 src/repo/postgres/migrations/V0011__add_unique_name_jobs.rs

diff --git a/src/queue.rs b/src/queue.rs
index b4ed9e6..f7bd1a5 100644
--- a/src/queue.rs
+++ b/src/queue.rs
@@ -22,6 +22,10 @@ mod process;
 
 const CLEANUP_QUEUE: &str = "cleanup";
 const PROCESS_QUEUE: &str = "process";
+const OUTDATED_PROXIES_UNIQUE_KEY: &str = "outdated-proxies";
+const OUTDATED_VARIANTS_UNIQUE_KEY: &str = "outdated-variants";
+const ALL_VARIANTS_UNIQUE_KEY: &str = "all-variants";
+const PRUNE_MISSING_UNIQUE_KEY: &str = "prune-missing";
 
 #[derive(Debug, serde::Deserialize, serde::Serialize)]
 enum Cleanup {
@@ -71,13 +75,13 @@ pub(crate) async fn cleanup_alias(
         token: Serde::new(token),
     })
     .map_err(UploadError::PushJob)?;
-    repo.push(CLEANUP_QUEUE, job).await?;
+    repo.push(CLEANUP_QUEUE, job, None).await?;
     Ok(())
 }
 
 pub(crate) async fn cleanup_hash(repo: &ArcRepo, hash: Hash) -> Result<(), Error> {
     let job = serde_json::to_value(Cleanup::Hash { hash }).map_err(UploadError::PushJob)?;
-    repo.push(CLEANUP_QUEUE, job).await?;
+    repo.push(CLEANUP_QUEUE, job, None).await?;
     Ok(())
 }
 
@@ -86,7 +90,7 @@ pub(crate) async fn cleanup_identifier(repo: &ArcRepo, identifier: &Arc<str>) -> Result<(), Error> {
     let job = serde_json::to_value(Cleanup::Identifier {
         identifier: identifier.to_string(),
     })
     .map_err(UploadError::PushJob)?;
-    repo.push(CLEANUP_QUEUE, job).await?;
+    repo.push(CLEANUP_QUEUE, job, None).await?;
     Ok(())
 }
 
@@ -97,31 +101,55 @@ async fn cleanup_variants(
 ) -> Result<(), Error> {
     let job =
         serde_json::to_value(Cleanup::Variant { hash, variant }).map_err(UploadError::PushJob)?;
-    repo.push(CLEANUP_QUEUE, job).await?;
+    repo.push(CLEANUP_QUEUE, job, None).await?;
     Ok(())
 }
 
 pub(crate) async fn cleanup_outdated_proxies(repo: &ArcRepo) -> Result<(), Error> {
     let job = serde_json::to_value(Cleanup::OutdatedProxies).map_err(UploadError::PushJob)?;
-    repo.push(CLEANUP_QUEUE, job).await?;
+    if repo
+        .push(CLEANUP_QUEUE, job, Some(OUTDATED_PROXIES_UNIQUE_KEY))
+        .await?
+        .is_none()
+    {
+        tracing::debug!("outdated proxies conflict");
+    }
     Ok(())
 }
 
 pub(crate) async fn cleanup_outdated_variants(repo: &ArcRepo) -> Result<(), Error> {
     let job = serde_json::to_value(Cleanup::OutdatedVariants).map_err(UploadError::PushJob)?;
-    repo.push(CLEANUP_QUEUE, job).await?;
+    if repo
+        .push(CLEANUP_QUEUE, job, Some(OUTDATED_VARIANTS_UNIQUE_KEY))
+        .await?
+        .is_none()
+    {
+        tracing::debug!("outdated variants conflict");
+    }
     Ok(())
 }
 
 pub(crate) async fn cleanup_all_variants(repo: &ArcRepo) -> Result<(), Error> {
     let job = serde_json::to_value(Cleanup::AllVariants).map_err(UploadError::PushJob)?;
-    repo.push(CLEANUP_QUEUE, job).await?;
+    if repo
+        .push(CLEANUP_QUEUE, job, Some(ALL_VARIANTS_UNIQUE_KEY))
+        .await?
+        .is_none()
+    {
+        tracing::debug!("all variants conflict");
+    }
     Ok(())
 }
 
 pub(crate) async fn prune_missing(repo: &ArcRepo) -> Result<(), Error> {
     let job = serde_json::to_value(Cleanup::Prune).map_err(UploadError::PushJob)?;
-    repo.push(CLEANUP_QUEUE, job).await?;
+    if repo
+        .push(CLEANUP_QUEUE, job, Some(PRUNE_MISSING_UNIQUE_KEY))
+        .await?
+        .is_none()
+    {
+        tracing::debug!("prune missing conflict");
+    }
     Ok(())
 }
 
@@ -137,7 +165,7 @@ pub(crate) async fn queue_ingest(
         upload_id: Serde::new(upload_id),
     })
     .map_err(UploadError::PushJob)?;
-    repo.push(PROCESS_QUEUE, job).await?;
+    repo.push(PROCESS_QUEUE, job, None).await?;
     Ok(())
 }
 
@@ -155,7 +183,7 @@ pub(crate) async fn queue_generate(
         process_args,
     })
     .map_err(UploadError::PushJob)?;
-    repo.push(PROCESS_QUEUE, job).await?;
+    repo.push(PROCESS_QUEUE, job, None).await?;
     Ok(())
 }
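The queue.rs changes define the caller-side contract: `push` now returns `Option<JobId>`, and `None` means a job with the same unique key is already queued, which the cleanup helpers above log and treat as success, since the work is scheduled either way. A minimal sketch of that contract as a reusable helper (the `push_unique` function is hypothetical, not part of this patch):

    // Hypothetical helper: enqueue a cleanup job at most once per unique key.
    async fn push_unique(
        repo: &ArcRepo,
        job: serde_json::Value,
        key: &'static str,
    ) -> Result<(), Error> {
        match repo.push(CLEANUP_QUEUE, job, Some(key)).await? {
            // freshly enqueued
            Some(_job_id) => Ok(()),
            // an identical job is still pending; nothing more to do
            None => {
                tracing::debug!("{key} conflict");
                Ok(())
            }
        }
    }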
diff --git a/src/repo.rs b/src/repo.rs
index acff51b..6051a5b 100644
--- a/src/repo.rs
+++ b/src/repo.rs
@@ -353,7 +353,12 @@ impl JobId {
 
 #[async_trait::async_trait(?Send)]
 pub(crate) trait QueueRepo: BaseRepo {
-    async fn push(&self, queue: &'static str, job: serde_json::Value) -> Result<JobId, RepoError>;
+    async fn push(
+        &self,
+        queue: &'static str,
+        job: serde_json::Value,
+        unique_key: Option<&'static str>,
+    ) -> Result<Option<JobId>, RepoError>;
 
     async fn pop(
         &self,
@@ -381,8 +386,13 @@ impl<T> QueueRepo for Arc<T>
 where
     T: QueueRepo,
 {
-    async fn push(&self, queue: &'static str, job: serde_json::Value) -> Result<JobId, RepoError> {
-        T::push(self, queue, job).await
+    async fn push(
+        &self,
+        queue: &'static str,
+        job: serde_json::Value,
+        unique_key: Option<&'static str>,
+    ) -> Result<Option<JobId>, RepoError> {
+        T::push(self, queue, job, unique_key).await
     }
 
     async fn pop(
diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs
index 4a76784..12a9ecb 100644
--- a/src/repo/postgres.rs
+++ b/src/repo/postgres.rs
@@ -1223,26 +1223,41 @@ impl QueueRepo for PostgresRepo {
         &self,
         queue_name: &'static str,
         job_json: serde_json::Value,
-    ) -> Result<JobId, RepoError> {
+        in_unique_key: Option<&'static str>,
+    ) -> Result<Option<JobId>, RepoError> {
         let guard = PushMetricsGuard::guard(queue_name);
 
         use schema::job_queue::dsl::*;
 
         let mut conn = self.get_connection().await?;
 
-        let job_id = diesel::insert_into(job_queue)
-            .values((queue.eq(queue_name), job.eq(job_json)))
+        let res = diesel::insert_into(job_queue)
+            .values((
+                queue.eq(queue_name),
+                job.eq(job_json),
+                unique_key.eq(in_unique_key),
+            ))
             .returning(id)
             .get_result::<Uuid>(&mut conn)
             .with_metrics("pict-rs.postgres.queue.push")
             .with_timeout(Duration::from_secs(5))
             .await
             .map_err(|_| PostgresError::DbTimeout)?
-            .map_err(PostgresError::Diesel)?;
+            .map(JobId)
+            .map(Some);
 
-        guard.disarm();
+        match res {
+            Ok(job_id) => {
+                guard.disarm();
 
-        Ok(JobId(job_id))
+                Ok(job_id)
+            }
+            Err(diesel::result::Error::DatabaseError(
+                diesel::result::DatabaseErrorKind::UniqueViolation,
+                _,
+            )) => Ok(None),
+            Err(e) => Err(RepoError::from(PostgresError::Diesel(e))),
+        }
     }
 
     #[tracing::instrument(level = "debug", skip(self))]
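On Postgres the at-most-once behavior rides on the new UNIQUE column: a second insert with the same key fails with a `UniqueViolation`, which `push` maps to `Ok(None)` instead of surfacing an error. (The parameter is spelled `in_unique_key` so it does not shadow the `unique_key` column imported from the dsl.) A test-style sketch of the resulting guarantee (hypothetical, not in the patch), assuming a repo backed by this schema:

    // Two pushes with the same unique key, while the first job is still
    // queued, must yield Some then None.
    let job = serde_json::to_value(Cleanup::OutdatedVariants).map_err(UploadError::PushJob)?;

    let first = repo
        .push(CLEANUP_QUEUE, job.clone(), Some(OUTDATED_VARIANTS_UNIQUE_KEY))
        .await?;
    assert!(first.is_some()); // freshly enqueued

    let second = repo
        .push(CLEANUP_QUEUE, job, Some(OUTDATED_VARIANTS_UNIQUE_KEY))
        .await?;
    assert!(second.is_none()); // deduplicated by the UNIQUE constraint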
diff --git a/src/repo/postgres/migrations/V0011__add_unique_name_jobs.rs b/src/repo/postgres/migrations/V0011__add_unique_name_jobs.rs
new file mode 100644
index 0000000..713a6b2
--- /dev/null
+++ b/src/repo/postgres/migrations/V0011__add_unique_name_jobs.rs
@@ -0,0 +1,15 @@
+use barrel::backend::Pg;
+use barrel::{types, Migration};
+
+pub(crate) fn migration() -> String {
+    let mut m = Migration::new();
+
+    m.change_table("job_queue", |t| {
+        t.add_column(
+            "unique_key",
+            types::text().size(50).nullable(true).unique(true),
+        );
+    });
+
+    m.make::<Pg>().to_string()
+}
diff --git a/src/repo/postgres/schema.rs b/src/repo/postgres/schema.rs
index 72ba9aa..78f8951 100644
--- a/src/repo/postgres/schema.rs
+++ b/src/repo/postgres/schema.rs
@@ -42,6 +42,7 @@ diesel::table! {
         status -> JobStatus,
         queue_time -> Timestamp,
         heartbeat -> Nullable<Timestamp>,
+        unique_key -> Nullable<Text>,
     }
 }
 
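The migration gives Postgres its guarantee directly: `unique_key` is a nullable UNIQUE column, and because NULLs never collide under a Postgres UNIQUE constraint, existing rows and jobs pushed with `None` are unaffected. sled has no unique constraints, so the diff below emulates one with two trees: `unique_jobs` maps a unique key to the job key, claiming it inside the insert transaction, while `unique_jobs_inverse` records the reverse mapping so completing the job can release the claim. A standalone sketch of that claim-or-abort pattern (names illustrative, not the pict-rs API):

    use sled::transaction::{abort, TransactionError};

    /// Claim `key` in `unique`, mapping it to `job_key`. Returns Ok(true) on a
    /// successful claim, Ok(false) if another job already holds the key.
    fn claim_unique(unique: &sled::Tree, key: &str, job_key: &[u8]) -> Result<bool, sled::Error> {
        let res = unique.transaction(|tx| {
            if tx.insert(key.as_bytes(), job_key)?.is_some() {
                // the key is already reserved; roll back and report a duplicate
                return abort(());
            }
            Ok(())
        });

        match res {
            Ok(()) => Ok(true),
            Err(TransactionError::Abort(())) => Ok(false),
            Err(TransactionError::Storage(e)) => Err(e),
        }
    }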
diff --git a/src/repo/sled.rs b/src/repo/sled.rs
index d89018f..5d4c4eb 100644
--- a/src/repo/sled.rs
+++ b/src/repo/sled.rs
@@ -95,6 +95,8 @@ pub(crate) struct SledRepo {
     alias_hashes: Tree,
     alias_delete_tokens: Tree,
     queue: Tree,
+    unique_jobs: Tree,
+    unique_jobs_inverse: Tree,
     job_state: Tree,
     alias_access: Tree,
     inverse_alias_access: Tree,
@@ -134,6 +136,8 @@ impl SledRepo {
             alias_hashes: db.open_tree("pict-rs-alias-hashes-tree")?,
             alias_delete_tokens: db.open_tree("pict-rs-alias-delete-tokens-tree")?,
             queue: db.open_tree("pict-rs-queue-tree")?,
+            unique_jobs: db.open_tree("pict-rs-unique-jobs-tree")?,
+            unique_jobs_inverse: db.open_tree("pict-rs-unique-jobs-inverse-tree")?,
             job_state: db.open_tree("pict-rs-job-state-tree")?,
             alias_access: db.open_tree("pict-rs-alias-access-tree")?,
             inverse_alias_access: db.open_tree("pict-rs-inverse-alias-access-tree")?,
@@ -625,7 +629,8 @@ impl QueueRepo for SledRepo {
         &self,
         queue_name: &'static str,
         job: serde_json::Value,
-    ) -> Result<JobId, RepoError> {
+        unique_key: Option<&'static str>,
+    ) -> Result<Option<JobId>, RepoError> {
         let metrics_guard = PushMetricsGuard::guard(queue_name);
 
         let id = JobId::gen();
@@ -633,29 +638,45 @@
         let job = serde_json::to_vec(&job).map_err(SledError::Job)?;
 
         let queue = self.queue.clone();
+        let unique_jobs = self.unique_jobs.clone();
+        let unique_jobs_inverse = self.unique_jobs_inverse.clone();
         let job_state = self.job_state.clone();
 
         let res = crate::sync::spawn_blocking("sled-io", move || {
-            (&queue, &job_state).transaction(|(queue, job_state)| {
-                let state = JobState::pending();
+            (&queue, &unique_jobs, &unique_jobs_inverse, &job_state).transaction(
+                |(queue, unique_jobs, unique_jobs_inverse, job_state)| {
+                    let state = JobState::pending();
 
-                queue.insert(&key[..], &job[..])?;
-                job_state.insert(&key[..], state.as_bytes())?;
+                    queue.insert(&key[..], &job[..])?;
+                    if let Some(unique_key) = unique_key {
+                        if unique_jobs
+                            .insert(unique_key.as_bytes(), &key[..])?
+                            .is_some()
+                        {
+                            return sled::transaction::abort(());
+                        }
 
-                Ok(())
-            })
+                        unique_jobs_inverse.insert(&key[..], unique_key.as_bytes())?;
+                    }
+                    job_state.insert(&key[..], state.as_bytes())?;
+
+                    Ok(())
+                },
+            )
         })
         .await
         .map_err(|_| RepoError::Canceled)?;
 
-        if let Err(TransactionError::Abort(e) | TransactionError::Storage(e)) = res {
-            return Err(RepoError::from(SledError::from(e)));
+        match res {
+            Err(TransactionError::Abort(())) => return Ok(None),
+            Err(TransactionError::Storage(e)) => return Err(RepoError::from(SledError::from(e))),
+            Ok(()) => (),
         }
 
         if let Some(notifier) = self.queue_notifier.read().unwrap().get(&queue_name) {
             notifier.notify_one();
             metrics_guard.disarm();
-            return Ok(id);
+            return Ok(Some(id));
         }
 
         self.queue_notifier
@@ -667,7 +688,7 @@
 
         metrics_guard.disarm();
 
-        Ok(id)
+        Ok(Some(id))
     }
 
     #[tracing::instrument(skip(self, worker_id), fields(job_id))]
@@ -808,14 +829,21 @@
         let key = job_key(queue_name, job_id);
 
         let queue = self.queue.clone();
+        let unique_jobs = self.unique_jobs.clone();
+        let unique_jobs_inverse = self.unique_jobs_inverse.clone();
         let job_state = self.job_state.clone();
 
         let res = crate::sync::spawn_blocking("sled-io", move || {
-            (&queue, &job_state).transaction(|(queue, job_state)| {
-                queue.remove(&key[..])?;
-                job_state.remove(&key[..])?;
-                Ok(())
-            })
+            (&queue, &unique_jobs, &unique_jobs_inverse, &job_state).transaction(
+                |(queue, unique_jobs, unique_jobs_inverse, job_state)| {
+                    queue.remove(&key[..])?;
+                    if let Some(unique_key) = unique_jobs_inverse.remove(&key[..])? {
+                        unique_jobs.remove(unique_key)?;
+                    }
+                    job_state.remove(&key[..])?;
+                    Ok(())
+                },
+            )
        })
        .await
        .map_err(|_| RepoError::Canceled)?;
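Completing a job removes the queue entry, the unique-key claim, and the job state in one transaction, so the next push carrying that key succeeds again. That makes blind periodic scheduling safe: a scheduler can enqueue on every tick, duplicates collapse while a sweep is pending or running, and the key re-arms once the sweep completes. A hypothetical driver loop (not part of this patch; the interval is arbitrary):

    use std::time::Duration;

    // Fire on every tick; pushes collapse to no-ops while a previous sweep
    // still holds the unique key, and re-arm after complete_job clears it.
    async fn variant_sweep_loop(repo: ArcRepo) -> Result<(), Error> {
        loop {
            cleanup_outdated_variants(&repo).await?;
            tokio::time::sleep(Duration::from_secs(30 * 60)).await;
        }
    }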