2024-09-24 09:39:40 +00:00
use activitypub_federation ::config ::Data ;
2023-08-24 15:27:00 +00:00
use chrono ::{ DateTime , TimeZone , Utc } ;
2023-09-11 09:12:16 +00:00
use clokwerk ::{ AsyncScheduler , TimeUnits as CTimeUnits } ;
2023-04-25 23:28:06 +00:00
use diesel ::{
2024-09-24 09:39:40 +00:00
dsl ::{ exists , not , IntervalDsl } ,
2025-01-09 14:35:33 +00:00
query_builder ::AsQuery ,
2023-09-11 09:12:16 +00:00
sql_query ,
2023-08-24 15:27:00 +00:00
sql_types ::{ Integer , Timestamptz } ,
2024-09-24 09:39:40 +00:00
BoolExpressionMethods ,
2023-04-25 23:28:06 +00:00
ExpressionMethods ,
2023-06-20 06:17:54 +00:00
NullableExpressionMethods ,
2023-04-25 23:28:06 +00:00
QueryDsl ,
2023-06-27 08:13:51 +00:00
QueryableByName ,
2023-04-25 23:28:06 +00:00
} ;
2023-09-11 09:12:16 +00:00
use diesel_async ::{ AsyncPgConnection , RunQueryDsl } ;
2024-09-24 09:39:40 +00:00
use lemmy_api_common ::{
context ::LemmyContext ,
send_activity ::{ ActivityChannel , SendActivityData } ,
} ;
use lemmy_api_crud ::post ::create ::send_webmention ;
2023-02-18 14:36:12 +00:00
use lemmy_db_schema ::{
2023-07-14 15:17:06 +00:00
schema ::{
captcha_answer ,
comment ,
2024-09-24 09:39:40 +00:00
community ,
2024-11-11 10:34:10 +00:00
community_actions ,
2024-11-28 23:21:43 +00:00
federation_blocklist ,
2023-07-14 15:17:06 +00:00
instance ,
person ,
post ,
received_activity ,
sent_activity ,
} ,
2024-02-15 12:50:53 +00:00
source ::{
2024-09-24 09:39:40 +00:00
community ::Community ,
2024-02-15 12:50:53 +00:00
instance ::{ Instance , InstanceForm } ,
local_user ::LocalUser ,
2024-09-24 09:39:40 +00:00
post ::{ Post , PostUpdateForm } ,
2024-02-15 12:50:53 +00:00
} ,
2024-09-24 09:39:40 +00:00
traits ::Crud ,
2025-01-09 14:35:33 +00:00
utils ::{
find_action ,
functions ::coalesce ,
get_conn ,
now ,
uplete ,
DbPool ,
DELETED_REPLACEMENT_TEXT ,
} ,
2023-02-18 14:36:12 +00:00
} ;
2024-06-03 21:30:00 +00:00
use lemmy_routes ::nodeinfo ::{ NodeInfo , NodeInfoWellKnown } ;
2024-12-21 00:21:09 +00:00
use lemmy_utils ::error ::{ LemmyErrorType , LemmyResult } ;
2023-09-11 09:12:16 +00:00
use reqwest_middleware ::ClientWithMiddleware ;
use std ::time ::Duration ;
2024-12-21 00:21:09 +00:00
use tracing ::{ info , warn } ;
2021-01-29 16:38:27 +00:00
/// Schedules various cleanup tasks for lemmy in a background thread
2024-09-24 09:39:40 +00:00
pub async fn setup ( context : Data < LemmyContext > ) -> LemmyResult < ( ) > {
2022-11-09 10:05:00 +00:00
// Setup the connections
2023-09-11 09:12:16 +00:00
let mut scheduler = AsyncScheduler ::new ( ) ;
2024-12-21 00:21:09 +00:00
startup_jobs ( & mut context . pool ( ) )
. await
. inspect_err ( | e | warn! ( " Failed to run startup tasks: {e} " ) )
. ok ( ) ;
2021-08-26 11:49:16 +00:00
2023-09-11 09:12:16 +00:00
let context_1 = context . clone ( ) ;
2024-09-24 09:39:40 +00:00
// Update active counts expired bans and unpublished posts every hour
2023-04-25 23:28:06 +00:00
scheduler . every ( CTimeUnits ::hour ( 1 ) ) . run ( move | | {
2023-09-11 09:12:16 +00:00
let context = context_1 . clone ( ) ;
async move {
2024-12-21 00:21:09 +00:00
active_counts ( & mut context . pool ( ) )
. await
. inspect_err ( | e | warn! ( " Failed to update active counts: {e} " ) )
. ok ( ) ;
update_banned_when_expired ( & mut context . pool ( ) )
. await
. inspect_err ( | e | warn! ( " Failed to update expired bans: {e} " ) )
. ok ( ) ;
delete_instance_block_when_expired ( & mut context . pool ( ) )
. await
. inspect_err ( | e | warn! ( " Failed to delete expired instance bans: {e} " ) )
. ok ( ) ;
2023-09-11 09:12:16 +00:00
}
2023-06-08 20:15:15 +00:00
} ) ;
2024-09-24 09:39:40 +00:00
let context_1 = context . reset_request_count ( ) ;
// Every 10 minutes update hot ranks, delete expired captchas and publish scheduled posts
2023-09-11 09:12:16 +00:00
scheduler . every ( CTimeUnits ::minutes ( 10 ) ) . run ( move | | {
2024-09-24 09:39:40 +00:00
let context = context_1 . reset_request_count ( ) ;
2023-09-11 09:12:16 +00:00
async move {
2024-12-21 00:21:09 +00:00
update_hot_ranks ( & mut context . pool ( ) )
. await
. inspect_err ( | e | warn! ( " Failed to update hot ranks: {e} " ) )
. ok ( ) ;
delete_expired_captcha_answers ( & mut context . pool ( ) )
. await
. inspect_err ( | e | warn! ( " Failed to delete expired captcha answers: {e} " ) )
. ok ( ) ;
publish_scheduled_posts ( & context )
. await
. inspect_err ( | e | warn! ( " Failed to publish scheduled posts: {e} " ) )
. ok ( ) ;
2023-09-11 09:12:16 +00:00
}
2023-06-27 10:38:53 +00:00
} ) ;
2023-09-11 09:12:16 +00:00
let context_1 = context . clone ( ) ;
2023-06-08 20:15:15 +00:00
// Clear old activities every week
2023-04-25 23:28:06 +00:00
scheduler . every ( CTimeUnits ::weeks ( 1 ) ) . run ( move | | {
2023-09-11 09:12:16 +00:00
let context = context_1 . clone ( ) ;
async move {
2024-12-21 00:21:09 +00:00
clear_old_activities ( & mut context . pool ( ) )
. await
. inspect_err ( | e | warn! ( " Failed to clear old activities: {e} " ) )
. ok ( ) ;
2023-09-11 09:12:16 +00:00
}
2021-01-29 16:38:27 +00:00
} ) ;
2023-09-11 09:12:16 +00:00
let context_1 = context . clone ( ) ;
2024-02-15 12:50:53 +00:00
// Daily tasks:
// - Overwrite deleted & removed posts and comments every day
// - Delete old denied users
// - Update instance software
2023-06-20 06:17:54 +00:00
scheduler . every ( CTimeUnits ::days ( 1 ) ) . run ( move | | {
2023-09-11 09:12:16 +00:00
let context = context_1 . clone ( ) ;
async move {
2024-12-21 00:21:09 +00:00
overwrite_deleted_posts_and_comments ( & mut context . pool ( ) )
. await
. inspect_err ( | e | warn! ( " Failed to overwrite deleted posts/comments: {e} " ) )
. ok ( ) ;
delete_old_denied_users ( & mut context . pool ( ) )
. await
. inspect_err ( | e | warn! ( " Failed to delete old denied users: {e} " ) )
. ok ( ) ;
2023-09-11 09:12:16 +00:00
update_instance_software ( & mut context . pool ( ) , context . client ( ) )
. await
2024-09-24 09:39:40 +00:00
. inspect_err ( | e | warn! ( " Failed to update instance software: {e} " ) )
2023-09-11 09:12:16 +00:00
. ok ( ) ;
}
2023-02-18 14:36:12 +00:00
} ) ;
2021-01-29 16:38:27 +00:00
// Manually run the scheduler in an event loop
loop {
2023-09-11 09:12:16 +00:00
scheduler . run_pending ( ) . await ;
tokio ::time ::sleep ( Duration ::from_millis ( 1000 ) ) . await ;
2021-01-29 16:38:27 +00:00
}
}
2023-06-20 09:33:03 +00:00
/// Run these on server startup
2024-12-21 00:21:09 +00:00
async fn startup_jobs ( pool : & mut DbPool < '_ > ) -> LemmyResult < ( ) > {
active_counts ( pool ) . await ? ;
update_hot_ranks ( pool ) . await ? ;
update_banned_when_expired ( pool ) . await ? ;
delete_instance_block_when_expired ( pool ) . await ? ;
clear_old_activities ( pool ) . await ? ;
overwrite_deleted_posts_and_comments ( pool ) . await ? ;
delete_old_denied_users ( pool ) . await ? ;
Ok ( ( ) )
2023-06-20 09:33:03 +00:00
}
2023-06-08 20:15:15 +00:00
/// Update the hot_rank columns for the aggregates tables
2023-06-27 08:13:51 +00:00
/// Runs in batches until all necessary rows are updated once
2024-12-21 00:21:09 +00:00
async fn update_hot_ranks ( pool : & mut DbPool < '_ > ) -> LemmyResult < ( ) > {
2023-07-17 09:05:55 +00:00
info! ( " Updating hot ranks for all history... " ) ;
2023-06-27 08:13:51 +00:00
2024-12-21 00:21:09 +00:00
let mut conn = get_conn ( pool ) . await ? ;
process_post_aggregates_ranks_in_batches ( & mut conn ) . await ? ;
process_ranks_in_batches (
& mut conn ,
" comment " ,
" a.hot_rank != 0 " ,
" SET hot_rank = r.hot_rank(a.score, a.published) " ,
)
. await ? ;
process_ranks_in_batches (
& mut conn ,
" community " ,
" a.hot_rank != 0 " ,
" SET hot_rank = r.hot_rank(a.subscribers, a.published) " ,
)
. await ? ;
info! ( " Finished hot ranks update! " ) ;
Ok ( ( ) )
2023-06-27 08:13:51 +00:00
}
2023-06-08 20:15:15 +00:00
2023-06-27 08:13:51 +00:00
#[ derive(QueryableByName) ]
struct HotRanksUpdateResult {
2023-08-24 15:27:00 +00:00
#[ diesel(sql_type = Timestamptz) ]
published : DateTime < Utc > ,
2023-06-27 08:13:51 +00:00
}
2023-06-08 20:15:15 +00:00
2023-07-17 09:05:55 +00:00
/// Runs the hot rank update query in batches until all rows have been processed.
/// In `where_clause` and `set_clause`, "a" will refer to the current aggregates table.
2023-06-27 08:13:51 +00:00
/// Locked rows are skipped in order to prevent deadlocks (they will likely get updated on the next
/// run)
2023-09-11 09:12:16 +00:00
async fn process_ranks_in_batches (
conn : & mut AsyncPgConnection ,
2023-06-27 08:13:51 +00:00
table_name : & str ,
2023-07-17 09:05:55 +00:00
where_clause : & str ,
2023-06-27 08:13:51 +00:00
set_clause : & str ,
2024-12-21 00:21:09 +00:00
) -> LemmyResult < ( ) > {
2024-11-15 13:18:52 +00:00
let process_start_time : DateTime < Utc > = Utc . timestamp_opt ( 0 , 0 ) . single ( ) . unwrap_or_default ( ) ;
2023-07-17 09:05:55 +00:00
2023-06-27 08:13:51 +00:00
let update_batch_size = 1000 ; // Bigger batches than this tend to cause seq scans
2023-07-17 09:05:55 +00:00
let mut processed_rows_count = 0 ;
2023-06-27 08:13:51 +00:00
let mut previous_batch_result = Some ( process_start_time ) ;
while let Some ( previous_batch_last_published ) = previous_batch_result {
// Raw `sql_query` is used as a performance optimization - Diesel does not support doing this
// in a single query (neither as a CTE, nor using a subquery)
2024-12-21 00:21:09 +00:00
let updated_rows = sql_query ( format! (
Remove id column and use different primary key on some tables (#4093)
* post_saved
* fmt
* remove unique and not null
* put person_id first in primary key and remove index
* use post_saved.find
* change captcha_answer
* remove removal of not null
* comment_aggregates
* comment_like
* comment_saved
* aggregates
* remove "\"
* deduplicate site_aggregates
* person_post_aggregates
* community_moderator
* community_block
* community_person_ban
* custom_emoji_keyword
* federation allow/block list
* federation_queue_state
* instance_block
* local_site_rate_limit, local_user_language, login_token
* person_ban, person_block, person_follower, post_like, post_read, received_activity
* community_follower, community_language, site_language
* fmt
* image_upload
* remove unused newtypes
* remove more indexes
* use .find
* merge
* fix site_aggregates_site function
* fmt
* Primary keys dess (#17)
* Also order reports by oldest first (ref #4123) (#4129)
* Support signed fetch for federation (fixes #868) (#4125)
* Support signed fetch for federation (fixes #868)
* taplo
* add federation queue state to get_federated_instances api (#4104)
* add federation queue state to get_federated_instances api
* feature gate
* move retry sleep function
* move stuff around
* Add UI setting for collapsing bot comments. Fixes #3838 (#4098)
* Add UI setting for collapsing bot comments. Fixes #3838
* Fixing clippy check.
* Only keep sent and received activities for 7 days (fixes #4113, fixes #4110) (#4131)
* Only check auth secure on release mode. (#4127)
* Only check auth secure on release mode.
* Fixing wrong js-client.
* Adding is_debug_mode var.
* Fixing the desktop image on the README. (#4135)
* Delete dupes and add possibly missing unique constraint on person_aggregates.
* Fixing clippy lints.
---------
Co-authored-by: Nutomic <me@nutomic.com>
Co-authored-by: phiresky <phireskyde+git@gmail.com>
* fmt
* Update community_block.rs
* Update instance_block.rs
* Update person_block.rs
* Update person_block.rs
---------
Co-authored-by: Dessalines <dessalines@users.noreply.github.com>
Co-authored-by: Nutomic <me@nutomic.com>
Co-authored-by: phiresky <phireskyde+git@gmail.com>
2023-11-13 13:14:07 +00:00
r #" WITH batch AS (SELECT a.{id_column}
2023-06-27 08:13:51 +00:00
FROM { aggregates_table } a
2023-07-17 09:05:55 +00:00
WHERE a . published > $ 1 AND ( { where_clause } )
2023-06-27 08:13:51 +00:00
ORDER BY a . published
LIMIT $ 2
FOR UPDATE SKIP LOCKED )
UPDATE { aggregates_table } a { set_clause }
Remove id column and use different primary key on some tables (#4093)
* post_saved
* fmt
* remove unique and not null
* put person_id first in primary key and remove index
* use post_saved.find
* change captcha_answer
* remove removal of not null
* comment_aggregates
* comment_like
* comment_saved
* aggregates
* remove "\"
* deduplicate site_aggregates
* person_post_aggregates
* community_moderator
* community_block
* community_person_ban
* custom_emoji_keyword
* federation allow/block list
* federation_queue_state
* instance_block
* local_site_rate_limit, local_user_language, login_token
* person_ban, person_block, person_follower, post_like, post_read, received_activity
* community_follower, community_language, site_language
* fmt
* image_upload
* remove unused newtypes
* remove more indexes
* use .find
* merge
* fix site_aggregates_site function
* fmt
* Primary keys dess (#17)
* Also order reports by oldest first (ref #4123) (#4129)
* Support signed fetch for federation (fixes #868) (#4125)
* Support signed fetch for federation (fixes #868)
* taplo
* add federation queue state to get_federated_instances api (#4104)
* add federation queue state to get_federated_instances api
* feature gate
* move retry sleep function
* move stuff around
* Add UI setting for collapsing bot comments. Fixes #3838 (#4098)
* Add UI setting for collapsing bot comments. Fixes #3838
* Fixing clippy check.
* Only keep sent and received activities for 7 days (fixes #4113, fixes #4110) (#4131)
* Only check auth secure on release mode. (#4127)
* Only check auth secure on release mode.
* Fixing wrong js-client.
* Adding is_debug_mode var.
* Fixing the desktop image on the README. (#4135)
* Delete dupes and add possibly missing unique constraint on person_aggregates.
* Fixing clippy lints.
---------
Co-authored-by: Nutomic <me@nutomic.com>
Co-authored-by: phiresky <phireskyde+git@gmail.com>
* fmt
* Update community_block.rs
* Update instance_block.rs
* Update person_block.rs
* Update person_block.rs
---------
Co-authored-by: Dessalines <dessalines@users.noreply.github.com>
Co-authored-by: Nutomic <me@nutomic.com>
Co-authored-by: phiresky <phireskyde+git@gmail.com>
2023-11-13 13:14:07 +00:00
FROM batch WHERE a . { id_column } = batch . { id_column } RETURNING a . published ;
2023-06-27 08:13:51 +00:00
" #,
2024-12-19 20:42:01 +00:00
id_column = format_args! ( " {table_name} _id " ) ,
aggregates_table = format_args! ( " {table_name} _aggregates " ) ,
2023-06-27 08:13:51 +00:00
) )
2023-08-24 15:27:00 +00:00
. bind ::< Timestamptz , _ > ( previous_batch_last_published )
2023-06-27 08:13:51 +00:00
. bind ::< Integer , _ > ( update_batch_size )
2023-09-11 09:12:16 +00:00
. get_results ::< HotRanksUpdateResult > ( conn )
2024-12-21 00:21:09 +00:00
. await
. map_err ( | e | {
LemmyErrorType ::Unknown ( format! ( " Failed to update {} hot_ranks: {} " , table_name , e ) )
} ) ? ;
processed_rows_count + = updated_rows . len ( ) ;
previous_batch_result = updated_rows . last ( ) . map ( | row | row . published ) ;
2023-06-15 09:29:12 +00:00
}
2023-06-27 08:13:51 +00:00
info! (
2023-07-17 09:05:55 +00:00
" Finished process_hot_ranks_in_batches execution for {} (processed {} rows) " ,
table_name , processed_rows_count
2023-06-27 08:13:51 +00:00
) ;
2024-12-21 00:21:09 +00:00
Ok ( ( ) )
2021-01-29 16:38:27 +00:00
}
2023-09-06 17:43:27 +00:00
/// Post aggregates is a special case, since it needs to join to the community_aggregates
/// table, to get the active monthly user counts.
2024-12-21 00:21:09 +00:00
async fn process_post_aggregates_ranks_in_batches ( conn : & mut AsyncPgConnection ) -> LemmyResult < ( ) > {
2024-11-15 13:18:52 +00:00
let process_start_time : DateTime < Utc > = Utc . timestamp_opt ( 0 , 0 ) . single ( ) . unwrap_or_default ( ) ;
2023-09-06 17:43:27 +00:00
let update_batch_size = 1000 ; // Bigger batches than this tend to cause seq scans
let mut processed_rows_count = 0 ;
let mut previous_batch_result = Some ( process_start_time ) ;
while let Some ( previous_batch_last_published ) = previous_batch_result {
2024-12-21 00:21:09 +00:00
let updated_rows = sql_query (
Remove id column and use different primary key on some tables (#4093)
* post_saved
* fmt
* remove unique and not null
* put person_id first in primary key and remove index
* use post_saved.find
* change captcha_answer
* remove removal of not null
* comment_aggregates
* comment_like
* comment_saved
* aggregates
* remove "\"
* deduplicate site_aggregates
* person_post_aggregates
* community_moderator
* community_block
* community_person_ban
* custom_emoji_keyword
* federation allow/block list
* federation_queue_state
* instance_block
* local_site_rate_limit, local_user_language, login_token
* person_ban, person_block, person_follower, post_like, post_read, received_activity
* community_follower, community_language, site_language
* fmt
* image_upload
* remove unused newtypes
* remove more indexes
* use .find
* merge
* fix site_aggregates_site function
* fmt
* Primary keys dess (#17)
* Also order reports by oldest first (ref #4123) (#4129)
* Support signed fetch for federation (fixes #868) (#4125)
* Support signed fetch for federation (fixes #868)
* taplo
* add federation queue state to get_federated_instances api (#4104)
* add federation queue state to get_federated_instances api
* feature gate
* move retry sleep function
* move stuff around
* Add UI setting for collapsing bot comments. Fixes #3838 (#4098)
* Add UI setting for collapsing bot comments. Fixes #3838
* Fixing clippy check.
* Only keep sent and received activities for 7 days (fixes #4113, fixes #4110) (#4131)
* Only check auth secure on release mode. (#4127)
* Only check auth secure on release mode.
* Fixing wrong js-client.
* Adding is_debug_mode var.
* Fixing the desktop image on the README. (#4135)
* Delete dupes and add possibly missing unique constraint on person_aggregates.
* Fixing clippy lints.
---------
Co-authored-by: Nutomic <me@nutomic.com>
Co-authored-by: phiresky <phireskyde+git@gmail.com>
* fmt
* Update community_block.rs
* Update instance_block.rs
* Update person_block.rs
* Update person_block.rs
---------
Co-authored-by: Dessalines <dessalines@users.noreply.github.com>
Co-authored-by: Nutomic <me@nutomic.com>
Co-authored-by: phiresky <phireskyde+git@gmail.com>
2023-11-13 13:14:07 +00:00
r #" WITH batch AS (SELECT pa.post_id
2023-09-06 17:43:27 +00:00
FROM post_aggregates pa
WHERE pa . published > $ 1
AND ( pa . hot_rank ! = 0 OR pa . hot_rank_active ! = 0 )
ORDER BY pa . published
LIMIT $ 2
FOR UPDATE SKIP LOCKED )
UPDATE post_aggregates pa
2024-04-18 00:58:44 +00:00
SET hot_rank = r . hot_rank ( pa . score , pa . published ) ,
hot_rank_active = r . hot_rank ( pa . score , pa . newest_comment_time_necro ) ,
scaled_rank = r . scaled_rank ( pa . score , pa . published , ca . users_active_month )
2023-09-06 17:43:27 +00:00
FROM batch , community_aggregates ca
Remove id column and use different primary key on some tables (#4093)
* post_saved
* fmt
* remove unique and not null
* put person_id first in primary key and remove index
* use post_saved.find
* change captcha_answer
* remove removal of not null
* comment_aggregates
* comment_like
* comment_saved
* aggregates
* remove "\"
* deduplicate site_aggregates
* person_post_aggregates
* community_moderator
* community_block
* community_person_ban
* custom_emoji_keyword
* federation allow/block list
* federation_queue_state
* instance_block
* local_site_rate_limit, local_user_language, login_token
* person_ban, person_block, person_follower, post_like, post_read, received_activity
* community_follower, community_language, site_language
* fmt
* image_upload
* remove unused newtypes
* remove more indexes
* use .find
* merge
* fix site_aggregates_site function
* fmt
* Primary keys dess (#17)
* Also order reports by oldest first (ref #4123) (#4129)
* Support signed fetch for federation (fixes #868) (#4125)
* Support signed fetch for federation (fixes #868)
* taplo
* add federation queue state to get_federated_instances api (#4104)
* add federation queue state to get_federated_instances api
* feature gate
* move retry sleep function
* move stuff around
* Add UI setting for collapsing bot comments. Fixes #3838 (#4098)
* Add UI setting for collapsing bot comments. Fixes #3838
* Fixing clippy check.
* Only keep sent and received activities for 7 days (fixes #4113, fixes #4110) (#4131)
* Only check auth secure on release mode. (#4127)
* Only check auth secure on release mode.
* Fixing wrong js-client.
* Adding is_debug_mode var.
* Fixing the desktop image on the README. (#4135)
* Delete dupes and add possibly missing unique constraint on person_aggregates.
* Fixing clippy lints.
---------
Co-authored-by: Nutomic <me@nutomic.com>
Co-authored-by: phiresky <phireskyde+git@gmail.com>
* fmt
* Update community_block.rs
* Update instance_block.rs
* Update person_block.rs
* Update person_block.rs
---------
Co-authored-by: Dessalines <dessalines@users.noreply.github.com>
Co-authored-by: Nutomic <me@nutomic.com>
Co-authored-by: phiresky <phireskyde+git@gmail.com>
2023-11-13 13:14:07 +00:00
WHERE pa . post_id = batch . post_id and pa . community_id = ca . community_id RETURNING pa . published ;
2023-09-06 17:43:27 +00:00
" #,
)
. bind ::< Timestamptz , _ > ( previous_batch_last_published )
. bind ::< Integer , _ > ( update_batch_size )
2023-09-11 09:12:16 +00:00
. get_results ::< HotRanksUpdateResult > ( conn )
2024-12-21 00:21:09 +00:00
. await . map_err ( | e | LemmyErrorType ::Unknown ( format! ( " Failed to update {} hot_ranks: {} " , " post_aggregates " , e ) ) ) ? ;
processed_rows_count + = updated_rows . len ( ) ;
previous_batch_result = updated_rows . last ( ) . map ( | row | row . published ) ;
2023-09-06 17:43:27 +00:00
}
info! (
" Finished process_hot_ranks_in_batches execution for {} (processed {} rows) " ,
" post_aggregates " , processed_rows_count
) ;
2024-12-21 00:21:09 +00:00
Ok ( ( ) )
2023-09-06 17:43:27 +00:00
}
2024-12-21 00:21:09 +00:00
async fn delete_expired_captcha_answers ( pool : & mut DbPool < '_ > ) -> LemmyResult < ( ) > {
let mut conn = get_conn ( pool ) . await ? ;
2023-09-11 09:12:16 +00:00
2024-12-21 00:21:09 +00:00
diesel ::delete (
captcha_answer ::table . filter ( captcha_answer ::published . lt ( now ( ) - IntervalDsl ::minutes ( 10 ) ) ) ,
)
. execute ( & mut conn )
. await ? ;
info! ( " Done. " ) ;
Ok ( ( ) )
2023-06-27 10:38:53 +00:00
}
2021-01-29 16:38:27 +00:00
/// Clear old activities (this table gets very large)
2024-12-21 00:21:09 +00:00
async fn clear_old_activities ( pool : & mut DbPool < '_ > ) -> LemmyResult < ( ) > {
2021-01-29 16:38:27 +00:00
info! ( " Clearing old activities... " ) ;
2024-12-21 00:21:09 +00:00
let mut conn = get_conn ( pool ) . await ? ;
diesel ::delete (
sent_activity ::table . filter ( sent_activity ::published . lt ( now ( ) - IntervalDsl ::days ( 7 ) ) ) ,
)
. execute ( & mut conn )
. await ? ;
diesel ::delete (
received_activity ::table . filter ( received_activity ::published . lt ( now ( ) - IntervalDsl ::days ( 7 ) ) ) ,
)
. execute ( & mut conn )
. await ? ;
info! ( " Done. " ) ;
Ok ( ( ) )
2021-01-29 16:38:27 +00:00
}
2024-12-21 00:21:09 +00:00
async fn delete_old_denied_users ( pool : & mut DbPool < '_ > ) -> LemmyResult < ( ) > {
LocalUser ::delete_old_denied_local_users ( pool ) . await ? ;
info! ( " Done. " ) ;
Ok ( ( ) )
2024-02-15 12:50:53 +00:00
}
2023-06-20 06:17:54 +00:00
/// overwrite posts and comments 30d after deletion
2024-12-21 00:21:09 +00:00
async fn overwrite_deleted_posts_and_comments ( pool : & mut DbPool < '_ > ) -> LemmyResult < ( ) > {
2023-06-20 06:17:54 +00:00
info! ( " Overwriting deleted posts... " ) ;
2024-12-21 00:21:09 +00:00
let mut conn = get_conn ( pool ) . await ? ;
diesel ::update (
post ::table
. filter ( post ::deleted . eq ( true ) )
. filter ( post ::updated . lt ( now ( ) . nullable ( ) - 1. months ( ) ) )
. filter ( post ::body . ne ( DELETED_REPLACEMENT_TEXT ) ) ,
)
. set ( (
post ::body . eq ( DELETED_REPLACEMENT_TEXT ) ,
post ::name . eq ( DELETED_REPLACEMENT_TEXT ) ,
) )
. execute ( & mut conn )
. await ? ;
info! ( " Overwriting deleted comments... " ) ;
diesel ::update (
comment ::table
. filter ( comment ::deleted . eq ( true ) )
. filter ( comment ::updated . lt ( now ( ) . nullable ( ) - 1. months ( ) ) )
. filter ( comment ::content . ne ( DELETED_REPLACEMENT_TEXT ) ) ,
)
. set ( comment ::content . eq ( DELETED_REPLACEMENT_TEXT ) )
. execute ( & mut conn )
. await ? ;
info! ( " Done. " ) ;
Ok ( ( ) )
2023-06-20 06:17:54 +00:00
}
2021-01-29 16:38:27 +00:00
/// Re-calculate the site and community active counts every 12 hours
2024-12-21 00:21:09 +00:00
async fn active_counts ( pool : & mut DbPool < '_ > ) -> LemmyResult < ( ) > {
2021-01-29 16:38:27 +00:00
info! ( " Updating active site and community aggregates ... " ) ;
2024-12-21 00:21:09 +00:00
let mut conn = get_conn ( pool ) . await ? ;
2021-01-29 16:38:27 +00:00
2024-12-21 00:21:09 +00:00
let intervals = vec! [
( " 1 day " , " day " ) ,
( " 1 week " , " week " ) ,
( " 1 month " , " month " ) ,
( " 6 months " , " half_year " ) ,
] ;
2023-09-11 09:12:16 +00:00
2024-12-21 00:21:09 +00:00
for ( full_form , abbr ) in & intervals {
let update_site_stmt = format! (
2024-11-12 17:03:30 +00:00
" update site_aggregates set users_active_{} = (select * from r.site_aggregates_activity('{}')) where site_id = 1 " ,
2024-10-31 12:12:24 +00:00
abbr , full_form
2021-01-29 16:38:27 +00:00
) ;
2024-12-21 00:21:09 +00:00
sql_query ( update_site_stmt ) . execute ( & mut conn ) . await ? ;
let update_community_stmt = format! ( " update community_aggregates ca set users_active_ {} = mv.count_ from r.community_aggregates_activity(' {} ') mv where ca.community_id = mv.community_id_ " , abbr , full_form ) ;
sql_query ( update_community_stmt ) . execute ( & mut conn ) . await ? ;
2023-09-11 09:12:16 +00:00
}
2024-12-21 00:21:09 +00:00
info! ( " Done. " ) ;
Ok ( ( ) )
2021-01-29 16:38:27 +00:00
}
2022-03-30 13:56:23 +00:00
/// Set banned to false after ban expires
2024-12-21 00:21:09 +00:00
async fn update_banned_when_expired ( pool : & mut DbPool < '_ > ) -> LemmyResult < ( ) > {
2022-03-30 13:56:23 +00:00
info! ( " Updating banned column if it expires ... " ) ;
2024-12-21 00:21:09 +00:00
let mut conn = get_conn ( pool ) . await ? ;
diesel ::update (
person ::table
. filter ( person ::banned . eq ( true ) )
. filter ( person ::ban_expires . lt ( now ( ) . nullable ( ) ) ) ,
)
. set ( person ::banned . eq ( false ) )
. execute ( & mut conn )
. await ? ;
2025-01-09 14:35:33 +00:00
uplete ::new ( community_actions ::table . filter ( community_actions ::ban_expires . lt ( now ( ) . nullable ( ) ) ) )
. set_null ( community_actions ::received_ban )
. set_null ( community_actions ::ban_expires )
. as_query ( )
. execute ( & mut conn )
. await ? ;
2024-12-21 00:21:09 +00:00
Ok ( ( ) )
2024-11-28 23:21:43 +00:00
}
/// Set banned to false after ban expires
2024-12-21 00:21:09 +00:00
async fn delete_instance_block_when_expired ( pool : & mut DbPool < '_ > ) -> LemmyResult < ( ) > {
2024-11-28 23:21:43 +00:00
info! ( " Delete instance blocks when expired ... " ) ;
2024-12-21 00:21:09 +00:00
let mut conn = get_conn ( pool ) . await ? ;
diesel ::delete (
federation_blocklist ::table . filter ( federation_blocklist ::expires . lt ( now ( ) . nullable ( ) ) ) ,
)
. execute ( & mut conn )
. await ? ;
Ok ( ( ) )
2024-09-24 09:39:40 +00:00
}
/// Find all unpublished posts with scheduled date in the future, and publish them.
2024-12-21 00:21:09 +00:00
async fn publish_scheduled_posts ( context : & Data < LemmyContext > ) -> LemmyResult < ( ) > {
2024-09-24 09:39:40 +00:00
let pool = & mut context . pool ( ) ;
2024-12-21 00:21:09 +00:00
let mut conn = get_conn ( pool ) . await ? ;
let scheduled_posts : Vec < _ > = post ::table
. inner_join ( community ::table )
. inner_join ( person ::table )
// find all posts which have scheduled_publish_time that is in the past
. filter ( post ::scheduled_publish_time . is_not_null ( ) )
. filter ( coalesce ( post ::scheduled_publish_time , now ( ) ) . lt ( now ( ) ) )
// make sure the post, person and community are still around
. filter ( not ( post ::deleted . or ( post ::removed ) ) )
. filter ( not ( person ::banned . or ( person ::deleted ) ) )
. filter ( not ( community ::removed . or ( community ::deleted ) ) )
// ensure that user isnt banned from community
. filter ( not ( exists ( find_action (
community_actions ::received_ban ,
( person ::id , community ::id ) ,
) ) ) )
. select ( ( post ::all_columns , community ::all_columns ) )
. get_results ::< ( Post , Community ) > ( & mut conn )
. await ? ;
for ( post , community ) in scheduled_posts {
// mark post as published in db
let form = PostUpdateForm {
scheduled_publish_time : Some ( None ) ,
.. Default ::default ( )
} ;
Post ::update ( & mut context . pool ( ) , post . id , & form ) . await ? ;
// send out post via federation and webmention
let send_activity = SendActivityData ::CreatePost ( post . clone ( ) ) ;
ActivityChannel ::submit_activity ( send_activity , context ) ? ;
send_webmention ( post , community ) ;
2023-09-11 09:12:16 +00:00
}
2024-12-21 00:21:09 +00:00
Ok ( ( ) )
2022-03-30 13:56:23 +00:00
}
2022-09-07 12:12:51 +00:00
2024-06-03 21:30:00 +00:00
/// Updates the instance software and version.
///
/// Does so using the /.well-known/nodeinfo protocol described here:
/// https://github.com/jhass/nodeinfo/blob/main/PROTOCOL.md
2023-07-13 14:12:01 +00:00
///
/// TODO: if instance has been dead for a long time, it should be checked less frequently
2023-09-11 09:12:16 +00:00
async fn update_instance_software (
pool : & mut DbPool < '_ > ,
client : & ClientWithMiddleware ,
) -> LemmyResult < ( ) > {
2023-02-18 14:36:12 +00:00
info! ( " Updating instances software and versions... " ) ;
2024-12-21 00:21:09 +00:00
let mut conn = get_conn ( pool ) . await ? ;
let instances = instance ::table . get_results ::< Instance > ( & mut conn ) . await ? ;
for instance in instances {
if let Some ( form ) = build_update_instance_form ( & instance . domain , client ) . await {
Instance ::update ( pool , instance . id , form ) . await ? ;
2023-02-18 14:36:12 +00:00
}
}
2024-12-21 00:21:09 +00:00
info! ( " Finished updating instances software and versions... " ) ;
2023-07-13 14:12:01 +00:00
Ok ( ( ) )
2023-02-18 14:36:12 +00:00
}
2024-06-03 21:30:00 +00:00
/// This builds an instance update form, for a given domain.
/// If the instance sends a response, but doesn't have a well-known or nodeinfo,
/// Then return a default form with only the updated field.
async fn build_update_instance_form (
domain : & str ,
client : & ClientWithMiddleware ,
) -> Option < InstanceForm > {
// The `updated` column is used to check if instances are alive. If it is more than three
// days in the past, no outgoing activities will be sent to that instance. However
// not every Fediverse instance has a valid Nodeinfo endpoint (its not required for
// Activitypub). That's why we always need to mark instances as updated if they are
// alive.
2024-09-23 10:05:18 +00:00
let mut instance_form = InstanceForm {
2024-11-15 10:21:08 +00:00
updated : Some ( Utc ::now ( ) ) ,
2024-09-23 10:05:18 +00:00
.. InstanceForm ::new ( domain . to_string ( ) )
} ;
2024-06-03 21:30:00 +00:00
// First, fetch their /.well-known/nodeinfo, then extract the correct nodeinfo link from it
let well_known_url = format! ( " https:// {} /.well-known/nodeinfo " , domain ) ;
2024-06-04 12:04:16 +00:00
let Ok ( res ) = client . get ( & well_known_url ) . send ( ) . await else {
// This is the only kind of error that means the instance is dead
return None ;
} ;
2024-12-12 14:38:16 +00:00
let status = res . status ( ) ;
if status . is_client_error ( ) | | status . is_server_error ( ) {
return None ;
}
2024-06-04 12:04:16 +00:00
// In this block, returning `None` is ignored, and only means not writing nodeinfo to db
async {
let node_info_url = res
. json ::< NodeInfoWellKnown > ( )
. await
. ok ( ) ?
. links
. into_iter ( )
. find ( | links | {
links
. rel
. as_str ( )
. starts_with ( " http://nodeinfo.diaspora.software/ns/schema/2. " )
} ) ?
. href ;
let software = client
. get ( node_info_url )
. send ( )
. await
. ok ( ) ?
. json ::< NodeInfo > ( )
. await
. ok ( ) ?
. software ? ;
instance_form . software = software . name ;
instance_form . version = software . version ;
Some ( ( ) )
2024-06-03 21:30:00 +00:00
}
2024-06-04 12:04:16 +00:00
. await ;
Some ( instance_form )
2024-06-03 21:30:00 +00:00
}
2024-06-04 12:04:16 +00:00
2023-02-18 14:36:12 +00:00
#[ cfg(test) ]
mod tests {
2023-07-17 15:04:14 +00:00
2024-12-21 00:21:09 +00:00
use super ::* ;
use crate ::{ scheduled_tasks ::build_update_instance_form , tests ::test_context } ;
2024-06-03 21:30:00 +00:00
use lemmy_api_common ::request ::client_builder ;
2024-11-04 09:44:58 +00:00
use lemmy_utils ::{
error ::{ LemmyErrorType , LemmyResult } ,
settings ::structs ::Settings ,
} ;
2024-01-04 09:47:18 +00:00
use pretty_assertions ::assert_eq ;
2024-06-03 21:30:00 +00:00
use reqwest_middleware ::ClientBuilder ;
use serial_test ::serial ;
2023-02-18 14:36:12 +00:00
#[ tokio::test ]
2024-07-07 23:01:03 +00:00
async fn test_nodeinfo_lemmy_ml ( ) -> LemmyResult < ( ) > {
2024-06-03 21:30:00 +00:00
let client = ClientBuilder ::new ( client_builder ( & Settings ::default ( ) ) . build ( ) ? ) . build ( ) ;
2024-07-07 23:01:03 +00:00
let form = build_update_instance_form ( " lemmy.ml " , & client )
2023-02-18 14:36:12 +00:00
. await
2024-09-23 15:26:50 +00:00
. ok_or ( LemmyErrorType ::NotFound ) ? ;
assert_eq! ( form . software . ok_or ( LemmyErrorType ::NotFound ) ? , " lemmy " ) ;
2024-06-03 21:30:00 +00:00
Ok ( ( ) )
}
2023-02-18 14:36:12 +00:00
2024-06-03 21:30:00 +00:00
#[ tokio::test ]
async fn test_nodeinfo_mastodon_social ( ) -> LemmyResult < ( ) > {
let client = ClientBuilder ::new ( client_builder ( & Settings ::default ( ) ) . build ( ) ? ) . build ( ) ;
let form = build_update_instance_form ( " mastodon.social " , & client )
. await
2024-09-23 15:26:50 +00:00
. ok_or ( LemmyErrorType ::NotFound ) ? ;
assert_eq! ( form . software . ok_or ( LemmyErrorType ::NotFound ) ? , " mastodon " ) ;
2024-06-03 21:30:00 +00:00
Ok ( ( ) )
2023-02-18 14:36:12 +00:00
}
2024-12-21 00:21:09 +00:00
#[ tokio::test ]
#[ serial ]
async fn test_scheduled_tasks_no_errors ( ) -> LemmyResult < ( ) > {
let context = test_context ( ) . await ;
startup_jobs ( & mut context . pool ( ) ) . await ? ;
update_instance_software ( & mut context . pool ( ) , context . client ( ) ) . await ? ;
delete_expired_captcha_answers ( & mut context . pool ( ) ) . await ? ;
publish_scheduled_posts ( & context ) . await ? ;
Ok ( ( ) )
}
2023-02-18 14:36:12 +00:00
}