From 819b83bab71e385cfa64cdeadb68a3dcba1fd65c Mon Sep 17 00:00:00 2001 From: asonix Date: Thu, 25 Jan 2024 16:50:36 -0600 Subject: [PATCH] Use a far smaller pool to listen for db notifications --- src/repo/postgres.rs | 108 ++++++++++++++++++++++++++++++------------- 1 file changed, 76 insertions(+), 32 deletions(-) diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs index 12a9ecb..670d0c5 100644 --- a/src/repo/postgres.rs +++ b/src/repo/postgres.rs @@ -58,6 +58,7 @@ pub(crate) struct PostgresRepo { struct Inner { health_count: AtomicU64, pool: Pool, + notifier_pool: Pool, queue_notifications: DashMap>, upload_notifications: DashMap>, } @@ -88,7 +89,7 @@ pub(crate) enum ConnectPostgresError { Tls(#[source] TlsError), #[error("Failed to run migrations")] - Migration(#[source] refinery::Error), + Migration(#[source] Box), #[error("Failed to build postgres connection pool")] BuildPool(#[source] BuildError), @@ -232,6 +233,40 @@ async fn connect_for_migrations( Ok(tup) } +fn build_pool( + postgres_url: &Url, + tx: flume::Sender, + connector: Option, + max_size: usize, +) -> Result, ConnectPostgresError> { + let mut config = ManagerConfig::default(); + config.custom_setup = build_handler(tx, connector); + + let mgr = AsyncDieselConnectionManager::::new_with_config( + postgres_url.to_string(), + config, + ); + + let pool = Pool::builder(mgr) + .runtime(deadpool::Runtime::Tokio1) + .wait_timeout(Some(Duration::from_secs(10))) + .create_timeout(Some(Duration::from_secs(2))) + .recycle_timeout(Some(Duration::from_secs(2))) + .post_create(Hook::sync_fn(|_, _| { + metrics::counter!("pict-rs.postgres.pool.connection.create").increment(1); + Ok(()) + })) + .post_recycle(Hook::sync_fn(|_, _| { + metrics::counter!("pict-rs.postgres.pool.connection.recycle").increment(1); + Ok(()) + })) + .max_size(max_size) + .build() + .map_err(ConnectPostgresError::BuildPool)?; + + Ok(pool) +} + impl PostgresRepo { pub(crate) async fn connect( postgres_url: Url, @@ -253,6 +288,7 @@ impl PostgresRepo { embedded::migrations::runner() .run_async(&mut client) .await + .map_err(Box::new) .map_err(ConnectPostgresError::Migration)?; handle.abort(); @@ -264,34 +300,15 @@ impl PostgresRepo { let (tx, rx) = flume::bounded(10); - let mut config = ManagerConfig::default(); - config.custom_setup = build_handler(tx, connector); - - let mgr = AsyncDieselConnectionManager::::new_with_config( - postgres_url, - config, - ); - - let pool = Pool::builder(mgr) - .runtime(deadpool::Runtime::Tokio1) - .wait_timeout(Some(Duration::from_secs(10))) - .create_timeout(Some(Duration::from_secs(2))) - .recycle_timeout(Some(Duration::from_secs(2))) - .post_create(Hook::sync_fn(|_, _| { - metrics::counter!("pict-rs.postgres.pool.connection.create").increment(1); - Ok(()) - })) - .post_recycle(Hook::sync_fn(|_, _| { - metrics::counter!("pict-rs.postgres.pool.connection.recycle").increment(1); - Ok(()) - })) - .max_size(parallelism * 8) - .build() - .map_err(ConnectPostgresError::BuildPool)?; - let inner = Arc::new(Inner { health_count: AtomicU64::new(0), - pool, + pool: build_pool( + &postgres_url, + tx.clone(), + connector.clone(), + parallelism * 8, + )?, + notifier_pool: build_pool(&postgres_url, tx, connector, parallelism.min(4))?, queue_notifications: DashMap::new(), upload_notifications: DashMap::new(), }); @@ -312,6 +329,10 @@ impl PostgresRepo { async fn get_connection(&self) -> Result, PostgresError> { self.inner.get_connection().await } + + async fn get_notifier_connection(&self) -> Result, PostgresError> { + self.inner.get_notifier_connection().await + } } struct GetConnectionMetricsGuard { @@ -352,6 +373,21 @@ impl Inner { Ok(obj) } + #[tracing::instrument(level = "trace", skip(self))] + async fn get_notifier_connection(&self) -> Result, PostgresError> { + let guard = GetConnectionMetricsGuard::guard(); + + let obj = self + .notifier_pool + .get() + .await + .map_err(PostgresError::Pool)?; + + guard.disarm(); + + Ok(obj) + } + fn interest(self: &Arc, upload_id: UploadId) -> UploadInterest { let notify = crate::sync::notify(); @@ -1273,8 +1309,6 @@ impl QueueRepo for PostgresRepo { loop { tracing::trace!("pop: looping"); - let mut conn = self.get_connection().await?; - let notifier: Arc = self .inner .queue_notifications @@ -1282,14 +1316,20 @@ impl QueueRepo for PostgresRepo { .or_insert_with(crate::sync::notify) .clone(); + let mut notifier_conn = self.get_notifier_connection().await?; + diesel::sql_query("LISTEN queue_status_channel;") - .execute(&mut conn) + .execute(&mut notifier_conn) .with_metrics("pict-rs.postgres.queue.listen") .with_timeout(Duration::from_secs(5)) .await .map_err(|_| PostgresError::DbTimeout)? .map_err(PostgresError::Diesel)?; + drop(notifier_conn); + + let mut conn = self.get_connection().await?; + let timestamp = to_primitive(time::OffsetDateTime::now_utc()); let count = diesel::update(job_queue) @@ -1807,16 +1847,20 @@ impl UploadRepo for PostgresRepo { let interest_future = interest.notified_timeout(Duration::from_secs(5)); - let mut conn = self.get_connection().await?; + let mut notifier_conn = self.get_notifier_connection().await?; diesel::sql_query("LISTEN upload_completion_channel;") - .execute(&mut conn) + .execute(&mut notifier_conn) .with_metrics("pict-rs.postgres.uploads.listen") .with_timeout(Duration::from_secs(5)) .await .map_err(|_| PostgresError::DbTimeout)? .map_err(PostgresError::Diesel)?; + drop(notifier_conn); + + let mut conn = self.get_connection().await?; + let nested_opt = uploads .select(result) .filter(id.eq(upload_id.id))