diff --git a/crates/federate/src/inboxes.rs b/crates/federate/src/inboxes.rs new file mode 100644 index 000000000..d50a5d044 --- /dev/null +++ b/crates/federate/src/inboxes.rs @@ -0,0 +1,150 @@ +use crate::util::LEMMY_TEST_FAST_FEDERATION; +use chrono::{DateTime, TimeZone, Utc}; +use lemmy_api_common::context::LemmyContext; +use lemmy_db_schema::{ + newtypes::CommunityId, + source::{activity::SentActivity, instance::Instance, site::Site}, +}; +use lemmy_db_views_actor::structs::CommunityFollowerView; +use lemmy_utils::error::LemmyResult; +use once_cell::sync::Lazy; +use reqwest::Url; +use std::collections::{HashMap, HashSet}; + +/// interval with which new additions to community_followers are queried. +/// +/// The first time some user on an instance follows a specific remote community (or, more precisely: +/// the first time a (followed_community_id, follower_inbox_url) tuple appears), this delay limits +/// the maximum time until the follow actually results in activities from that community id being +/// sent to that inbox url. This delay currently needs to not be too small because the DB load is +/// currently fairly high because of the current structure of storing inboxes for every person, not +/// having a separate list of shared_inboxes, and the architecture of having every instance queue be +/// fully separate. (see https://github.com/LemmyNet/lemmy/issues/3958) +static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy = Lazy::new(|| { + if *LEMMY_TEST_FAST_FEDERATION { + chrono::TimeDelta::try_seconds(1).expect("TimeDelta out of bounds") + } else { + chrono::TimeDelta::try_minutes(2).expect("TimeDelta out of bounds") + } +}); + +/// The same as FOLLOW_ADDITIONS_RECHECK_DELAY, but triggering when the last person on an instance +/// unfollows a specific remote community. This is expected to happen pretty rarely and updating it +/// in a timely manner is not too important. +static FOLLOW_REMOVALS_RECHECK_DELAY: Lazy = + Lazy::new(|| chrono::TimeDelta::try_hours(1).expect("TimeDelta out of bounds")); + +pub(crate) struct CommunityInboxCollector { + instance: Instance, + // load site lazily because if an instance is first seen due to being on allowlist, + // the corresponding row in `site` may not exist yet since that is only added once + // `fetch_instance_actor_for_object` is called. + // (this should be unlikely to be relevant outside of the federation tests) + // TODO: use lazy + site_loaded: bool, + site: Option, + followed_communities: HashMap>, + last_communities_fetch_full: DateTime, + last_communities_fetch_incr: DateTime, +} + +impl CommunityInboxCollector { + pub fn new(instance: Instance) -> Self { + Self { + instance, + site_loaded: false, + site: None, + followed_communities: HashMap::new(), + last_communities_fetch_full: Utc.timestamp_nanos(0), + last_communities_fetch_incr: Utc.timestamp_nanos(0), + } + } + + /// get inbox urls of sending the given activity to the given instance + /// most often this will return 0 values (if instance doesn't care about the activity) + /// or 1 value (the shared inbox) + /// > 1 values only happens for non-lemmy software + pub async fn get_inbox_urls( + &mut self, + activity: &SentActivity, + context: &LemmyContext, + ) -> LemmyResult> { + let mut inbox_urls: HashSet = HashSet::new(); + + if activity.send_all_instances { + if !self.site_loaded { + self.site = Site::read_from_instance_id(&mut context.pool(), self.instance.id).await?; + self.site_loaded = true; + } + if let Some(site) = &self.site { + // Nutomic: Most non-lemmy software wont have a site row. That means it cant handle these + // activities. So handling it like this is fine. + inbox_urls.insert(site.inbox_url.inner().clone()); + } + } + if let Some(t) = &activity.send_community_followers_of { + if let Some(urls) = self.followed_communities.get(t) { + inbox_urls.extend(urls.iter().cloned()); + } + } + inbox_urls.extend( + activity + .send_inboxes + .iter() + .filter_map(std::option::Option::as_ref) + .filter(|&u| (u.domain() == Some(&self.instance.domain))) + .map(|u| u.inner().clone()), + ); + Ok(inbox_urls) + } + + pub async fn update_communities(&mut self, context: &LemmyContext) -> LemmyResult<()> { + // update to time before fetch to ensure overlap. subtract 10s to ensure overlap even if + // published date is not exact + let updated_fetch = + Utc::now() - chrono::TimeDelta::try_seconds(10).expect("TimeDelta out of bounds"); + + let full_fetch = Utc::now() - self.last_communities_fetch_full; + if full_fetch > *FOLLOW_REMOVALS_RECHECK_DELAY { + // process removals every hour + self.followed_communities = self + .get_communities(Utc.timestamp_nanos(0), context) + .await?; + self.last_communities_fetch_full = updated_fetch; + self.last_communities_fetch_incr = self.last_communities_fetch_full; + } + let incr_fetch = Utc::now() - self.last_communities_fetch_incr; + if incr_fetch > *FOLLOW_ADDITIONS_RECHECK_DELAY { + // process additions every minute + let added = self + .get_communities(self.last_communities_fetch_incr, context) + .await?; + self.followed_communities.extend(added); + self.last_communities_fetch_incr = updated_fetch; + } + Ok(()) + } + + /// get a list of local communities with the remote inboxes on the given instance that cares about + /// them + async fn get_communities( + &mut self, + last_fetch: DateTime, + context: &LemmyContext, + ) -> LemmyResult>> { + let followed = CommunityFollowerView::get_instance_followed_community_inboxes( + &mut context.pool(), + self.instance.id, + last_fetch, + ) + .await?; + Ok( + followed + .into_iter() + .fold(HashMap::new(), |mut map, (c, u)| { + map.entry(c).or_default().insert(u.into()); + map + }), + ) + } +} diff --git a/crates/federate/src/lib.rs b/crates/federate/src/lib.rs index d3876226f..bc54bb05c 100644 --- a/crates/federate/src/lib.rs +++ b/crates/federate/src/lib.rs @@ -16,6 +16,7 @@ use tokio::{ use tokio_util::sync::CancellationToken; use tracing::info; +mod inboxes; mod stats; mod util; mod worker; diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index f13a02678..6b7100954 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -1,9 +1,11 @@ -use crate::util::{ - get_activity_cached, - get_actor_cached, - get_latest_activity_id, - LEMMY_TEST_FAST_FEDERATION, - WORK_FINISHED_RECHECK_DELAY, +use crate::{ + inboxes::CommunityInboxCollector, + util::{ + get_activity_cached, + get_actor_cached, + get_latest_activity_id, + WORK_FINISHED_RECHECK_DELAY, + }, }; use activitypub_federation::{ activity_sending::SendActivityTask, @@ -15,20 +17,16 @@ use chrono::{DateTime, Days, TimeZone, Utc}; use lemmy_api_common::{context::LemmyContext, federate_retry_sleep_duration}; use lemmy_apub::{activity_lists::SharedInboxActivities, FEDERATION_CONTEXT}; use lemmy_db_schema::{ - newtypes::{ActivityId, CommunityId, InstanceId}, + newtypes::{ActivityId, InstanceId}, source::{ activity::SentActivity, federation_queue_state::FederationQueueState, instance::{Instance, InstanceForm}, - site::Site, }, utils::naive_now, }; -use lemmy_db_views_actor::structs::CommunityFollowerView; -use once_cell::sync::Lazy; -use reqwest::Url; +use lemmy_utils::error::LemmyResult; use std::{ - collections::{HashMap, HashSet}, ops::{Add, Deref}, time::Duration, }; @@ -43,41 +41,13 @@ static CHECK_SAVE_STATE_EVERY_IT: i64 = 100; /// Save state to db after this time has passed since the last state (so if the server crashes or is /// SIGKILLed, less than X seconds of activities are resent) static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(60); -/// interval with which new additions to community_followers are queried. -/// -/// The first time some user on an instance follows a specific remote community (or, more precisely: -/// the first time a (followed_community_id, follower_inbox_url) tuple appears), this delay limits -/// the maximum time until the follow actually results in activities from that community id being -/// sent to that inbox url. This delay currently needs to not be too small because the DB load is -/// currently fairly high because of the current structure of storing inboxes for every person, not -/// having a separate list of shared_inboxes, and the architecture of having every instance queue be -/// fully separate. (see https://github.com/LemmyNet/lemmy/issues/3958) -static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy = Lazy::new(|| { - if *LEMMY_TEST_FAST_FEDERATION { - chrono::TimeDelta::try_seconds(1).expect("TimeDelta out of bounds") - } else { - chrono::TimeDelta::try_minutes(2).expect("TimeDelta out of bounds") - } -}); -/// The same as FOLLOW_ADDITIONS_RECHECK_DELAY, but triggering when the last person on an instance -/// unfollows a specific remote community. This is expected to happen pretty rarely and updating it -/// in a timely manner is not too important. -static FOLLOW_REMOVALS_RECHECK_DELAY: Lazy = - Lazy::new(|| chrono::TimeDelta::try_hours(1).expect("TimeDelta out of bounds")); + pub(crate) struct InstanceWorker { instance: Instance, - // load site lazily because if an instance is first seen due to being on allowlist, - // the corresponding row in `site` may not exist yet since that is only added once - // `fetch_instance_actor_for_object` is called. - // (this should be unlikely to be relevant outside of the federation tests) - site_loaded: bool, - site: Option, - followed_communities: HashMap>, + inboxes: CommunityInboxCollector, stop: CancellationToken, context: Data, stats_sender: UnboundedSender<(InstanceId, FederationQueueState)>, - last_full_communities_fetch: DateTime, - last_incremental_communities_fetch: DateTime, state: FederationQueueState, last_state_insert: DateTime, } @@ -88,19 +58,16 @@ impl InstanceWorker { context: Data, stop: CancellationToken, stats_sender: UnboundedSender<(InstanceId, FederationQueueState)>, - ) -> Result<(), anyhow::Error> { + ) -> LemmyResult<()> { let mut pool = context.pool(); let state = FederationQueueState::load(&mut pool, instance.id).await?; + let inboxes = CommunityInboxCollector::new(instance.clone()); let mut worker = InstanceWorker { instance, - site_loaded: false, - site: None, - followed_communities: HashMap::new(), + inboxes, stop, context, stats_sender, - last_full_communities_fetch: Utc.timestamp_nanos(0), - last_incremental_communities_fetch: Utc.timestamp_nanos(0), state, last_state_insert: Utc.timestamp_nanos(0), }; @@ -109,11 +76,11 @@ impl InstanceWorker { /// loop fetch new activities from db and send them to the inboxes of the given instances /// this worker only returns if (a) there is an internal error or (b) the cancellation token is /// cancelled (graceful exit) - pub(crate) async fn loop_until_stopped(&mut self) -> Result<(), anyhow::Error> { + pub(crate) async fn loop_until_stopped(&mut self) -> LemmyResult<()> { debug!("Starting federation worker for {}", self.instance.domain); let save_state_every = chrono::Duration::from_std(SAVE_STATE_EVERY_TIME).expect("not negative"); - self.update_communities().await?; + self.inboxes.update_communities(&self.context).await?; self.initial_fail_sleep().await?; while !self.stop.is_cancelled() { self.loop_batch().await?; @@ -123,7 +90,7 @@ impl InstanceWorker { if (Utc::now() - self.last_state_insert) > save_state_every { self.save_and_send_state().await?; } - self.update_communities().await?; + self.inboxes.update_communities(&self.context).await?; } // final update of state in db self.save_and_send_state().await?; @@ -211,11 +178,8 @@ impl InstanceWorker { &mut self, activity: &SentActivity, object: &SharedInboxActivities, - ) -> Result<()> { - let inbox_urls = self - .get_inbox_urls(activity) - .await - .context("failed figuring out inbox urls")?; + ) -> LemmyResult<()> { + let inbox_urls = self.inboxes.get_inbox_urls(activity, &self.context).await?; if inbox_urls.is_empty() { trace!("{}: {:?} no inboxes", self.instance.domain, activity.id); self.state.last_successful_id = Some(activity.id); @@ -269,84 +233,6 @@ impl InstanceWorker { Ok(()) } - /// get inbox urls of sending the given activity to the given instance - /// most often this will return 0 values (if instance doesn't care about the activity) - /// or 1 value (the shared inbox) - /// > 1 values only happens for non-lemmy software - async fn get_inbox_urls(&mut self, activity: &SentActivity) -> Result> { - let mut inbox_urls: HashSet = HashSet::new(); - - if activity.send_all_instances { - if !self.site_loaded { - self.site = Site::read_from_instance_id(&mut self.context.pool(), self.instance.id).await?; - self.site_loaded = true; - } - if let Some(site) = &self.site { - // Nutomic: Most non-lemmy software wont have a site row. That means it cant handle these - // activities. So handling it like this is fine. - inbox_urls.insert(site.inbox_url.inner().clone()); - } - } - if let Some(t) = &activity.send_community_followers_of { - if let Some(urls) = self.followed_communities.get(t) { - inbox_urls.extend(urls.iter().cloned()); - } - } - inbox_urls.extend( - activity - .send_inboxes - .iter() - .filter_map(std::option::Option::as_ref) - .filter(|&u| (u.domain() == Some(&self.instance.domain))) - .map(|u| u.inner().clone()), - ); - Ok(inbox_urls) - } - - async fn update_communities(&mut self) -> Result<()> { - if (Utc::now() - self.last_full_communities_fetch) > *FOLLOW_REMOVALS_RECHECK_DELAY { - // process removals every hour - (self.followed_communities, self.last_full_communities_fetch) = self - .get_communities(self.instance.id, Utc.timestamp_nanos(0)) - .await?; - self.last_incremental_communities_fetch = self.last_full_communities_fetch; - } - if (Utc::now() - self.last_incremental_communities_fetch) > *FOLLOW_ADDITIONS_RECHECK_DELAY { - // process additions every minute - let (news, time) = self - .get_communities(self.instance.id, self.last_incremental_communities_fetch) - .await?; - self.followed_communities.extend(news); - self.last_incremental_communities_fetch = time; - } - Ok(()) - } - - /// get a list of local communities with the remote inboxes on the given instance that cares about - /// them - async fn get_communities( - &mut self, - instance_id: InstanceId, - last_fetch: DateTime, - ) -> Result<(HashMap>, DateTime)> { - let new_last_fetch = - Utc::now() - chrono::TimeDelta::try_seconds(10).expect("TimeDelta out of bounds"); // update to time before fetch to ensure overlap. subtract 10s to ensure overlap even if - // published date is not exact - Ok(( - CommunityFollowerView::get_instance_followed_community_inboxes( - &mut self.context.pool(), - instance_id, - last_fetch, - ) - .await? - .into_iter() - .fold(HashMap::new(), |mut map, (c, u)| { - map.entry(c).or_default().insert(u.into()); - map - }), - new_last_fetch, - )) - } async fn save_and_send_state(&mut self) -> Result<()> { self.last_state_insert = Utc::now(); FederationQueueState::upsert(&mut self.context.pool(), &self.state).await?;