2
0
Fork 0
mirror of https://git.asonix.dog/asonix/pict-rs synced 2024-12-22 11:21:24 +00:00

Use a far smaller pool to listen for db notifications

This commit is contained in:
asonix 2024-01-25 16:50:36 -06:00
parent c65b49339f
commit 819b83bab7

View file

@ -58,6 +58,7 @@ pub(crate) struct PostgresRepo {
struct Inner { struct Inner {
health_count: AtomicU64, health_count: AtomicU64,
pool: Pool<AsyncPgConnection>, pool: Pool<AsyncPgConnection>,
notifier_pool: Pool<AsyncPgConnection>,
queue_notifications: DashMap<String, Arc<Notify>>, queue_notifications: DashMap<String, Arc<Notify>>,
upload_notifications: DashMap<UploadId, Weak<Notify>>, upload_notifications: DashMap<UploadId, Weak<Notify>>,
} }
@ -88,7 +89,7 @@ pub(crate) enum ConnectPostgresError {
Tls(#[source] TlsError), Tls(#[source] TlsError),
#[error("Failed to run migrations")] #[error("Failed to run migrations")]
Migration(#[source] refinery::Error), Migration(#[source] Box<refinery::Error>),
#[error("Failed to build postgres connection pool")] #[error("Failed to build postgres connection pool")]
BuildPool(#[source] BuildError), BuildPool(#[source] BuildError),
@ -232,6 +233,40 @@ async fn connect_for_migrations(
Ok(tup) Ok(tup)
} }
fn build_pool(
postgres_url: &Url,
tx: flume::Sender<Notification>,
connector: Option<MakeRustlsConnect>,
max_size: usize,
) -> Result<Pool<AsyncPgConnection>, ConnectPostgresError> {
let mut config = ManagerConfig::default();
config.custom_setup = build_handler(tx, connector);
let mgr = AsyncDieselConnectionManager::<AsyncPgConnection>::new_with_config(
postgres_url.to_string(),
config,
);
let pool = Pool::builder(mgr)
.runtime(deadpool::Runtime::Tokio1)
.wait_timeout(Some(Duration::from_secs(10)))
.create_timeout(Some(Duration::from_secs(2)))
.recycle_timeout(Some(Duration::from_secs(2)))
.post_create(Hook::sync_fn(|_, _| {
metrics::counter!("pict-rs.postgres.pool.connection.create").increment(1);
Ok(())
}))
.post_recycle(Hook::sync_fn(|_, _| {
metrics::counter!("pict-rs.postgres.pool.connection.recycle").increment(1);
Ok(())
}))
.max_size(max_size)
.build()
.map_err(ConnectPostgresError::BuildPool)?;
Ok(pool)
}
impl PostgresRepo { impl PostgresRepo {
pub(crate) async fn connect( pub(crate) async fn connect(
postgres_url: Url, postgres_url: Url,
@ -253,6 +288,7 @@ impl PostgresRepo {
embedded::migrations::runner() embedded::migrations::runner()
.run_async(&mut client) .run_async(&mut client)
.await .await
.map_err(Box::new)
.map_err(ConnectPostgresError::Migration)?; .map_err(ConnectPostgresError::Migration)?;
handle.abort(); handle.abort();
@ -264,34 +300,15 @@ impl PostgresRepo {
let (tx, rx) = flume::bounded(10); let (tx, rx) = flume::bounded(10);
let mut config = ManagerConfig::default();
config.custom_setup = build_handler(tx, connector);
let mgr = AsyncDieselConnectionManager::<AsyncPgConnection>::new_with_config(
postgres_url,
config,
);
let pool = Pool::builder(mgr)
.runtime(deadpool::Runtime::Tokio1)
.wait_timeout(Some(Duration::from_secs(10)))
.create_timeout(Some(Duration::from_secs(2)))
.recycle_timeout(Some(Duration::from_secs(2)))
.post_create(Hook::sync_fn(|_, _| {
metrics::counter!("pict-rs.postgres.pool.connection.create").increment(1);
Ok(())
}))
.post_recycle(Hook::sync_fn(|_, _| {
metrics::counter!("pict-rs.postgres.pool.connection.recycle").increment(1);
Ok(())
}))
.max_size(parallelism * 8)
.build()
.map_err(ConnectPostgresError::BuildPool)?;
let inner = Arc::new(Inner { let inner = Arc::new(Inner {
health_count: AtomicU64::new(0), health_count: AtomicU64::new(0),
pool, pool: build_pool(
&postgres_url,
tx.clone(),
connector.clone(),
parallelism * 8,
)?,
notifier_pool: build_pool(&postgres_url, tx, connector, parallelism.min(4))?,
queue_notifications: DashMap::new(), queue_notifications: DashMap::new(),
upload_notifications: DashMap::new(), upload_notifications: DashMap::new(),
}); });
@ -312,6 +329,10 @@ impl PostgresRepo {
async fn get_connection(&self) -> Result<Object<AsyncPgConnection>, PostgresError> { async fn get_connection(&self) -> Result<Object<AsyncPgConnection>, PostgresError> {
self.inner.get_connection().await self.inner.get_connection().await
} }
async fn get_notifier_connection(&self) -> Result<Object<AsyncPgConnection>, PostgresError> {
self.inner.get_notifier_connection().await
}
} }
struct GetConnectionMetricsGuard { struct GetConnectionMetricsGuard {
@ -352,6 +373,21 @@ impl Inner {
Ok(obj) Ok(obj)
} }
#[tracing::instrument(level = "trace", skip(self))]
async fn get_notifier_connection(&self) -> Result<Object<AsyncPgConnection>, PostgresError> {
let guard = GetConnectionMetricsGuard::guard();
let obj = self
.notifier_pool
.get()
.await
.map_err(PostgresError::Pool)?;
guard.disarm();
Ok(obj)
}
fn interest(self: &Arc<Self>, upload_id: UploadId) -> UploadInterest { fn interest(self: &Arc<Self>, upload_id: UploadId) -> UploadInterest {
let notify = crate::sync::notify(); let notify = crate::sync::notify();
@ -1273,8 +1309,6 @@ impl QueueRepo for PostgresRepo {
loop { loop {
tracing::trace!("pop: looping"); tracing::trace!("pop: looping");
let mut conn = self.get_connection().await?;
let notifier: Arc<Notify> = self let notifier: Arc<Notify> = self
.inner .inner
.queue_notifications .queue_notifications
@ -1282,14 +1316,20 @@ impl QueueRepo for PostgresRepo {
.or_insert_with(crate::sync::notify) .or_insert_with(crate::sync::notify)
.clone(); .clone();
let mut notifier_conn = self.get_notifier_connection().await?;
diesel::sql_query("LISTEN queue_status_channel;") diesel::sql_query("LISTEN queue_status_channel;")
.execute(&mut conn) .execute(&mut notifier_conn)
.with_metrics("pict-rs.postgres.queue.listen") .with_metrics("pict-rs.postgres.queue.listen")
.with_timeout(Duration::from_secs(5)) .with_timeout(Duration::from_secs(5))
.await .await
.map_err(|_| PostgresError::DbTimeout)? .map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?; .map_err(PostgresError::Diesel)?;
drop(notifier_conn);
let mut conn = self.get_connection().await?;
let timestamp = to_primitive(time::OffsetDateTime::now_utc()); let timestamp = to_primitive(time::OffsetDateTime::now_utc());
let count = diesel::update(job_queue) let count = diesel::update(job_queue)
@ -1807,16 +1847,20 @@ impl UploadRepo for PostgresRepo {
let interest_future = interest.notified_timeout(Duration::from_secs(5)); let interest_future = interest.notified_timeout(Duration::from_secs(5));
let mut conn = self.get_connection().await?; let mut notifier_conn = self.get_notifier_connection().await?;
diesel::sql_query("LISTEN upload_completion_channel;") diesel::sql_query("LISTEN upload_completion_channel;")
.execute(&mut conn) .execute(&mut notifier_conn)
.with_metrics("pict-rs.postgres.uploads.listen") .with_metrics("pict-rs.postgres.uploads.listen")
.with_timeout(Duration::from_secs(5)) .with_timeout(Duration::from_secs(5))
.await .await
.map_err(|_| PostgresError::DbTimeout)? .map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?; .map_err(PostgresError::Diesel)?;
drop(notifier_conn);
let mut conn = self.get_connection().await?;
let nested_opt = uploads let nested_opt = uploads
.select(result) .select(result)
.filter(id.eq(upload_id.id)) .filter(id.eq(upload_id.id))