diff --git a/api_tests/run-federation-test.sh b/api_tests/run-federation-test.sh index 3042fd344..aca57bc5f 100755 --- a/api_tests/run-federation-test.sh +++ b/api_tests/run-federation-test.sh @@ -2,6 +2,7 @@ set -e export LEMMY_DATABASE_URL=postgres://lemmy:password@localhost:5432 +export LEMMY_TEST_FAST_FEDERATION=1 # by default, the persistent federation queue has delays in the scale of 30s-5min pushd .. cargo build rm target/lemmy_server || true diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index 3bc6218bb..b52e4dbbf 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -22,20 +22,47 @@ use std::{ }; use tokio::{sync::mpsc::UnboundedSender, time::sleep}; use tokio_util::sync::CancellationToken; -/// save state to db every n sends if there's no failures (otherwise state is saved after every attempt) + +/// Decrease the delays of the federation queue. +/// Should only be used for federation tests since it significantly increases CPU and DB load of the federation queue. +static LEMMY_TEST_FAST_FEDERATION: Lazy = Lazy::new(|| { + std::env::var("LEMMY_TEST_FAST_FEDERATION") + .map(|s| !s.is_empty()) + .unwrap_or(false) +}); + +/// Check whether to save state to db every n sends if there's no failures (during failures state is saved after every attempt) +/// This determines the batch size for loop_batch. After a batch ends and SAVE_STATE_EVERY_TIME has passed, the federation_queue_state is updated in the DB. static CHECK_SAVE_STATE_EVERY_IT: i64 = 100; +/// Save state to db after this time has passed since the last state (so if the server crashes or is SIGKILLed, less than X seconds of activities are resent) static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(60); -/// recheck for new federation work every n seconds -#[cfg(debug_assertions)] -static WORK_FINISHED_RECHECK_DELAY: Duration = Duration::from_secs(1); -#[cfg(not(debug_assertions))] -static WORK_FINISHED_RECHECK_DELAY: Duration = Duration::from_secs(30); -#[cfg(debug_assertions)] -static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy = - Lazy::new(|| chrono::Duration::minutes(1)); -#[cfg(not(debug_assertions))] -static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy = - Lazy::new(|| chrono::Duration::minutes(5)); +/// Recheck for new federation work every n seconds. +/// +/// When the queue is processed faster than new activities are added and it reaches the current time with an empty batch, +/// this is the delay the queue waits before it checks if new activities have been added to the sent_activities table. +/// This delay is only applied if no federated activity happens during sending activities of the last batch. +static WORK_FINISHED_RECHECK_DELAY: Lazy = Lazy::new(|| { + if *LEMMY_TEST_FAST_FEDERATION { + Duration::from_secs(1) + } else { + Duration::from_secs(30) + } +}); +/// interval with which new additions to community_followers are queried. +/// +/// The first time some user on an instance follows a specific remote community (or, more precisely: the first time a (followed_community_id, follower_inbox_url) tuple appears), +/// this delay limits the maximum time until the follow actually results in activities from that community id being sent to that inbox url. +/// This delay currently needs to not be too small because the DB load is currently fairly high because of the current structure of storing inboxes for every person, not having a separate list of shared_inboxes, and the architecture of having every instance queue be fully separate. +/// (see https://github.com/LemmyNet/lemmy/issues/3958) +static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy = Lazy::new(|| { + if *LEMMY_TEST_FAST_FEDERATION { + chrono::Duration::seconds(1) + } else { + chrono::Duration::minutes(2) + } +}); +/// The same as FOLLOW_ADDITIONS_RECHECK_DELAY, but triggering when the last person on an instance unfollows a specific remote community. +/// This is expected to happen pretty rarely and updating it in a timely manner is not too important. static FOLLOW_REMOVALS_RECHECK_DELAY: Lazy = Lazy::new(|| chrono::Duration::hours(1)); pub(crate) struct InstanceWorker { @@ -121,6 +148,7 @@ impl InstanceWorker { } Ok(()) } + /// send out a batch of CHECK_SAVE_STATE_EVERY_IT activities async fn loop_batch(&mut self, pool: &mut DbPool<'_>) -> Result<()> { let latest_id = get_latest_activity_id(pool).await?; if self.state.last_successful_id == -1 { @@ -134,7 +162,7 @@ impl InstanceWorker { if id == latest_id { // no more work to be done, wait before rechecking tokio::select! { - () = sleep(WORK_FINISHED_RECHECK_DELAY) => {}, + () = sleep(*WORK_FINISHED_RECHECK_DELAY) => {}, () = self.stop.cancelled() => {} } return Ok(());