From 37448722de996c245ce52bf63b887041e5c4318f Mon Sep 17 00:00:00 2001 From: asonix Date: Mon, 4 Sep 2023 16:19:46 -0500 Subject: [PATCH] postgres: Rework job & upload notifications (more) postgres: Add metrics to job push & pop, upload wait sled: add upload wait metrics --- src/details.rs | 1 + src/discover/exiftool.rs | 1 + src/discover/magick.rs | 2 + src/repo.rs | 10 +- src/repo/metrics.rs | 74 ++++++ src/repo/postgres.rs | 225 +++++++++++++++--- .../migrations/V0006__create_queue.rs | 2 +- src/repo/sled.rs | 63 +---- 8 files changed, 283 insertions(+), 95 deletions(-) create mode 100644 src/repo/metrics.rs diff --git a/src/details.rs b/src/details.rs index d632e32..f5b9a86 100644 --- a/src/details.rs +++ b/src/details.rs @@ -103,6 +103,7 @@ impl Details { )) } + #[tracing::instrument(level = "DEBUG")] pub(crate) async fn from_store( store: &S, identifier: &Arc, diff --git a/src/discover/exiftool.rs b/src/discover/exiftool.rs index 1530eac..672ce10 100644 --- a/src/discover/exiftool.rs +++ b/src/discover/exiftool.rs @@ -9,6 +9,7 @@ use crate::{ use super::Discovery; +#[tracing::instrument(level = "DEBUG", skip_all)] pub(super) async fn check_reorient( Discovery { input, diff --git a/src/discover/magick.rs b/src/discover/magick.rs index 554c812..48c33fd 100644 --- a/src/discover/magick.rs +++ b/src/discover/magick.rs @@ -97,6 +97,7 @@ pub(super) async fn confirm_bytes( .await } +#[tracing::instrument(level = "DEBUG", skip(f))] async fn count_avif_frames(f: F, timeout: u64) -> Result where F: FnOnce(crate::file::File) -> Fut, @@ -147,6 +148,7 @@ where Ok(lines) } +#[tracing::instrument(level = "DEBUG", skip(f))] async fn discover_file(f: F, timeout: u64) -> Result where F: FnOnce(crate::file::File) -> Fut, diff --git a/src/repo.rs b/src/repo.rs index 0c8c312..dd32c19 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -1,3 +1,9 @@ +mod alias; +mod delete_token; +mod hash; +mod metrics; +mod migrate; + use crate::{ config, details::Details, @@ -9,10 +15,6 @@ use 
std::{fmt::Debug, sync::Arc}; use url::Url; use uuid::Uuid; -mod alias; -mod delete_token; -mod hash; -mod migrate; pub(crate) mod postgres; pub(crate) mod sled; diff --git a/src/repo/metrics.rs b/src/repo/metrics.rs new file mode 100644 index 0000000..3c07efa --- /dev/null +++ b/src/repo/metrics.rs @@ -0,0 +1,74 @@ +use std::time::Instant; + +pub(super) struct PushMetricsGuard { + queue: &'static str, + armed: bool, +} + +pub(super) struct PopMetricsGuard { + queue: &'static str, + start: Instant, + armed: bool, +} + +pub(super) struct WaitMetricsGuard { + start: Instant, + armed: bool, +} + +impl PushMetricsGuard { + pub(super) fn guard(queue: &'static str) -> Self { + Self { queue, armed: true } + } + + pub(super) fn disarm(mut self) { + self.armed = false; + } +} + +impl PopMetricsGuard { + pub(super) fn guard(queue: &'static str) -> Self { + Self { + queue, + start: Instant::now(), + armed: true, + } + } + + pub(super) fn disarm(mut self) { + self.armed = false; + } +} + +impl WaitMetricsGuard { + pub(super) fn guard() -> Self { + Self { + start: Instant::now(), + armed: true, + } + } + + pub(super) fn disarm(mut self) { + self.armed = false; + } +} + +impl Drop for PushMetricsGuard { + fn drop(&mut self) { + metrics::increment_counter!("pict-rs.queue.push", "completed" => (!self.armed).to_string(), "queue" => self.queue); + } +} + +impl Drop for PopMetricsGuard { + fn drop(&mut self) { + metrics::histogram!("pict-rs.queue.pop.duration", self.start.elapsed().as_secs_f64(), "completed" => (!self.armed).to_string(), "queue" => self.queue); + metrics::increment_counter!("pict-rs.queue.pop", "completed" => (!self.armed).to_string(), "queue" => self.queue); + } +} + +impl Drop for WaitMetricsGuard { + fn drop(&mut self) { + metrics::histogram!("pict-rs.upload.wait.duration", self.start.elapsed().as_secs_f64(), "completed" => (!self.armed).to_string()); + metrics::increment_counter!("pict-rs.upload.wait", "completed" => (!self.armed).to_string()); + } +} diff --git 
a/src/repo/postgres.rs b/src/repo/postgres.rs index cf36b9c..659eb6a 100644 --- a/src/repo/postgres.rs +++ b/src/repo/postgres.rs @@ -3,11 +3,12 @@ mod job_status; mod schema; use std::{ + collections::{BTreeSet, VecDeque}, sync::{ atomic::{AtomicU64, Ordering}, - Arc, + Arc, Weak, }, - time::Duration, + time::{Duration, Instant}, }; use dashmap::DashMap; @@ -35,6 +36,7 @@ use crate::{ use self::job_status::JobStatus; use super::{ + metrics::{PopMetricsGuard, PushMetricsGuard, WaitMetricsGuard}, Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, DetailsRepo, FullRepo, Hash, HashAlreadyExists, HashPage, HashRepo, JobId, OrderedHash, ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo, UploadResult, @@ -52,35 +54,24 @@ struct Inner { health_count: AtomicU64, pool: Pool, queue_notifications: DashMap>, - upload_notifier: Notify, + upload_notifications: DashMap>, } -async fn delegate_notifications(receiver: flume::Receiver, inner: Arc) { - while let Ok(notification) = receiver.recv_async().await { - match notification.channel() { - "queue_status_channel" => { - // new job inserted for queue - let queue_name = notification.payload().to_string(); +struct UploadInterest { + inner: Arc, + interest: Option>, + upload_id: UploadId, +} - inner - .queue_notifications - .entry(queue_name) - .or_insert_with(crate::sync::notify) - .notify_waiters(); - } - "upload_completion_channel" => { - inner.upload_notifier.notify_waiters(); - } - channel => { - tracing::info!( - "Unhandled postgres notification: {channel}: {}", - notification.payload() - ); - } - } - } +struct JobNotifierState<'a> { + inner: &'a Inner, + capacity: usize, + jobs: BTreeSet, + jobs_ordered: VecDeque, +} - tracing::warn!("Notification delegator shutting down"); +struct UploadNotifierState<'a> { + inner: &'a Inner, } #[derive(Debug, thiserror::Error)] @@ -166,7 +157,7 @@ impl PostgresRepo { health_count: AtomicU64::new(0), pool, queue_notifications: 
DashMap::new(), - upload_notifier: crate::sync::bare_notify(), + upload_notifications: DashMap::new(), }); let handle = crate::sync::spawn(delegate_notifications(rx, inner.clone())); @@ -184,10 +175,125 @@ impl PostgresRepo { } } +struct GetConnectionMetricsGuard { + start: Instant, + armed: bool, +} + +impl GetConnectionMetricsGuard { + fn guard() -> Self { + GetConnectionMetricsGuard { + start: Instant::now(), + armed: true, + } + } + + fn disarm(mut self) { + self.armed = false; + } +} + +impl Drop for GetConnectionMetricsGuard { + fn drop(&mut self) { + metrics::increment_counter!("pict-rs.postgres.pool.get.end", "completed" => (!self.armed).to_string()); + metrics::histogram!("pict-rs.postgres.pool.get.duration", self.start.elapsed().as_secs_f64(), "completed" => (!self.armed).to_string()); + } +} + impl Inner { - #[tracing::instrument(level = "DEBUG", skip(self))] + #[tracing::instrument(level = "TRACE", skip(self))] async fn get_connection(&self) -> Result, PostgresError> { - self.pool.get().await.map_err(PostgresError::Pool) + let guard = GetConnectionMetricsGuard::guard(); + let res = self.pool.get().await.map_err(PostgresError::Pool); + guard.disarm(); + res + } + + fn interest(self: &Arc, upload_id: UploadId) -> UploadInterest { + let notify = crate::sync::notify(); + + self.upload_notifications + .insert(upload_id, Arc::downgrade(&notify)); + + UploadInterest { + inner: self.clone(), + interest: Some(notify), + upload_id, + } + } +} + +impl UploadInterest { + async fn notified_timeout(&self, timeout: Duration) -> Result<(), tokio::time::error::Elapsed> { + actix_rt::time::timeout( + timeout, + self.interest.as_ref().expect("interest exists").notified(), + ) + .await + } +} + +impl Drop for UploadInterest { + fn drop(&mut self) { + if let Some(interest) = self.interest.take() { + if Arc::into_inner(interest).is_some() { + self.inner.upload_notifications.remove(&self.upload_id); + } + } + } +} + +impl<'a> JobNotifierState<'a> { + fn handle(&mut self, 
payload: &str) { + let Some((job_id, queue_name)) = payload.split_once(' ') else { + tracing::warn!("Invalid queue payload {payload}"); + return; + }; + + let Ok(job_id) = job_id.parse::().map(JobId) else { + tracing::warn!("Invalid job ID {job_id}"); + return; + }; + + if !self.jobs.insert(job_id) { + // duplicate job + return; + } + + self.jobs_ordered.push_back(job_id); + + if self.jobs_ordered.len() > self.capacity { + if let Some(job_id) = self.jobs_ordered.pop_front() { + self.jobs.remove(&job_id); + } + } + + self.inner + .queue_notifications + .entry(queue_name.to_string()) + .or_insert_with(crate::sync::notify) + .notify_one(); + + metrics::increment_counter!("pict-rs.postgres.job-notifier.notified", "queue" => queue_name.to_string()); + } +} + +impl<'a> UploadNotifierState<'a> { + fn handle(&self, payload: &str) { + let Ok(upload_id) = payload.parse::() else { + tracing::warn!("Invalid upload id {payload}"); + return; + }; + + if let Some(notifier) = self + .inner + .upload_notifications + .get(&upload_id) + .and_then(|weak| weak.upgrade()) + { + notifier.notify_waiters(); + metrics::increment_counter!("pict-rs.postgres.upload-notifier.notified"); + } } } @@ -195,6 +301,44 @@ type BoxFuture<'a, T> = std::pin::Pin + type ConfigFn = Box BoxFuture<'_, ConnectionResult> + Send + Sync + 'static>; +async fn delegate_notifications(receiver: flume::Receiver, inner: Arc) { + let parallelism = std::thread::available_parallelism() + .map(|u| u.into()) + .unwrap_or(1_usize); + + let mut job_notifier_state = JobNotifierState { + inner: &inner, + capacity: parallelism * 8, + jobs: BTreeSet::new(), + jobs_ordered: VecDeque::new(), + }; + + let upload_notifier_state = UploadNotifierState { inner: &inner }; + + while let Ok(notification) = receiver.recv_async().await { + metrics::increment_counter!("pict-rs.postgres.notification"); + + match notification.channel() { + "queue_status_channel" => { + // new job inserted for queue + 
job_notifier_state.handle(notification.payload()); + } + "upload_completion_channel" => { + // new upload finished + upload_notifier_state.handle(notification.payload()); + } + channel => { + tracing::info!( + "Unhandled postgres notification: {channel}: {}", + notification.payload() + ); + } + } + } + + tracing::warn!("Notification delegator shutting down"); +} + fn build_handler(sender: flume::Sender) -> ConfigFn { Box::new( move |config: &str| -> BoxFuture<'_, ConnectionResult> { @@ -834,6 +978,8 @@ impl QueueRepo for PostgresRepo { queue_name: &'static str, job_json: serde_json::Value, ) -> Result { + let guard = PushMetricsGuard::guard(queue_name); + use schema::job_queue::dsl::*; let mut conn = self.get_connection().await?; @@ -845,6 +991,8 @@ impl QueueRepo for PostgresRepo { .await .map_err(PostgresError::Diesel)?; + guard.disarm(); + Ok(JobId(job_id)) } @@ -854,6 +1002,8 @@ impl QueueRepo for PostgresRepo { queue_name: &'static str, worker_id: Uuid, ) -> Result<(JobId, serde_json::Value), RepoError> { + let guard = PopMetricsGuard::guard(queue_name); + use schema::job_queue::dsl::*; loop { @@ -923,6 +1073,7 @@ impl QueueRepo for PostgresRepo { }; if let Some((job_id, job_json)) = opt { + guard.disarm(); return Ok((JobId(job_id), job_json)); } @@ -1334,9 +1485,14 @@ impl UploadRepo for PostgresRepo { #[tracing::instrument(level = "DEBUG", skip(self))] async fn wait(&self, upload_id: UploadId) -> Result { + let guard = WaitMetricsGuard::guard(); use schema::uploads::dsl::*; + let interest = self.inner.interest(upload_id); + loop { + let interest_future = interest.notified_timeout(Duration::from_secs(5)); + let mut conn = self.get_connection().await?; diesel::sql_query("LISTEN upload_completion_channel;") @@ -1359,6 +1515,7 @@ impl UploadRepo for PostgresRepo { serde_json::from_value(upload_result) .map_err(PostgresError::DeserializeUploadResult)?; + guard.disarm(); return Ok(upload_result.into()); } } @@ -1369,13 +1526,7 @@ impl UploadRepo for PostgresRepo { 
drop(conn); - if actix_rt::time::timeout( - Duration::from_secs(5), - self.inner.upload_notifier.notified(), - ) - .await - .is_ok() - { + if interest_future.await.is_ok() { tracing::debug!("Notified"); } else { tracing::debug!("Timed out"); diff --git a/src/repo/postgres/migrations/V0006__create_queue.rs b/src/repo/postgres/migrations/V0006__create_queue.rs index f9a96e0..cfabb83 100644 --- a/src/repo/postgres/migrations/V0006__create_queue.rs +++ b/src/repo/postgres/migrations/V0006__create_queue.rs @@ -34,7 +34,7 @@ CREATE OR REPLACE FUNCTION queue_status_notify() RETURNS trigger AS $$ BEGIN - PERFORM pg_notify('queue_status_channel', NEW.queue::text); + PERFORM pg_notify('queue_status_channel', NEW.id::text || ' ' || NEW.queue::text); RETURN NEW; END; $$ LANGUAGE plpgsql; diff --git a/src/repo/sled.rs b/src/repo/sled.rs index aa42e9a..77eb054 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -12,17 +12,18 @@ use std::{ atomic::{AtomicU64, Ordering}, Arc, RwLock, }, - time::Instant, }; use tokio::sync::Notify; use url::Url; use uuid::Uuid; use super::{ - hash::Hash, Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, - Details, DetailsRepo, FullRepo, HashAlreadyExists, HashPage, HashRepo, JobId, OrderedHash, - ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo, - UploadResult, VariantAccessRepo, VariantAlreadyExists, + hash::Hash, + metrics::{PopMetricsGuard, PushMetricsGuard, WaitMetricsGuard}, + Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, Details, + DetailsRepo, FullRepo, HashAlreadyExists, HashPage, HashRepo, JobId, OrderedHash, ProxyRepo, + QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo, UploadResult, + VariantAccessRepo, VariantAlreadyExists, }; macro_rules! 
b { @@ -490,54 +491,6 @@ impl From for UploadResult { } } -struct PushMetricsGuard { - queue: &'static str, - armed: bool, -} - -struct PopMetricsGuard { - queue: &'static str, - start: Instant, - armed: bool, -} - -impl PushMetricsGuard { - fn guard(queue: &'static str) -> Self { - Self { queue, armed: true } - } - - fn disarm(mut self) { - self.armed = false; - } -} - -impl PopMetricsGuard { - fn guard(queue: &'static str) -> Self { - Self { - queue, - start: Instant::now(), - armed: true, - } - } - - fn disarm(mut self) { - self.armed = false; - } -} - -impl Drop for PushMetricsGuard { - fn drop(&mut self) { - metrics::increment_counter!("pict-rs.queue.push", "completed" => (!self.armed).to_string(), "queue" => self.queue); - } -} - -impl Drop for PopMetricsGuard { - fn drop(&mut self) { - metrics::histogram!("pict-rs.queue.pop.duration", self.start.elapsed().as_secs_f64(), "completed" => (!self.armed).to_string(), "queue" => self.queue); - metrics::increment_counter!("pict-rs.queue.pop", "completed" => (!self.armed).to_string(), "queue" => self.queue); - } -} - #[async_trait::async_trait(?Send)] impl UploadRepo for SledRepo { #[tracing::instrument(level = "trace", skip(self))] @@ -551,6 +504,7 @@ impl UploadRepo for SledRepo { #[tracing::instrument(skip(self))] async fn wait(&self, upload_id: UploadId) -> Result { + let guard = WaitMetricsGuard::guard(); let mut subscriber = self.uploads.watch_prefix(upload_id.as_bytes()); let bytes = upload_id.as_bytes().to_vec(); @@ -560,6 +514,7 @@ impl UploadRepo for SledRepo { if bytes != b"1" { let result: InnerUploadResult = serde_json::from_slice(&bytes).map_err(SledError::UploadResult)?; + guard.disarm(); return Ok(result.into()); } } else { @@ -575,6 +530,8 @@ impl UploadRepo for SledRepo { if value != b"1" { let result: InnerUploadResult = serde_json::from_slice(&value).map_err(SledError::UploadResult)?; + + guard.disarm(); return Ok(result.into()); } }