From eac4cd54a4e86e41ec18699831752f23216c1d60 Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 2 Sep 2023 20:13:32 -0500 Subject: [PATCH] Initial work for pg notifications --- src/repo/postgres.rs | 65 +++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 61 insertions(+), 4 deletions(-) diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs index 190ac2b..a3720d5 100644 --- a/src/repo/postgres.rs +++ b/src/repo/postgres.rs @@ -7,10 +7,11 @@ use diesel::prelude::*; use diesel_async::{ pooled_connection::{ deadpool::{BuildError, Pool, PoolError}, - AsyncDieselConnectionManager, + AsyncDieselConnectionManager, ManagerConfig, }, AsyncConnection, AsyncPgConnection, RunQueryDsl, }; +use tokio_postgres::{AsyncMessage, Notification}; use url::Url; use crate::error_code::ErrorCode; @@ -23,6 +24,7 @@ use super::{ #[derive(Clone)] pub(crate) struct PostgresRepo { pool: Pool, + notifications: flume::Receiver, } #[derive(Debug, thiserror::Error)] @@ -69,15 +71,70 @@ impl PostgresRepo { handle.abort(); let _ = handle.await; - let config = AsyncDieselConnectionManager::::new(postgres_url); - let pool = Pool::builder(config) + let (tx, notifications) = flume::bounded(10); + + let mut config = ManagerConfig::default(); + config.custom_setup = build_handler(tx); + + let mgr = AsyncDieselConnectionManager::::new_with_config( + postgres_url, + config, + ); + let pool = Pool::builder(mgr) .build() .map_err(ConnectPostgresError::BuildPool)?; - Ok(PostgresRepo { pool }) + Ok(PostgresRepo { + pool, + notifications, + }) } } +type BoxFuture<'a, T> = std::pin::Pin + Send + 'a>>; +type ConfigFn = + Box BoxFuture<'_, ConnectionResult> + Send + Sync + 'static>; + +fn build_handler(sender: flume::Sender) -> ConfigFn { + Box::new( + move |config: &str| -> BoxFuture<'_, ConnectionResult> { + let sender = sender.clone(); + + Box::pin(async move { + let (client, mut conn) = + tokio_postgres::connect(config, tokio_postgres::tls::NoTls) + .await + .map_err(|e| ConnectionError::BadConnection(e.to_string()))?; + + // not very cash money (structured concurrency) of me + actix_rt::spawn(async move { + while let Some(res) = std::future::poll_fn(|cx| conn.poll_message(cx)).await { + match res { + Err(e) => { + tracing::error!("Database Connection {e:?}"); + return; + } + Ok(AsyncMessage::Notice(e)) => { + tracing::warn!("Database Notice {e:?}"); + } + Ok(AsyncMessage::Notification(notification)) => { + if sender.send_async(notification).await.is_err() { + tracing::warn!("Missed notification. Are we shutting down?"); + } + } + Ok(_) => { + tracing::warn!("Unhandled AsyncMessage!!! Please contact the developer of this application"); + } + } + } + }); + + AsyncPgConnection::try_from(client).await + }) + }, + ) +} + fn to_primitive(timestamp: time::OffsetDateTime) -> time::PrimitiveDateTime { let timestamp = timestamp.to_offset(time::UtcOffset::UTC); time::PrimitiveDateTime::new(timestamp.date(), timestamp.time())