Adding some recurring lemmy tasks. (#1386)

* Adding some recurring lemmy tasks.

- Add active users by day, week, month, and half year to site and
  community. Fixes #1195
- Periodically re-index the aggregates tables that use hot_rank.
  Fixes #1384
- Clear out old activities (> 6 months). Fixes #1133

* Some cleanup, recalculating actives every hour.
Dessalines, 2021-01-29 11:38:27 -05:00 (committed by GitHub)
parent f5e58c8bf5
commit 0fd0279543
12 changed files with 233 additions and 2 deletions
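
As a quick sanity check once this change is deployed, the new active-user columns (added by the migration further down in this diff) should be directly queryable; a minimal sketch:

-- site-wide active user counts
select users_active_day, users_active_week, users_active_month, users_active_half_year
from site_aggregates;

-- per-community active user counts, busiest communities first
select community_id, users_active_day, users_active_week, users_active_month, users_active_half_year
from community_aggregates
order by users_active_month desc
limit 10;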

Cargo.lock (generated)

@@ -759,6 +759,15 @@ dependencies = [
"generic-array 0.14.4",
]
[[package]]
name = "clokwerk"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9797a6d3acefa28d4cc62bf548b6ddc57b8ae51a43702d001cb46fba1dc48c1"
dependencies = [
"chrono",
]
[[package]]
name = "color_quant"
version = "1.1.0"
@@ -1877,6 +1886,7 @@ dependencies = [
"awc",
"cargo-husky",
"chrono",
"clokwerk",
"diesel", "diesel",
"diesel_migrations", "diesel_migrations",
"env_logger", "env_logger",


@@ -55,6 +55,7 @@ reqwest = { version = "0.10.10", features = ["json"] }
activitystreams = "0.7.0-alpha.8"
actix-rt = { version = "1.1.1", default-features = false }
serde_json = { version = "1.0.60", features = ["preserve_order"] }
clokwerk = "0.3.4"
[dev-dependencies.cargo-husky]
version = "1.5.0"


@@ -11,6 +11,10 @@ pub struct CommunityAggregates {
pub posts: i64,
pub comments: i64,
pub published: chrono::NaiveDateTime,
pub users_active_day: i64,
pub users_active_week: i64,
pub users_active_month: i64,
pub users_active_half_year: i64,
}
impl CommunityAggregates {


@@ -11,6 +11,10 @@ pub struct SiteAggregates {
pub posts: i64,
pub comments: i64,
pub communities: i64,
pub users_active_day: i64,
pub users_active_week: i64,
pub users_active_month: i64,
pub users_active_half_year: i64,
}
impl SiteAggregates {


@@ -50,6 +50,7 @@ pub trait Activity_ {
T: Serialize + Debug;
fn read_from_apub_id(conn: &PgConnection, object_id: &str) -> Result<Activity, Error>;
fn delete_olds(conn: &PgConnection) -> Result<usize, Error>;
/// Returns up to 20 activities of type `Announce/Create/Page` from the community
fn read_community_outbox(
@@ -92,6 +93,11 @@
activity.filter(ap_id.eq(object_id)).first::<Self>(conn)
}
fn delete_olds(conn: &PgConnection) -> Result<usize, Error> {
use lemmy_db_schema::schema::activity::dsl::*;
diesel::delete(activity.filter(published.lt(now - 6.months()))).execute(conn)
}
fn read_community_outbox(
conn: &PgConnection,
community_actor_id: &Url,
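
For readers who don't speak diesel, delete_olds should produce SQL roughly equivalent to the sketch below (the exact statement is generated by diesel):

delete from activity
where published < (now() - interval '6 months');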


@@ -110,6 +110,10 @@ table! {
posts -> Int8,
comments -> Int8,
published -> Timestamp,
users_active_day -> Int8,
users_active_week -> Int8,
users_active_month -> Int8,
users_active_half_year -> Int8,
}
}
@@ -371,6 +375,10 @@ table! {
posts -> Int8,
comments -> Int8,
communities -> Int8,
users_active_day -> Int8,
users_active_week -> Int8,
users_active_month -> Int8,
users_active_half_year -> Int8,
}
}


@@ -0,0 +1,14 @@
alter table site_aggregates
drop column users_active_day,
drop column users_active_week,
drop column users_active_month,
drop column users_active_half_year;
alter table community_aggregates
drop column users_active_day,
drop column users_active_week,
drop column users_active_month,
drop column users_active_half_year;
drop function site_aggregates_activity(i text);
drop function community_aggregates_activity(i text);


@@ -0,0 +1,89 @@
-- Add daily, weekly, monthly, and half-yearly active user columns to the site and community aggregates
-- These columns don't need a trigger to stay current; they're recalculated periodically by a scheduled task
alter table site_aggregates add column users_active_day bigint not null default 0;
alter table site_aggregates add column users_active_week bigint not null default 0;
alter table site_aggregates add column users_active_month bigint not null default 0;
alter table site_aggregates add column users_active_half_year bigint not null default 0;
alter table community_aggregates add column users_active_day bigint not null default 0;
alter table community_aggregates add column users_active_week bigint not null default 0;
alter table community_aggregates add column users_active_month bigint not null default 0;
alter table community_aggregates add column users_active_half_year bigint not null default 0;
create or replace function site_aggregates_activity(i text)
returns int
language plpgsql
as
$$
declare
count_ integer;
begin
select count(*)
into count_
from (
select c.creator_id from comment c
inner join user_ u on c.creator_id = u.id
where c.published > ('now'::timestamp - i::interval)
and u.local = true
union
select p.creator_id from post p
inner join user_ u on p.creator_id = u.id
where p.published > ('now'::timestamp - i::interval)
and u.local = true
) a;
return count_;
end;
$$;
update site_aggregates
set users_active_day = (select * from site_aggregates_activity('1 day'));
update site_aggregates
set users_active_week = (select * from site_aggregates_activity('1 week'));
update site_aggregates
set users_active_month = (select * from site_aggregates_activity('1 month'));
update site_aggregates
set users_active_half_year = (select * from site_aggregates_activity('6 months'));
create or replace function community_aggregates_activity(i text)
returns table(count_ bigint, community_id_ integer)
language plpgsql
as
$$
begin
return query
select count(*), community_id
from (
select c.creator_id, p.community_id from comment c
inner join post p on c.post_id = p.id
where c.published > ('now'::timestamp - i::interval)
union
select p.creator_id, p.community_id from post p
where p.published > ('now'::timestamp - i::interval)
) a
group by community_id;
end;
$$;
update community_aggregates ca
set users_active_day = mv.count_
from community_aggregates_activity('1 day') mv
where ca.community_id = mv.community_id_;
update community_aggregates ca
set users_active_week = mv.count_
from community_aggregates_activity('1 week') mv
where ca.community_id = mv.community_id_;
update community_aggregates ca
set users_active_month = mv.count_
from community_aggregates_activity('1 month') mv
where ca.community_id = mv.community_id_;
update community_aggregates ca
set users_active_half_year = mv.count_
from community_aggregates_activity('6 months') mv
where ca.community_id = mv.community_id_;
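
The two helper functions defined above can also be run by hand to spot-check the numbers; a minimal sketch:

-- local users active on the site in the last week
select site_aggregates_activity('1 week');

-- per-community activity counts over the last month
select * from community_aggregates_activity('1 month') limit 5;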


@@ -3,3 +3,4 @@
extern crate lazy_static;
pub mod code_migrations;
pub mod routes;
pub mod scheduled_tasks;


@@ -10,7 +10,7 @@ use diesel::{
use lemmy_api::match_websocket_operation;
use lemmy_apub::activity_queue::create_activity_queue;
use lemmy_db_queries::get_database_url_from_env;
use lemmy_server::{code_migrations::run_advanced_migrations, routes::*, scheduled_tasks};
use lemmy_structs::blocking;
use lemmy_utils::{
rate_limit::{rate_limiter::RateLimiter, RateLimit},
@@ -19,7 +19,7 @@ use lemmy_utils::{
};
use lemmy_websocket::{chat_server::ChatServer, LemmyContext};
use reqwest::Client;
use std::{sync::Arc, thread};
use tokio::sync::Mutex;
embed_migrations!();
@@ -48,6 +48,11 @@ async fn main() -> Result<(), LemmyError> {
})
.await??;
let pool2 = pool.clone();
thread::spawn(move || {
scheduled_tasks::setup(pool2);
});
// Set up the rate limiter
let rate_limiter = RateLimit {
rate_limiter: Arc::new(Mutex::new(RateLimiter::default())),


@@ -48,6 +48,8 @@ async fn node_info(context: web::Data<LemmyContext>) -> Result<HttpResponse, Err
usage: NodeInfoUsage {
users: NodeInfoUsers {
total: site_view.counts.users,
active_half_year: site_view.counts.users_active_half_year,
active_month: site_view.counts.users_active_month,
},
local_posts: site_view.counts.posts,
local_comments: site_view.counts.comments,
@@ -96,4 +98,6 @@ struct NodeInfoUsage {
#[derive(Serialize, Deserialize, Debug)]
struct NodeInfoUsers {
pub total: i64,
pub active_half_year: i64,
pub active_month: i64,
}

src/scheduled_tasks.rs (new file)

@@ -0,0 +1,85 @@
// Scheduler, and trait for .seconds(), .minutes(), etc.
use clokwerk::{Scheduler, TimeUnits};
// Diesel types for running raw SQL against the database
use diesel::{sql_query, PgConnection, RunQueryDsl};
use lemmy_db_queries::{source::activity::Activity_, DbPool};
use lemmy_db_schema::source::activity::Activity;
use log::info;
use std::{thread, time::Duration};
/// Schedules various cleanup tasks for lemmy in a background thread
pub fn setup(pool: DbPool) {
let mut scheduler = Scheduler::new();
let conn = pool.get().unwrap();
active_counts(&conn);
reindex_aggregates_tables(&conn);
scheduler.every(1.hour()).run(move || {
active_counts(&conn);
reindex_aggregates_tables(&conn);
});
let conn = pool.get().unwrap();
clear_old_activities(&conn);
scheduler.every(1.weeks()).run(move || {
clear_old_activities(&conn);
});
// Manually run the scheduler in an event loop
loop {
scheduler.run_pending();
thread::sleep(Duration::from_millis(1000));
}
}
/// Reindex the aggregates tables every hour.
/// This is necessary because hot_rank depends on the current time (it is not an immutable
/// function), so index entries computed from it gradually go stale:
/// https://dba.stackexchange.com/questions/284052/how-to-create-an-index-based-on-a-time-based-function-in-postgres?noredirect=1#comment555727_284052
fn reindex_aggregates_tables(conn: &PgConnection) {
for table_name in &[
"post_aggregates",
"comment_aggregates",
"community_aggregates",
] {
reindex_table(&conn, &table_name);
}
}
fn reindex_table(conn: &PgConnection, table_name: &str) {
info!("Reindexing table {} ...", table_name);
let query = format!("reindex table concurrently {}", table_name);
sql_query(query).execute(conn).unwrap();
info!("Done.");
}
/// Clear old activities (this table gets very large)
fn clear_old_activities(conn: &PgConnection) {
info!("Clearing old activities...");
Activity::delete_olds(&conn).unwrap();
info!("Done.");
}
/// Re-calculate the site and community active user counts (run every hour by the scheduler above)
fn active_counts(conn: &PgConnection) {
info!("Updating active site and community aggregates ...");
let intervals = vec![
("1 day", "day"),
("1 week", "week"),
("1 month", "month"),
("6 months", "half_year"),
];
for i in &intervals {
let update_site_stmt = format!(
"update site_aggregates set users_active_{} = (select * from site_aggregates_activity('{}'))",
i.1, i.0
);
sql_query(update_site_stmt).execute(conn).unwrap();
let update_community_stmt = format!("update community_aggregates ca set users_active_{} = mv.count_ from community_aggregates_activity('{}') mv where ca.community_id = mv.community_id_", i.1, i.0);
sql_query(update_community_stmt).execute(conn).unwrap();
}
info!("Done.");
}
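
For context, reindex_table issues one statement of the form below per aggregates table. The index in the comment is only a hypothetical illustration of the kind of hot_rank expression index whose stored values drift as rows age; the real index definitions are not part of this diff:

-- statement generated by reindex_table() for each table in the list
reindex table concurrently post_aggregates;

-- hypothetical example of an expression index that goes stale over time,
-- since hot_rank() depends on how old a row is:
-- create index idx_post_aggregates_hot on post_aggregates (hot_rank(score, published) desc);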