From 08a66627dbb85b68c3ab0b5c2cb51473ecfff856 Mon Sep 17 00:00:00 2001 From: Dessalines Date: Fri, 20 Aug 2021 21:28:33 -0400 Subject: [PATCH] Using unpooled connections in scheduler. --- api_tests/run-federation-test.sh | 2 +- crates/db_queries/src/lib.rs | 6 +++++- src/main.rs | 10 ++-------- src/scheduled_tasks.rs | 24 ++++++++++++++---------- 4 files changed, 22 insertions(+), 20 deletions(-) diff --git a/api_tests/run-federation-test.sh b/api_tests/run-federation-test.sh index ebde0c04d..790da4559 100755 --- a/api_tests/run-federation-test.sh +++ b/api_tests/run-federation-test.sh @@ -13,7 +13,7 @@ popd yarn yarn api-test || true -killall lemmy_server +killall -9 lemmy_server for INSTANCE in lemmy_alpha lemmy_beta lemmy_gamma lemmy_delta lemmy_epsilon; do psql "$LEMMY_DATABASE_URL" -c "DROP DATABASE $INSTANCE" diff --git a/crates/db_queries/src/lib.rs b/crates/db_queries/src/lib.rs index 8e73ee812..aeeb5cbdc 100644 --- a/crates/db_queries/src/lib.rs +++ b/crates/db_queries/src/lib.rs @@ -283,8 +283,12 @@ pub fn establish_unpooled_connection() -> PgConnection { e ), }; + establish_unpooled_connection_with_db_url(&db_url) +} + +pub fn establish_unpooled_connection_with_db_url(db_url: &str) -> PgConnection { let conn = - PgConnection::establish(&db_url).unwrap_or_else(|_| panic!("Error connecting to {}", db_url)); + PgConnection::establish(db_url).unwrap_or_else(|_| panic!("Error connecting to {}", db_url)); embedded_migrations::run(&conn).expect("load migrations"); conn } diff --git a/src/main.rs b/src/main.rs index 16df7fa36..08a370901 100644 --- a/src/main.rs +++ b/src/main.rs @@ -33,7 +33,7 @@ async fn main() -> Result<(), LemmyError> { Err(_) => settings.get_database_url(), }; - let manager = Manager::new(db_url); + let manager = Manager::new(&db_url); let pool = Pool::new(manager, settings.database.pool_size); let conn = &pool.get().await?; @@ -41,13 +41,7 @@ async fn main() -> Result<(), LemmyError> { embedded_migrations::run(conn)?; run_advanced_migrations(conn)?; - // TODO can't move the pool into clokwerk, it doesn't yet support async - let c1 = pool.get().await?; - let c2 = pool.get().await?; - let c3 = pool.get().await?; - thread::spawn(move || { - scheduled_tasks::setup(c1, c2, c3); - }); + thread::spawn(move || scheduled_tasks::setup(&db_url)); // Set up the rate limiter let rate_limiter = RateLimit { diff --git a/src/scheduled_tasks.rs b/src/scheduled_tasks.rs index 089bd2356..ce5d331df 100644 --- a/src/scheduled_tasks.rs +++ b/src/scheduled_tasks.rs @@ -2,28 +2,32 @@ use clokwerk::{Scheduler, TimeUnits}; // Import week days and WeekDay use diesel::{sql_query, PgConnection, RunQueryDsl}; -use lemmy_db_queries::source::activity::Activity_; +use lemmy_db_queries::{establish_unpooled_connection_with_db_url, source::activity::Activity_}; use lemmy_db_schema::source::activity::Activity; use log::info; use std::{thread, time::Duration}; -type DeadpoolPgConnection = deadpool_diesel::Connection; - /// Schedules various cleanup tasks for lemmy in a background thread -pub fn setup(c1: DeadpoolPgConnection, c2: DeadpoolPgConnection, c3: DeadpoolPgConnection) { +pub fn setup(db_url: &str) { let mut scheduler = Scheduler::new(); - active_counts(&c1); - reindex_aggregates_tables(&c1); - clear_old_activities(&c1); + let conn = &establish_unpooled_connection_with_db_url(db_url); + active_counts(conn); + reindex_aggregates_tables(conn); + clear_old_activities(conn); + + let db_url2 = db_url.to_owned(); scheduler.every(1.hour()).run(move || { - active_counts(&c2); - reindex_aggregates_tables(&c2); + let conn = &establish_unpooled_connection_with_db_url(&db_url2); + active_counts(conn); + reindex_aggregates_tables(conn); }); + let db_url3 = db_url.to_owned(); scheduler.every(1.weeks()).run(move || { - clear_old_activities(&c3); + let conn = &establish_unpooled_connection_with_db_url(&db_url3); + clear_old_activities(conn); }); // Manually run the scheduler in an event loop