From 0fd0279543be156450818a4b8db4f5dee7600f48 Mon Sep 17 00:00:00 2001 From: Dessalines Date: Fri, 29 Jan 2021 11:38:27 -0500 Subject: [PATCH] Adding some recurring lemmy tasks. (#1386) * Adding some recurring lemmy tasks. - Add active users by day, week, month, and half year to site and community. Fixes #1195 - Periodically re-index the aggregates tables that use hot_rank. Fixes #1384 - Clear out old activities (> 6 months). Fixes #1133 * Some cleanup, recalculating actives every hour. --- Cargo.lock | 10 +++ Cargo.toml | 1 + .../src/aggregates/community_aggregates.rs | 4 + .../src/aggregates/site_aggregates.rs | 4 + crates/db_queries/src/source/activity.rs | 6 ++ crates/db_schema/src/schema.rs | 8 ++ .../down.sql | 14 +++ .../up.sql | 89 +++++++++++++++++++ src/lib.rs | 1 + src/main.rs | 9 +- src/routes/nodeinfo.rs | 4 + src/scheduled_tasks.rs | 85 ++++++++++++++++++ 12 files changed, 233 insertions(+), 2 deletions(-) create mode 100644 migrations/2021-01-27-202728_active_users_monthly/down.sql create mode 100644 migrations/2021-01-27-202728_active_users_monthly/up.sql create mode 100644 src/scheduled_tasks.rs diff --git a/Cargo.lock b/Cargo.lock index f15cf8e88a..618ba83a73 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -759,6 +759,15 @@ dependencies = [ "generic-array 0.14.4", ] +[[package]] +name = "clokwerk" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9797a6d3acefa28d4cc62bf548b6ddc57b8ae51a43702d001cb46fba1dc48c1" +dependencies = [ + "chrono", +] + [[package]] name = "color_quant" version = "1.1.0" @@ -1877,6 +1886,7 @@ dependencies = [ "awc", "cargo-husky", "chrono", + "clokwerk", "diesel", "diesel_migrations", "env_logger", diff --git a/Cargo.toml b/Cargo.toml index 1488ed9523..bd565c7a13 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,6 +55,7 @@ reqwest = { version = "0.10.10", features = ["json"] } activitystreams = "0.7.0-alpha.8" actix-rt = { version = "1.1.1", default-features = false } serde_json = { version = "1.0.60", features = ["preserve_order"] } +clokwerk = "0.3.4" [dev-dependencies.cargo-husky] version = "1.5.0" diff --git a/crates/db_queries/src/aggregates/community_aggregates.rs b/crates/db_queries/src/aggregates/community_aggregates.rs index 3fb891c11e..0f15453ad5 100644 --- a/crates/db_queries/src/aggregates/community_aggregates.rs +++ b/crates/db_queries/src/aggregates/community_aggregates.rs @@ -11,6 +11,10 @@ pub struct CommunityAggregates { pub posts: i64, pub comments: i64, pub published: chrono::NaiveDateTime, + pub users_active_day: i64, + pub users_active_week: i64, + pub users_active_month: i64, + pub users_active_half_year: i64, } impl CommunityAggregates { diff --git a/crates/db_queries/src/aggregates/site_aggregates.rs b/crates/db_queries/src/aggregates/site_aggregates.rs index b12e2b60af..ce9f2f7615 100644 --- a/crates/db_queries/src/aggregates/site_aggregates.rs +++ b/crates/db_queries/src/aggregates/site_aggregates.rs @@ -11,6 +11,10 @@ pub struct SiteAggregates { pub posts: i64, pub comments: i64, pub communities: i64, + pub users_active_day: i64, + pub users_active_week: i64, + pub users_active_month: i64, + pub users_active_half_year: i64, } impl SiteAggregates { diff --git a/crates/db_queries/src/source/activity.rs b/crates/db_queries/src/source/activity.rs index 964e50424e..d47bc256ff 100644 --- a/crates/db_queries/src/source/activity.rs +++ b/crates/db_queries/src/source/activity.rs @@ -50,6 +50,7 @@ pub trait Activity_ { T: Serialize + Debug; fn read_from_apub_id(conn: &PgConnection, object_id: &str) -> Result; + fn delete_olds(conn: &PgConnection) -> Result; /// Returns up to 20 activities of type `Announce/Create/Page` from the community fn read_community_outbox( @@ -92,6 +93,11 @@ impl Activity_ for Activity { activity.filter(ap_id.eq(object_id)).first::(conn) } + fn delete_olds(conn: &PgConnection) -> Result { + use lemmy_db_schema::schema::activity::dsl::*; + diesel::delete(activity.filter(published.lt(now - 6.months()))).execute(conn) + } + fn read_community_outbox( conn: &PgConnection, community_actor_id: &Url, diff --git a/crates/db_schema/src/schema.rs b/crates/db_schema/src/schema.rs index bbc2e7b80e..9ff73eccbc 100644 --- a/crates/db_schema/src/schema.rs +++ b/crates/db_schema/src/schema.rs @@ -110,6 +110,10 @@ table! { posts -> Int8, comments -> Int8, published -> Timestamp, + users_active_day -> Int8, + users_active_week -> Int8, + users_active_month -> Int8, + users_active_half_year -> Int8, } } @@ -371,6 +375,10 @@ table! { posts -> Int8, comments -> Int8, communities -> Int8, + users_active_day -> Int8, + users_active_week -> Int8, + users_active_month -> Int8, + users_active_half_year -> Int8, } } diff --git a/migrations/2021-01-27-202728_active_users_monthly/down.sql b/migrations/2021-01-27-202728_active_users_monthly/down.sql new file mode 100644 index 0000000000..bc3d46a548 --- /dev/null +++ b/migrations/2021-01-27-202728_active_users_monthly/down.sql @@ -0,0 +1,14 @@ +alter table site_aggregates + drop column users_active_day, + drop column users_active_week, + drop column users_active_month, + drop column users_active_half_year; + +alter table community_aggregates + drop column users_active_day, + drop column users_active_week, + drop column users_active_month, + drop column users_active_half_year; + +drop function site_aggregates_activity(i text); +drop function community_aggregates_activity(i text); diff --git a/migrations/2021-01-27-202728_active_users_monthly/up.sql b/migrations/2021-01-27-202728_active_users_monthly/up.sql new file mode 100644 index 0000000000..9248ae8687 --- /dev/null +++ b/migrations/2021-01-27-202728_active_users_monthly/up.sql @@ -0,0 +1,89 @@ +-- Add monthly and half yearly active columns for site and community aggregates + +-- These columns don't need to be updated with a trigger, so they're saved daily via queries +alter table site_aggregates add column users_active_day bigint not null default 0; +alter table site_aggregates add column users_active_week bigint not null default 0; +alter table site_aggregates add column users_active_month bigint not null default 0; +alter table site_aggregates add column users_active_half_year bigint not null default 0; + +alter table community_aggregates add column users_active_day bigint not null default 0; +alter table community_aggregates add column users_active_week bigint not null default 0; +alter table community_aggregates add column users_active_month bigint not null default 0; +alter table community_aggregates add column users_active_half_year bigint not null default 0; + +create or replace function site_aggregates_activity(i text) +returns int +language plpgsql +as +$$ +declare + count_ integer; +begin + select count(*) + into count_ + from ( + select c.creator_id from comment c + inner join user_ u on c.creator_id = u.id + where c.published > ('now'::timestamp - i::interval) + and u.local = true + union + select p.creator_id from post p + inner join user_ u on p.creator_id = u.id + where p.published > ('now'::timestamp - i::interval) + and u.local = true + ) a; + return count_; +end; +$$; + +update site_aggregates +set users_active_day = (select * from site_aggregates_activity('1 day')); + +update site_aggregates +set users_active_week = (select * from site_aggregates_activity('1 week')); + +update site_aggregates +set users_active_month = (select * from site_aggregates_activity('1 month')); + +update site_aggregates +set users_active_half_year = (select * from site_aggregates_activity('6 months')); + +create or replace function community_aggregates_activity(i text) +returns table(count_ bigint, community_id_ integer) +language plpgsql +as +$$ +begin + return query + select count(*), community_id + from ( + select c.creator_id, p.community_id from comment c + inner join post p on c.post_id = p.id + where c.published > ('now'::timestamp - i::interval) + union + select p.creator_id, p.community_id from post p + where p.published > ('now'::timestamp - i::interval) + ) a + group by community_id; +end; +$$; + +update community_aggregates ca +set users_active_day = mv.count_ +from community_aggregates_activity('1 day') mv +where ca.community_id = mv.community_id_; + +update community_aggregates ca +set users_active_week = mv.count_ +from community_aggregates_activity('1 week') mv +where ca.community_id = mv.community_id_; + +update community_aggregates ca +set users_active_month = mv.count_ +from community_aggregates_activity('1 month') mv +where ca.community_id = mv.community_id_; + +update community_aggregates ca +set users_active_half_year = mv.count_ +from community_aggregates_activity('6 months') mv +where ca.community_id = mv.community_id_; diff --git a/src/lib.rs b/src/lib.rs index e6ea900ee7..e867a26b32 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,3 +3,4 @@ extern crate lazy_static; pub mod code_migrations; pub mod routes; +pub mod scheduled_tasks; diff --git a/src/main.rs b/src/main.rs index fad3680bba..f00f724a69 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,7 +10,7 @@ use diesel::{ use lemmy_api::match_websocket_operation; use lemmy_apub::activity_queue::create_activity_queue; use lemmy_db_queries::get_database_url_from_env; -use lemmy_server::{code_migrations::run_advanced_migrations, routes::*}; +use lemmy_server::{code_migrations::run_advanced_migrations, routes::*, scheduled_tasks}; use lemmy_structs::blocking; use lemmy_utils::{ rate_limit::{rate_limiter::RateLimiter, RateLimit}, @@ -19,7 +19,7 @@ use lemmy_utils::{ }; use lemmy_websocket::{chat_server::ChatServer, LemmyContext}; use reqwest::Client; -use std::sync::Arc; +use std::{sync::Arc, thread}; use tokio::sync::Mutex; embed_migrations!(); @@ -48,6 +48,11 @@ async fn main() -> Result<(), LemmyError> { }) .await??; + let pool2 = pool.clone(); + thread::spawn(move || { + scheduled_tasks::setup(pool2); + }); + // Set up the rate limiter let rate_limiter = RateLimit { rate_limiter: Arc::new(Mutex::new(RateLimiter::default())), diff --git a/src/routes/nodeinfo.rs b/src/routes/nodeinfo.rs index df0064ecc6..e2f038fd2e 100644 --- a/src/routes/nodeinfo.rs +++ b/src/routes/nodeinfo.rs @@ -48,6 +48,8 @@ async fn node_info(context: web::Data) -> Result