diff --git a/.woodpecker.yml b/.woodpecker.yml index b77fd6615..00798f081 100644 --- a/.woodpecker.yml +++ b/.woodpecker.yml @@ -181,6 +181,7 @@ steps: LEMMY_DATABASE_URL: postgres://lemmy:password@database:5432/lemmy RUST_BACKTRACE: "1" CARGO_HOME: .cargo_home + LEMMY_TEST_FAST_FEDERATION: "1" commands: - export LEMMY_CONFIG_LOCATION=../../config/config.hjson - cargo test --workspace --no-fail-fast diff --git a/Cargo.lock b/Cargo.lock index a14147e09..2b44775af 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1683,6 +1683,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "downcast" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" + [[package]] name = "downcast-rs" version = "1.2.1" @@ -1924,6 +1930,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fragile" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa" + [[package]] name = "fs2" version = "0.4.3" @@ -3113,7 +3125,9 @@ name = "lemmy_federate" version = "0.19.5" dependencies = [ "activitypub_federation", + "actix-web", "anyhow", + "async-trait", "chrono", "diesel", "diesel-async", @@ -3123,14 +3137,19 @@ dependencies = [ "lemmy_db_schema", "lemmy_db_views_actor", "lemmy_utils", + "mockall", "moka", "once_cell", "reqwest 0.11.27", "serde_json", "serial_test", + "test-context", "tokio", "tokio-util", "tracing", + "tracing-test", + "url", + "uuid", ] [[package]] @@ -3608,6 +3627,33 @@ version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c9be0862c1b3f26a88803c4a49de6889c10e608b3ee9344e6ef5b45fb37ad3d1" +[[package]] +name = "mockall" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43766c2b5203b10de348ffe19f7e54564b64f3d6018ff7648d1e2d6d3a0f0a48" +dependencies = [ + "cfg-if", + "downcast", + "fragile", + "lazy_static", + "mockall_derive", + "predicates", + "predicates-tree", +] + +[[package]] +name = "mockall_derive" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af7cbce79ec385a1d4f54baa90a76401eb15d9cab93685f62e7e9f942aa00ae2" +dependencies = [ + "cfg-if", + "proc-macro2", + "quote", + "syn 2.0.66", +] + [[package]] name = "moka" version = "0.12.7" @@ -4356,6 +4402,32 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "925383efa346730478fb4838dbe9137d2a47675ad789c546d150a6e1dd4ab31c" +[[package]] +name = "predicates" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68b87bfd4605926cdfefc1c3b5f8fe560e3feca9d5552cf68c466d3d8236c7e8" +dependencies = [ + "anstyle", + "predicates-core", +] + +[[package]] +name = "predicates-core" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b794032607612e7abeb4db69adb4e33590fa6cf1149e95fd7cb00e634b92f174" + +[[package]] +name = "predicates-tree" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "368ba315fb8c5052ab692e68a0eefec6ec57b23a36959c14496f0b0df2c0cecf" +dependencies = [ + "predicates-core", + "termtree", +] + [[package]] name = "pretty_assertions" version = "1.4.0" @@ -5713,6 +5785,33 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "termtree" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76" + +[[package]] +name = "test-context" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6676ab8513edfd2601a108621103fdb45cac9098305ca25ec93f7023b06b05d9" +dependencies = [ + "futures", + "test-context-macros", +] + +[[package]] +name = "test-context-macros" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ea17a2dc368aeca6f554343ced1b1e31f76d63683fa8016e5844bd7a5144a1" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.66", +] + [[package]] name = "thiserror" version = "1.0.61" @@ -6336,6 +6435,27 @@ dependencies = [ "tracing-serde", ] +[[package]] +name = "tracing-test" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "557b891436fe0d5e0e363427fc7f217abf9ccd510d5136549847bdcbcd011d68" +dependencies = [ + "tracing-core", + "tracing-subscriber", + "tracing-test-macro", +] + +[[package]] +name = "tracing-test-macro" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568" +dependencies = [ + "quote", + "syn 2.0.66", +] + [[package]] name = "triomphe" version = "0.1.12" diff --git a/api_tests/src/follow.spec.ts b/api_tests/src/follow.spec.ts index 161c7f045..22fdfa305 100644 --- a/api_tests/src/follow.spec.ts +++ b/api_tests/src/follow.spec.ts @@ -11,6 +11,7 @@ import { betaUrl, registerUser, unfollows, + delay, } from "./shared"; beforeAll(setupLogins); @@ -21,39 +22,48 @@ test("Follow local community", async () => { let user = await registerUser(beta, betaUrl); let community = (await resolveBetaCommunity(user)).community!; - expect(community.counts.subscribers).toBe(1); - expect(community.counts.subscribers_local).toBe(1); let follow = await followCommunity(user, true, community.community.id); // Make sure the follow response went through expect(follow.community_view.community.local).toBe(true); expect(follow.community_view.subscribed).toBe("Subscribed"); - expect(follow.community_view.counts.subscribers).toBe(2); - expect(follow.community_view.counts.subscribers_local).toBe(2); + expect(follow.community_view.counts.subscribers).toBe( + community.counts.subscribers + 1, + ); + expect(follow.community_view.counts.subscribers_local).toBe( + community.counts.subscribers_local + 1, + ); // Test an unfollow let unfollow = await followCommunity(user, false, community.community.id); expect(unfollow.community_view.subscribed).toBe("NotSubscribed"); - expect(unfollow.community_view.counts.subscribers).toBe(1); - expect(unfollow.community_view.counts.subscribers_local).toBe(1); + expect(unfollow.community_view.counts.subscribers).toBe( + community.counts.subscribers, + ); + expect(unfollow.community_view.counts.subscribers_local).toBe( + community.counts.subscribers_local, + ); }); test("Follow federated community", async () => { // It takes about 1 second for the community aggregates to federate - let betaCommunity = ( + await delay(2000); // if this is the second test run, we don't have a way to wait for the correct number of subscribers + const betaCommunityInitial = ( await waitUntil( () => resolveBetaCommunity(alpha), - c => - c.community?.counts.subscribers === 1 && - c.community.counts.subscribers_local === 0, + c => !!c.community && c.community?.counts.subscribers >= 1, ) ).community; - if (!betaCommunity) { + if (!betaCommunityInitial) { throw "Missing beta community"; } - let follow = await followCommunity(alpha, true, betaCommunity.community.id); + let follow = await followCommunity( + alpha, + true, + betaCommunityInitial.community.id, + ); expect(follow.community_view.subscribed).toBe("Pending"); - betaCommunity = ( + const betaCommunity = ( await waitUntil( () => resolveBetaCommunity(alpha), c => c.community?.subscribed === "Subscribed", @@ -64,20 +74,24 @@ test("Follow federated community", async () => { expect(betaCommunity?.community.local).toBe(false); expect(betaCommunity?.community.name).toBe("main"); expect(betaCommunity?.subscribed).toBe("Subscribed"); - expect(betaCommunity?.counts.subscribers_local).toBe(1); + expect(betaCommunity?.counts.subscribers_local).toBe( + betaCommunityInitial.counts.subscribers_local + 1, + ); // check that unfollow was federated let communityOnBeta1 = await resolveBetaCommunity(beta); - expect(communityOnBeta1.community?.counts.subscribers).toBe(2); - expect(communityOnBeta1.community?.counts.subscribers_local).toBe(1); + expect(communityOnBeta1.community?.counts.subscribers).toBe( + betaCommunityInitial.counts.subscribers + 1, + ); // Check it from local let site = await getSite(alpha); let remoteCommunityId = site.my_user?.follows.find( - c => c.community.local == false, + c => + c.community.local == false && + c.community.id === betaCommunityInitial.community.id, )?.community.id; expect(remoteCommunityId).toBeDefined(); - expect(site.my_user?.follows.length).toBe(2); if (!remoteCommunityId) { throw "Missing remote community id"; @@ -89,10 +103,21 @@ test("Follow federated community", async () => { // Make sure you are unsubbed locally let siteUnfollowCheck = await getSite(alpha); - expect(siteUnfollowCheck.my_user?.follows.length).toBe(1); + expect( + siteUnfollowCheck.my_user?.follows.find( + c => c.community.id === betaCommunityInitial.community.id, + ), + ).toBe(undefined); // check that unfollow was federated - let communityOnBeta2 = await resolveBetaCommunity(beta); - expect(communityOnBeta2.community?.counts.subscribers).toBe(1); + let communityOnBeta2 = await waitUntil( + () => resolveBetaCommunity(beta), + c => + c.community?.counts.subscribers === + betaCommunityInitial.counts.subscribers, + ); + expect(communityOnBeta2.community?.counts.subscribers).toBe( + betaCommunityInitial.counts.subscribers, + ); expect(communityOnBeta2.community?.counts.subscribers_local).toBe(1); }); diff --git a/api_tests/src/post.spec.ts b/api_tests/src/post.spec.ts index cb45274c6..75df80775 100644 --- a/api_tests/src/post.spec.ts +++ b/api_tests/src/post.spec.ts @@ -52,17 +52,23 @@ beforeAll(async () => { afterAll(unfollows); -async function assertPostFederation(postOne: PostView, postTwo: PostView) { +async function assertPostFederation( + postOne: PostView, + postTwo: PostView, + waitForMeta = true, +) { // Link metadata is generated in background task and may not be ready yet at this time, // so wait for it explicitly. For removed posts we cant refetch anything. - postOne = await waitForPost(beta, postOne.post, res => { - return res === null || res?.post.embed_title !== null; - }); - postTwo = await waitForPost( - beta, - postTwo.post, - res => res === null || res?.post.embed_title !== null, - ); + if (waitForMeta) { + postOne = await waitForPost(beta, postOne.post, res => { + return res === null || !!res?.post.embed_title; + }); + postTwo = await waitForPost( + beta, + postTwo.post, + res => res === null || !!res?.post.embed_title, + ); + } expect(postOne?.post.ap_id).toBe(postTwo?.post.ap_id); expect(postOne?.post.name).toBe(postTwo?.post.name); @@ -408,7 +414,11 @@ test("Remove a post from admin and community on same instance", async () => { p => p?.post_view.post.removed ?? false, ); expect(alphaPost?.post_view.post.removed).toBe(true); - await assertPostFederation(alphaPost.post_view, removePostRes.post_view); + await assertPostFederation( + alphaPost.post_view, + removePostRes.post_view, + false, + ); // Undelete let undeletedPost = await removePost(beta, false, betaPost.post); diff --git a/api_tests/src/user.spec.ts b/api_tests/src/user.spec.ts index d008dcdc3..2edcf54ea 100644 --- a/api_tests/src/user.spec.ts +++ b/api_tests/src/user.spec.ts @@ -131,7 +131,11 @@ test("Requests with invalid auth should be treated as unauthenticated", async () }); test("Create user with Arabic name", async () => { - let user = await registerUser(alpha, alphaUrl, "تجريب"); + let user = await registerUser( + alpha, + alphaUrl, + "تجريب" + Math.random().toString().slice(2, 10), // less than actor_name_max_length + ); let site = await getSite(user); expect(site.my_user).toBeDefined(); diff --git a/config/defaults.hjson b/config/defaults.hjson index e8a70ebae..4bce48b5f 100644 --- a/config/defaults.hjson +++ b/config/defaults.hjson @@ -108,10 +108,12 @@ port: 8536 # Whether the site is available over TLS. Needs to be true for federation to work. tls_enabled: true - # The number of activitypub federation workers that can be in-flight concurrently - worker_count: 0 - # The number of activitypub federation retry workers that can be in-flight concurrently - retry_count: 0 + federation: { + # Limit to the number of concurrent outgoing federation requests per target instance. + # Set this to a higher value than 1 (e.g. 6) only if you have a huge instance (>10 activities + # per second) and if a receiving instance is not keeping up. + concurrent_sends_per_instance: 1 + } prometheus: { bind: "127.0.0.1" port: 10002 diff --git a/crates/api_common/src/context.rs b/crates/api_common/src/context.rs index f4ac41db1..334983b20 100644 --- a/crates/api_common/src/context.rs +++ b/crates/api_common/src/context.rs @@ -55,7 +55,7 @@ impl LemmyContext { /// Initialize a context for use in tests which blocks federation network calls. /// /// Do not use this in production code. - pub async fn init_test_context() -> Data { + pub async fn init_test_federation_config() -> FederationConfig { // call this to run migrations let pool = build_db_pool_for_tests().await; @@ -70,14 +70,19 @@ impl LemmyContext { let rate_limit_cell = RateLimitCell::with_test_config(); let context = LemmyContext::create(pool, client, secret, rate_limit_cell.clone()); - let config = FederationConfig::builder() + + FederationConfig::builder() .domain(context.settings().hostname.clone()) .app_data(context) + .debug(true) // Dont allow any network fetches .http_fetch_limit(0) .build() .await - .expect("build federation config"); + .expect("build federation config") + } + pub async fn init_test_context() -> Data { + let config = Self::init_test_federation_config().await; config.to_request_data() } } diff --git a/crates/db_schema/src/newtypes.rs b/crates/db_schema/src/newtypes.rs index 10abfaec4..f13ef118d 100644 --- a/crates/db_schema/src/newtypes.rs +++ b/crates/db_schema/src/newtypes.rs @@ -113,7 +113,7 @@ pub struct PrivateMessageReportId(i32); #[cfg_attr(feature = "full", derive(DieselNewType, TS))] #[cfg_attr(feature = "full", ts(export))] /// The site id. -pub struct SiteId(i32); +pub struct SiteId(pub i32); #[derive(Debug, Copy, Clone, Hash, Eq, PartialEq, Serialize, Deserialize, Default)] #[cfg_attr(feature = "full", derive(DieselNewType, TS))] diff --git a/crates/federate/Cargo.toml b/crates/federate/Cargo.toml index 2405d3af0..b8b438901 100644 --- a/crates/federate/Cargo.toml +++ b/crates/federate/Cargo.toml @@ -34,6 +34,13 @@ tokio = { workspace = true, features = ["full"] } tracing.workspace = true moka.workspace = true tokio-util = "0.7.11" +async-trait.workspace = true [dev-dependencies] serial_test = { workspace = true } +url.workspace = true +actix-web.workspace = true +tracing-test = "0.2.5" +uuid.workspace = true +test-context = "0.3.0" +mockall = "0.12.1" diff --git a/crates/federate/src/inboxes.rs b/crates/federate/src/inboxes.rs new file mode 100644 index 000000000..9869e5270 --- /dev/null +++ b/crates/federate/src/inboxes.rs @@ -0,0 +1,572 @@ +use crate::util::LEMMY_TEST_FAST_FEDERATION; +use anyhow::Result; +use async_trait::async_trait; +use chrono::{DateTime, TimeZone, Utc}; +use lemmy_db_schema::{ + newtypes::{CommunityId, DbUrl, InstanceId}, + source::{activity::SentActivity, site::Site}, + utils::{ActualDbPool, DbPool}, +}; +use lemmy_db_views_actor::structs::CommunityFollowerView; +use once_cell::sync::Lazy; +use reqwest::Url; +use std::collections::{HashMap, HashSet}; + +/// interval with which new additions to community_followers are queried. +/// +/// The first time some user on an instance follows a specific remote community (or, more precisely: +/// the first time a (followed_community_id, follower_inbox_url) tuple appears), this delay limits +/// the maximum time until the follow actually results in activities from that community id being +/// sent to that inbox url. This delay currently needs to not be too small because the DB load is +/// currently fairly high because of the current structure of storing inboxes for every person, not +/// having a separate list of shared_inboxes, and the architecture of having every instance queue be +/// fully separate. (see https://github.com/LemmyNet/lemmy/issues/3958) +static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy = Lazy::new(|| { + if *LEMMY_TEST_FAST_FEDERATION { + chrono::TimeDelta::try_seconds(1).expect("TimeDelta out of bounds") + } else { + chrono::TimeDelta::try_minutes(2).expect("TimeDelta out of bounds") + } +}); +/// The same as FOLLOW_ADDITIONS_RECHECK_DELAY, but triggering when the last person on an instance +/// unfollows a specific remote community. This is expected to happen pretty rarely and updating it +/// in a timely manner is not too important. +static FOLLOW_REMOVALS_RECHECK_DELAY: Lazy = + Lazy::new(|| chrono::TimeDelta::try_hours(1).expect("TimeDelta out of bounds")); + +#[async_trait] +pub trait DataSource: Send + Sync { + async fn read_site_from_instance_id( + &self, + instance_id: InstanceId, + ) -> Result, diesel::result::Error>; + async fn get_instance_followed_community_inboxes( + &self, + instance_id: InstanceId, + last_fetch: DateTime, + ) -> Result, diesel::result::Error>; +} +pub struct DbDataSource { + pool: ActualDbPool, +} + +impl DbDataSource { + pub fn new(pool: ActualDbPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl DataSource for DbDataSource { + async fn read_site_from_instance_id( + &self, + instance_id: InstanceId, + ) -> Result, diesel::result::Error> { + Site::read_from_instance_id(&mut DbPool::Pool(&self.pool), instance_id).await + } + + async fn get_instance_followed_community_inboxes( + &self, + instance_id: InstanceId, + last_fetch: DateTime, + ) -> Result, diesel::result::Error> { + CommunityFollowerView::get_instance_followed_community_inboxes( + &mut DbPool::Pool(&self.pool), + instance_id, + last_fetch, + ) + .await + } +} + +pub(crate) struct CommunityInboxCollector { + // load site lazily because if an instance is first seen due to being on allowlist, + // the corresponding row in `site` may not exist yet since that is only added once + // `fetch_instance_actor_for_object` is called. + // (this should be unlikely to be relevant outside of the federation tests) + site_loaded: bool, + site: Option, + followed_communities: HashMap>, + last_full_communities_fetch: DateTime, + last_incremental_communities_fetch: DateTime, + instance_id: InstanceId, + domain: String, + pub(crate) data_source: T, +} + +pub type RealCommunityInboxCollector = CommunityInboxCollector; + +impl CommunityInboxCollector { + pub fn new_real( + pool: ActualDbPool, + instance_id: InstanceId, + domain: String, + ) -> RealCommunityInboxCollector { + CommunityInboxCollector::new(DbDataSource::new(pool), instance_id, domain) + } + pub fn new( + data_source: T, + instance_id: InstanceId, + domain: String, + ) -> CommunityInboxCollector { + CommunityInboxCollector { + data_source, + site_loaded: false, + site: None, + followed_communities: HashMap::new(), + last_full_communities_fetch: Utc.timestamp_nanos(0), + last_incremental_communities_fetch: Utc.timestamp_nanos(0), + instance_id, + domain, + } + } + /// get inbox urls of sending the given activity to the given instance + /// most often this will return 0 values (if instance doesn't care about the activity) + /// or 1 value (the shared inbox) + /// > 1 values only happens for non-lemmy software + pub async fn get_inbox_urls(&mut self, activity: &SentActivity) -> Result> { + let mut inbox_urls: HashSet = HashSet::new(); + + if activity.send_all_instances { + if !self.site_loaded { + self.site = self + .data_source + .read_site_from_instance_id(self.instance_id) + .await?; + self.site_loaded = true; + } + if let Some(site) = &self.site { + // Nutomic: Most non-lemmy software wont have a site row. That means it cant handle these + // activities. So handling it like this is fine. + inbox_urls.insert(site.inbox_url.inner().clone()); + } + } + if let Some(t) = &activity.send_community_followers_of { + if let Some(urls) = self.followed_communities.get(t) { + inbox_urls.extend(urls.iter().cloned()); + } + } + inbox_urls.extend( + activity + .send_inboxes + .iter() + .filter_map(std::option::Option::as_ref) + // a similar filter also happens within the activitypub-federation crate. but that filter + // happens much later - by doing it here, we can ensure that in the happy case, this + // function returns 0 urls which means the system doesn't have to create a tokio + // task for sending at all (since that task has a fair amount of overhead) + .filter(|&u| (u.domain() == Some(&self.domain))) + .map(|u| u.inner().clone()), + ); + tracing::trace!( + "get_inbox_urls: {:?}, send_inboxes: {:?}", + inbox_urls, + activity.send_inboxes + ); + Ok(inbox_urls.into_iter().collect()) + } + + pub async fn update_communities(&mut self) -> Result<()> { + if (Utc::now() - self.last_full_communities_fetch) > *FOLLOW_REMOVALS_RECHECK_DELAY { + tracing::debug!("{}: fetching full list of communities", self.domain); + // process removals every hour + (self.followed_communities, self.last_full_communities_fetch) = self + .get_communities(self.instance_id, Utc.timestamp_nanos(0)) + .await?; + self.last_incremental_communities_fetch = self.last_full_communities_fetch; + } + if (Utc::now() - self.last_incremental_communities_fetch) > *FOLLOW_ADDITIONS_RECHECK_DELAY { + // process additions every minute + let (news, time) = self + .get_communities(self.instance_id, self.last_incremental_communities_fetch) + .await?; + if !news.is_empty() { + tracing::debug!( + "{}: fetched {} incremental new followed communities", + self.domain, + news.len() + ); + } + self.followed_communities.extend(news); + self.last_incremental_communities_fetch = time; + } + Ok(()) + } + + /// get a list of local communities with the remote inboxes on the given instance that cares about + /// them + async fn get_communities( + &mut self, + instance_id: InstanceId, + last_fetch: DateTime, + ) -> Result<(HashMap>, DateTime)> { + // update to time before fetch to ensure overlap. subtract some time to ensure overlap even if + // published date is not exact + let new_last_fetch = Utc::now() - *FOLLOW_ADDITIONS_RECHECK_DELAY / 2; + + let inboxes = self + .data_source + .get_instance_followed_community_inboxes(instance_id, last_fetch) + .await?; + + let map: HashMap> = + inboxes.into_iter().fold(HashMap::new(), |mut map, (c, u)| { + map.entry(c).or_default().insert(u.into()); + map + }); + + Ok((map, new_last_fetch)) + } +} + +#[cfg(test)] +#[allow(clippy::unwrap_used)] +#[allow(clippy::indexing_slicing)] +mod tests { + use super::*; + use lemmy_db_schema::{ + newtypes::{ActivityId, CommunityId, InstanceId, SiteId}, + source::activity::{ActorType, SentActivity}, + }; + use mockall::{mock, predicate::*}; + use serde_json::json; + mock! { + DataSource {} + #[async_trait] + impl DataSource for DataSource { + async fn read_site_from_instance_id(&self, instance_id: InstanceId) -> Result, diesel::result::Error>; + async fn get_instance_followed_community_inboxes( + &self, + instance_id: InstanceId, + last_fetch: DateTime, + ) -> Result, diesel::result::Error>; + } + } + + fn setup_collector() -> CommunityInboxCollector { + let mock_data_source = MockDataSource::new(); + let instance_id = InstanceId(1); + let domain = "example.com".to_string(); + CommunityInboxCollector::new(mock_data_source, instance_id, domain) + } + + #[tokio::test] + async fn test_get_inbox_urls_empty() { + let mut collector = setup_collector(); + let activity = SentActivity { + id: ActivityId(1), + ap_id: Url::parse("https://example.com/activities/1") + .unwrap() + .into(), + data: json!({}), + sensitive: false, + published: Utc::now(), + send_inboxes: vec![], + send_community_followers_of: None, + send_all_instances: false, + actor_type: ActorType::Person, + actor_apub_id: None, + }; + + let result = collector.get_inbox_urls(&activity).await.unwrap(); + assert!(result.is_empty()); + } + + #[tokio::test] + async fn test_get_inbox_urls_send_all_instances() { + let mut collector = setup_collector(); + let site_inbox = Url::parse("https://example.com/inbox").unwrap(); + let site = Site { + id: SiteId(1), + name: "Test Site".to_string(), + sidebar: None, + published: Utc::now(), + updated: None, + icon: None, + banner: None, + description: None, + actor_id: Url::parse("https://example.com/site").unwrap().into(), + last_refreshed_at: Utc::now(), + inbox_url: site_inbox.clone().into(), + private_key: None, + public_key: "test_key".to_string(), + instance_id: InstanceId(1), + content_warning: None, + }; + + collector + .data_source + .expect_read_site_from_instance_id() + .return_once(move |_| Ok(Some(site))); + + let activity = SentActivity { + id: ActivityId(1), + ap_id: Url::parse("https://example.com/activities/1") + .unwrap() + .into(), + data: json!({}), + sensitive: false, + published: Utc::now(), + send_inboxes: vec![], + send_community_followers_of: None, + send_all_instances: true, + actor_type: ActorType::Person, + actor_apub_id: None, + }; + + let result = collector.get_inbox_urls(&activity).await.unwrap(); + assert_eq!(result.len(), 1); + assert_eq!(result[0], site_inbox); + } + + #[tokio::test] + async fn test_get_inbox_urls_community_followers() { + let mut collector = setup_collector(); + let community_id = CommunityId(1); + let url1 = "https://follower1.example.com/inbox"; + let url2 = "https://follower2.example.com/inbox"; + + collector + .data_source + .expect_get_instance_followed_community_inboxes() + .return_once(move |_, _| { + Ok(vec![ + (community_id, Url::parse(url1).unwrap().into()), + (community_id, Url::parse(url2).unwrap().into()), + ]) + }); + + collector.update_communities().await.unwrap(); + + let activity = SentActivity { + id: ActivityId(1), + ap_id: Url::parse("https://example.com/activities/1") + .unwrap() + .into(), + data: json!({}), + sensitive: false, + published: Utc::now(), + send_inboxes: vec![], + send_community_followers_of: Some(community_id), + send_all_instances: false, + actor_type: ActorType::Person, + actor_apub_id: None, + }; + + let result = collector.get_inbox_urls(&activity).await.unwrap(); + assert_eq!(result.len(), 2); + assert!(result.contains(&Url::parse(url1).unwrap())); + assert!(result.contains(&Url::parse(url2).unwrap())); + } + + #[tokio::test] + async fn test_get_inbox_urls_send_inboxes() { + let mut collector = setup_collector(); + collector.domain = "example.com".to_string(); + let inbox_user_1 = Url::parse("https://example.com/user1/inbox").unwrap(); + let inbox_user_2 = Url::parse("https://example.com/user2/inbox").unwrap(); + let other_domain_inbox = Url::parse("https://other-domain.com/user3/inbox").unwrap(); + let activity = SentActivity { + id: ActivityId(1), + ap_id: Url::parse("https://example.com/activities/1") + .unwrap() + .into(), + data: json!({}), + sensitive: false, + published: Utc::now(), + send_inboxes: vec![ + Some(inbox_user_1.clone().into()), + Some(inbox_user_2.clone().into()), + Some(other_domain_inbox.clone().into()), + ], + send_community_followers_of: None, + send_all_instances: false, + actor_type: ActorType::Person, + actor_apub_id: None, + }; + + let result = collector.get_inbox_urls(&activity).await.unwrap(); + assert_eq!(result.len(), 2); + assert!(result.contains(&inbox_user_1)); + assert!(result.contains(&inbox_user_2)); + assert!(!result.contains(&other_domain_inbox)); + } + + #[tokio::test] + async fn test_get_inbox_urls_combined() { + let mut collector = setup_collector(); + collector.domain = "example.com".to_string(); + let community_id = CommunityId(1); + + let site_inbox = Url::parse("https://example.com/site_inbox").unwrap(); + let site = Site { + id: SiteId(1), + name: "Test Site".to_string(), + sidebar: None, + published: Utc::now(), + updated: None, + icon: None, + banner: None, + description: None, + actor_id: Url::parse("https://example.com/site").unwrap().into(), + last_refreshed_at: Utc::now(), + inbox_url: site_inbox.clone().into(), + private_key: None, + public_key: "test_key".to_string(), + instance_id: InstanceId(1), + content_warning: None, + }; + + collector + .data_source + .expect_read_site_from_instance_id() + .return_once(move |_| Ok(Some(site))); + + let subdomain_inbox = "https://follower.example.com/inbox"; + collector + .data_source + .expect_get_instance_followed_community_inboxes() + .return_once(move |_, _| { + Ok(vec![( + community_id, + Url::parse(subdomain_inbox).unwrap().into(), + )]) + }); + + collector.update_communities().await.unwrap(); + let user1_inbox = Url::parse("https://example.com/user1/inbox").unwrap(); + let user2_inbox = Url::parse("https://other-domain.com/user2/inbox").unwrap(); + let activity = SentActivity { + id: ActivityId(1), + ap_id: Url::parse("https://example.com/activities/1") + .unwrap() + .into(), + data: json!({}), + sensitive: false, + published: Utc::now(), + send_inboxes: vec![ + Some(user1_inbox.clone().into()), + Some(user2_inbox.clone().into()), + ], + send_community_followers_of: Some(community_id), + send_all_instances: true, + actor_type: ActorType::Person, + actor_apub_id: None, + }; + + let result = collector.get_inbox_urls(&activity).await.unwrap(); + assert_eq!(result.len(), 3); + assert!(result.contains(&site_inbox)); + assert!(result.contains(&Url::parse(subdomain_inbox).unwrap())); + assert!(result.contains(&user1_inbox)); + assert!(!result.contains(&user2_inbox)); + } + + #[tokio::test] + async fn test_update_communities() { + let mut collector = setup_collector(); + let community_id1 = CommunityId(1); + let community_id2 = CommunityId(2); + let community_id3 = CommunityId(3); + + let user1_inbox_str = "https://follower1.example.com/inbox"; + let user1_inbox = Url::parse(user1_inbox_str).unwrap(); + let user2_inbox_str = "https://follower2.example.com/inbox"; + let user2_inbox = Url::parse(user2_inbox_str).unwrap(); + let user3_inbox_str = "https://follower3.example.com/inbox"; + let user3_inbox = Url::parse(user3_inbox_str).unwrap(); + + collector + .data_source + .expect_get_instance_followed_community_inboxes() + .times(2) + .returning(move |_, last_fetch| { + if last_fetch == Utc.timestamp_nanos(0) { + Ok(vec![ + (community_id1, Url::parse(user1_inbox_str).unwrap().into()), + (community_id2, Url::parse(user2_inbox_str).unwrap().into()), + ]) + } else { + Ok(vec![( + community_id3, + Url::parse(user3_inbox_str).unwrap().into(), + )]) + } + }); + + // First update + collector.update_communities().await.unwrap(); + assert_eq!(collector.followed_communities.len(), 2); + assert!(collector.followed_communities[&community_id1].contains(&user1_inbox)); + assert!(collector.followed_communities[&community_id2].contains(&user2_inbox)); + + // Simulate time passing + collector.last_full_communities_fetch = Utc::now() - chrono::TimeDelta::try_minutes(3).unwrap(); + collector.last_incremental_communities_fetch = + Utc::now() - chrono::TimeDelta::try_minutes(3).unwrap(); + + // Second update (incremental) + collector.update_communities().await.unwrap(); + assert_eq!(collector.followed_communities.len(), 3); + assert!(collector.followed_communities[&community_id1].contains(&user1_inbox)); + assert!(collector.followed_communities[&community_id3].contains(&user3_inbox)); + assert!(collector.followed_communities[&community_id2].contains(&user2_inbox)); + } + + #[tokio::test] + async fn test_get_inbox_urls_no_duplicates() { + let mut collector = setup_collector(); + collector.domain = "example.com".to_string(); + let community_id = CommunityId(1); + let site_inbox = Url::parse("https://example.com/site_inbox").unwrap(); + let site_inbox_clone = site_inbox.clone(); + let site = Site { + id: SiteId(1), + name: "Test Site".to_string(), + sidebar: None, + published: Utc::now(), + updated: None, + icon: None, + banner: None, + description: None, + actor_id: Url::parse("https://example.com/site").unwrap().into(), + last_refreshed_at: Utc::now(), + inbox_url: site_inbox.clone().into(), + private_key: None, + public_key: "test_key".to_string(), + instance_id: InstanceId(1), + content_warning: None, + }; + + collector + .data_source + .expect_read_site_from_instance_id() + .return_once(move |_| Ok(Some(site))); + + collector + .data_source + .expect_get_instance_followed_community_inboxes() + .return_once(move |_, _| Ok(vec![(community_id, site_inbox_clone.into())])); + + collector.update_communities().await.unwrap(); + + let activity = SentActivity { + id: ActivityId(1), + ap_id: Url::parse("https://example.com/activities/1") + .unwrap() + .into(), + data: json!({}), + sensitive: false, + published: Utc::now(), + send_inboxes: vec![Some(site_inbox.into())], + send_community_followers_of: Some(community_id), + send_all_instances: true, + actor_type: ActorType::Person, + actor_apub_id: None, + }; + + let result = collector.get_inbox_urls(&activity).await.unwrap(); + assert_eq!(result.len(), 1); + assert!(result.contains(&Url::parse("https://example.com/site_inbox").unwrap())); + } +} diff --git a/crates/federate/src/lib.rs b/crates/federate/src/lib.rs index 21b9229b5..66c0a2872 100644 --- a/crates/federate/src/lib.rs +++ b/crates/federate/src/lib.rs @@ -1,6 +1,9 @@ use crate::{util::CancellableTask, worker::InstanceWorker}; use activitypub_federation::config::FederationConfig; -use lemmy_api_common::context::LemmyContext; +use lemmy_api_common::{ + context::LemmyContext, + lemmy_utils::settings::structs::FederationWorkerConfig, +}; use lemmy_db_schema::{newtypes::InstanceId, source::instance::Instance}; use lemmy_utils::error::LemmyResult; use stats::receive_print_stats; @@ -14,6 +17,8 @@ use tokio_util::sync::CancellationToken; use tracing::info; use util::FederationQueueStateWithDomain; +mod inboxes; +mod send; mod stats; mod util; mod worker; @@ -38,10 +43,15 @@ pub struct SendManager { context: FederationConfig, stats_sender: UnboundedSender, exit_print: JoinHandle<()>, + federation_worker_config: FederationWorkerConfig, } impl SendManager { - fn new(opts: Opts, context: FederationConfig) -> Self { + fn new( + opts: Opts, + context: FederationConfig, + federation_worker_config: FederationWorkerConfig, + ) -> Self { assert!(opts.process_count > 0); assert!(opts.process_index > 0); assert!(opts.process_index <= opts.process_count); @@ -56,14 +66,20 @@ impl SendManager { stats_receiver, )), context, + federation_worker_config, } } - pub fn run(opts: Opts, context: FederationConfig) -> CancellableTask { + pub fn run( + opts: Opts, + context: FederationConfig, + config: FederationWorkerConfig, + ) -> CancellableTask { CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |cancel| { let opts = opts.clone(); + let config = config.clone(); let context = context.clone(); - let mut manager = Self::new(opts, context); + let mut manager = Self::new(opts, context, config); async move { let result = manager.do_loop(cancel).await; // the loop function will only return if there is (a) an internal error (e.g. db connection @@ -120,22 +136,21 @@ impl SendManager { // create new worker let context = self.context.clone(); let stats_sender = self.stats_sender.clone(); + let federation_worker_config = self.federation_worker_config.clone(); + self.workers.insert( instance.id, CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |stop| { - // if the instance worker ends unexpectedly due to internal/db errors, this lambda is rerun by cancellabletask. + // if the instance worker ends unexpectedly due to internal/db errors, this lambda is + // rerun by cancellabletask. let instance = instance.clone(); - let req_data = context.to_request_data(); - let stats_sender = stats_sender.clone(); - async move { - InstanceWorker::init_and_loop( - instance, - req_data, - stop, - stats_sender, - ) - .await - } + InstanceWorker::init_and_loop( + instance, + context.clone(), + federation_worker_config.clone(), + stop, + stats_sender.clone(), + ) }), ); } else if !should_federate { @@ -214,7 +229,14 @@ mod test { .app_data(context.clone()) .build() .await?; + let concurrent_sends_per_instance = std::env::var("LEMMY_TEST_FEDERATION_CONCURRENT_SENDS") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(1); + let federation_worker_config = FederationWorkerConfig { + concurrent_sends_per_instance, + }; let pool = &mut context.pool(); let instances = vec![ Instance::read_or_create(pool, "alpha.com".to_string()).await?, @@ -222,7 +244,7 @@ mod test { Instance::read_or_create(pool, "gamma.com".to_string()).await?, ]; - let send_manager = SendManager::new(opts, federation_config); + let send_manager = SendManager::new(opts, federation_config, federation_worker_config); Ok(Self { send_manager, context, diff --git a/crates/federate/src/send.rs b/crates/federate/src/send.rs new file mode 100644 index 000000000..01d620eb0 --- /dev/null +++ b/crates/federate/src/send.rs @@ -0,0 +1,148 @@ +use crate::util::get_actor_cached; +use activitypub_federation::{ + activity_sending::SendActivityTask, + config::Data, + protocol::context::WithContext, +}; +use anyhow::{Context, Result}; +use chrono::{DateTime, Utc}; +use lemmy_api_common::{context::LemmyContext, federate_retry_sleep_duration}; +use lemmy_apub::{activity_lists::SharedInboxActivities, FEDERATION_CONTEXT}; +use lemmy_db_schema::{newtypes::ActivityId, source::activity::SentActivity}; +use reqwest::Url; +use std::ops::Deref; +use tokio::{sync::mpsc::UnboundedSender, time::sleep}; +use tokio_util::sync::CancellationToken; + +#[derive(Debug, Eq)] +pub(crate) struct SendSuccessInfo { + pub activity_id: ActivityId, + pub published: Option>, + // true if the activity was skipped because the target instance is not interested in this + // activity + pub was_skipped: bool, +} +impl PartialEq for SendSuccessInfo { + fn eq(&self, other: &Self) -> bool { + self.activity_id == other.activity_id + } +} +/// order backwards because the binary heap is a max heap, and we need the smallest element to be on +/// top +impl PartialOrd for SendSuccessInfo { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} +impl Ord for SendSuccessInfo { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + other.activity_id.cmp(&self.activity_id) + } +} + +/// Represents the result of sending an activity. +/// +/// This enum is used to communicate the outcome of a send operation from a send task +/// to the main instance worker. It's designed to maintain a clean separation between +/// the send task and the main thread, allowing the send.rs file to be self-contained +/// and easier to understand. +/// +/// The use of a channel for communication (rather than shared atomic variables) was chosen +/// because: +/// 1. It keeps the send task cleanly separated with no direct interaction with the main thread. +/// 2. The failure event needs to be transferred to the main task for database updates anyway. +/// 3. The main fail_count should only be updated under certain conditions, which are best handled +/// in the main task. +/// 4. It maintains consistency in how data is communicated (all via channels rather than a mix of +/// channels and atomics). +/// 5. It simplifies concurrency management and makes the flow of data more predictable. +pub(crate) enum SendActivityResult { + Success(SendSuccessInfo), + Failure { fail_count: i32 }, +} +/// Represents a task for retrying to send an activity. +/// +/// This struct encapsulates all the necessary information and resources for attempting +/// to send an activity to multiple inbox URLs, with built-in retry logic. +pub(crate) struct SendRetryTask<'a> { + pub activity: &'a SentActivity, + pub object: &'a SharedInboxActivities, + /// Must not be empty at this point + pub inbox_urls: Vec, + /// Channel to report results back to the main instance worker + pub report: &'a mut UnboundedSender, + /// The first request will be sent immediately, but subsequent requests will be delayed + /// according to the number of previous fails + 1 + /// + /// This is a read-only immutable variable that is passed only one way, from the main + /// thread to each send task. It allows the task to determine how long to sleep initially + /// if the request fails. + pub initial_fail_count: i32, + /// For logging purposes + pub domain: String, + pub context: Data, + pub stop: CancellationToken, +} + +impl<'a> SendRetryTask<'a> { + // this function will return successfully when (a) send succeeded or (b) worker cancelled + // and will return an error if an internal error occurred (send errors cause an infinite loop) + pub async fn send_retry_loop(self) -> Result<()> { + let SendRetryTask { + activity, + object, + inbox_urls, + report, + initial_fail_count, + domain, + context, + stop, + } = self; + debug_assert!(!inbox_urls.is_empty()); + + let pool = &mut context.pool(); + let Some(actor_apub_id) = &activity.actor_apub_id else { + return Err(anyhow::anyhow!("activity is from before lemmy 0.19")); + }; + let actor = get_actor_cached(pool, activity.actor_type, actor_apub_id) + .await + .context("failed getting actor instance (was it marked deleted / removed?)")?; + + let object = WithContext::new(object.clone(), FEDERATION_CONTEXT.deref().clone()); + let requests = SendActivityTask::prepare(&object, actor.as_ref(), inbox_urls, &context).await?; + for task in requests { + // usually only one due to shared inbox + tracing::debug!("sending out {}", task); + let mut fail_count = initial_fail_count; + while let Err(e) = task.sign_and_send(&context).await { + fail_count += 1; + report.send(SendActivityResult::Failure { + fail_count, + // activity_id: activity.id, + })?; + let retry_delay = federate_retry_sleep_duration(fail_count); + tracing::info!( + "{}: retrying {:?} attempt {} with delay {retry_delay:.2?}. ({e})", + domain, + activity.id, + fail_count + ); + tokio::select! { + () = sleep(retry_delay) => {}, + () = stop.cancelled() => { + // cancel sending without reporting any result. + // the InstanceWorker needs to be careful to not hang on receive of that + // channel when cancelled (see handle_send_results) + return Ok(()); + } + } + } + } + report.send(SendActivityResult::Success(SendSuccessInfo { + activity_id: activity.id, + published: Some(activity.published), + was_skipped: false, + }))?; + Ok(()) + } +} diff --git a/crates/federate/src/util.rs b/crates/federate/src/util.rs index 60361c3c9..afbe957a5 100644 --- a/crates/federate/src/util.rs +++ b/crates/federate/src/util.rs @@ -1,7 +1,6 @@ use anyhow::{anyhow, Context, Result}; use diesel::prelude::*; use diesel_async::RunQueryDsl; -use lemmy_api_common::lemmy_utils::CACHE_DURATION_FEDERATION; use lemmy_apub::{ activity_lists::SharedInboxActivities, fetcher::{site_or_community_or_user::SiteOrCommunityOrUser, user_or_community::UserOrCommunity}, @@ -28,19 +27,28 @@ use tokio_util::sync::CancellationToken; /// Decrease the delays of the federation queue. /// Should only be used for federation tests since it significantly increases CPU and DB load of the -/// federation queue. +/// federation queue. This is intentionally a separate flag from other flags like debug_assertions, +/// since this is a invasive change we only need rarely. pub(crate) static LEMMY_TEST_FAST_FEDERATION: Lazy = Lazy::new(|| { std::env::var("LEMMY_TEST_FAST_FEDERATION") .map(|s| !s.is_empty()) .unwrap_or(false) }); -/// Recheck for new federation work every n seconds. +/// Recheck for new federation work every n seconds within each InstanceWorker. /// /// When the queue is processed faster than new activities are added and it reaches the current time /// with an empty batch, this is the delay the queue waits before it checks if new activities have /// been added to the sent_activities table. This delay is only applied if no federated activity -/// happens during sending activities of the last batch. +/// happens during sending activities of the last batch, which means on high-activity instances it +/// may never be used. This means that it does not affect the maximum throughput of the queue. +/// +/// +/// This is thus the interval with which tokio wakes up each of the +/// InstanceWorkers to check for new work, if the queue previously was empty. +/// If the delay is too short, the workers (one per federated instance) will wake up too +/// often and consume a lot of CPU. If the delay is long, then activities on low-traffic instances +/// will on average take delay/2 seconds to federate. pub(crate) static WORK_FINISHED_RECHECK_DELAY: Lazy = Lazy::new(|| { if *LEMMY_TEST_FAST_FEDERATION { Duration::from_millis(100) @@ -49,6 +57,21 @@ pub(crate) static WORK_FINISHED_RECHECK_DELAY: Lazy = Lazy::new(|| { } }); +/// Cache the latest activity id for a certain duration. +/// +/// This cache is common to all the instance workers and prevents there from being more than one +/// call per N seconds between each DB query to find max(activity_id). +pub(crate) static CACHE_DURATION_LATEST_ID: Lazy = Lazy::new(|| { + if *LEMMY_TEST_FAST_FEDERATION { + // in test mode, we use the same cache duration as the recheck delay so when recheck happens + // data is fresh, accelerating the time the tests take. + *WORK_FINISHED_RECHECK_DELAY + } else { + // in normal mode, we limit the query to one per second + Duration::from_secs(1) + } +}); + /// A task that will be run in an infinite loop, unless it is cancelled. /// If the task exits without being cancelled, an error will be logged and the task will be /// restarted. @@ -174,7 +197,7 @@ pub(crate) async fn get_activity_cached( pub(crate) async fn get_latest_activity_id(pool: &mut DbPool<'_>) -> Result { static CACHE: Lazy> = Lazy::new(|| { Cache::builder() - .time_to_live(CACHE_DURATION_FEDERATION) + .time_to_live(*CACHE_DURATION_LATEST_ID) .build() }); CACHE diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index 25c9278aa..28f21dcc2 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -1,132 +1,192 @@ -use crate::util::{ - get_activity_cached, - get_actor_cached, - get_latest_activity_id, - FederationQueueStateWithDomain, - LEMMY_TEST_FAST_FEDERATION, - WORK_FINISHED_RECHECK_DELAY, -}; -use activitypub_federation::{ - activity_sending::SendActivityTask, - config::Data, - protocol::context::WithContext, +use crate::{ + inboxes::RealCommunityInboxCollector, + send::{SendActivityResult, SendRetryTask, SendSuccessInfo}, + util::{ + get_activity_cached, + get_latest_activity_id, + FederationQueueStateWithDomain, + WORK_FINISHED_RECHECK_DELAY, + }, }; +use activitypub_federation::config::FederationConfig; use anyhow::{Context, Result}; use chrono::{DateTime, Days, TimeZone, Utc}; -use lemmy_api_common::{context::LemmyContext, federate_retry_sleep_duration}; -use lemmy_apub::{activity_lists::SharedInboxActivities, FEDERATION_CONTEXT}; +use lemmy_api_common::{ + context::LemmyContext, + federate_retry_sleep_duration, + lemmy_utils::settings::structs::FederationWorkerConfig, +}; use lemmy_db_schema::{ - newtypes::{ActivityId, CommunityId, InstanceId}, + newtypes::ActivityId, source::{ - activity::SentActivity, federation_queue_state::FederationQueueState, instance::{Instance, InstanceForm}, - site::Site, }, - utils::naive_now, + utils::{naive_now, ActualDbPool, DbPool}, }; -use lemmy_db_views_actor::structs::CommunityFollowerView; -use once_cell::sync::Lazy; -use reqwest::Url; -use std::{ - collections::{HashMap, HashSet}, - ops::{Add, Deref}, - time::Duration, +use std::{collections::BinaryHeap, ops::Add, time::Duration}; +use tokio::{ + sync::mpsc::{self, UnboundedSender}, + time::sleep, }; -use tokio::{sync::mpsc::UnboundedSender, time::sleep}; use tokio_util::sync::CancellationToken; -use tracing::{debug, info, trace, warn}; -/// Check whether to save state to db every n sends if there's no failures (during failures state is -/// saved after every attempt). This determines the batch size for loop_batch. After a batch ends -/// and SAVE_STATE_EVERY_TIME has passed, the federation_queue_state is updated in the DB. -static CHECK_SAVE_STATE_EVERY_IT: i64 = 100; /// Save state to db after this time has passed since the last state (so if the server crashes or is /// SIGKILLed, less than X seconds of activities are resent) +#[cfg(not(test))] static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(60); -/// interval with which new additions to community_followers are queried. +#[cfg(test)] +/// in test mode, we want it to save state and send it to print_stats after every send +static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(0); +/// Maximum number of successful sends to allow out of order +const MAX_SUCCESSFULS: usize = 1000; + +/// in prod mode, try to collect multiple send results at the same time to reduce load +#[cfg(not(test))] +static MIN_ACTIVITY_SEND_RESULTS_TO_HANDLE: usize = 4; +#[cfg(test)] +static MIN_ACTIVITY_SEND_RESULTS_TO_HANDLE: usize = 0; + /// -/// The first time some user on an instance follows a specific remote community (or, more precisely: -/// the first time a (followed_community_id, follower_inbox_url) tuple appears), this delay limits -/// the maximum time until the follow actually results in activities from that community id being -/// sent to that inbox url. This delay currently needs to not be too small because the DB load is -/// currently fairly high because of the current structure of storing inboxes for every person, not -/// having a separate list of shared_inboxes, and the architecture of having every instance queue be -/// fully separate. (see https://github.com/LemmyNet/lemmy/issues/3958) -static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy = Lazy::new(|| { - if *LEMMY_TEST_FAST_FEDERATION { - chrono::TimeDelta::try_seconds(1).expect("TimeDelta out of bounds") - } else { - chrono::TimeDelta::try_minutes(2).expect("TimeDelta out of bounds") - } -}); -/// The same as FOLLOW_ADDITIONS_RECHECK_DELAY, but triggering when the last person on an instance -/// unfollows a specific remote community. This is expected to happen pretty rarely and updating it -/// in a timely manner is not too important. -static FOLLOW_REMOVALS_RECHECK_DELAY: Lazy = - Lazy::new(|| chrono::TimeDelta::try_hours(1).expect("TimeDelta out of bounds")); +/// SendManager --(has many)--> InstanceWorker --(has many)--> SendRetryTask +/// | | | +/// -----|------create worker -> loop activities--create task-> send activity +/// | | vvvv +/// | | fail or success +/// | | <-report result-- | +/// | <---order and aggrate results--- | +/// | <---send stats--- | | +/// filter and print stats | | pub(crate) struct InstanceWorker { instance: Instance, - // load site lazily because if an instance is first seen due to being on allowlist, - // the corresponding row in `site` may not exist yet since that is only added once - // `fetch_instance_actor_for_object` is called. - // (this should be unlikely to be relevant outside of the federation tests) - site_loaded: bool, - site: Option, - followed_communities: HashMap>, stop: CancellationToken, - context: Data, - stats_sender: UnboundedSender, - last_full_communities_fetch: DateTime, - last_incremental_communities_fetch: DateTime, + federation_lib_config: FederationConfig, + federation_worker_config: FederationWorkerConfig, state: FederationQueueState, last_state_insert: DateTime, + pool: ActualDbPool, + inbox_collector: RealCommunityInboxCollector, + // regularily send stats back to the SendManager + stats_sender: UnboundedSender, + // each HTTP send will report back to this channel concurrently + receive_send_result: mpsc::UnboundedReceiver, + // this part of the channel is cloned and passed to the SendRetryTasks + report_send_result: mpsc::UnboundedSender, + // activities that have been successfully sent but + // that are not the lowest number and thus can't be written to the database yet + successfuls: BinaryHeap, + // number of activities that currently have a task spawned to send it + in_flight: i32, } impl InstanceWorker { pub(crate) async fn init_and_loop( instance: Instance, - context: Data, + config: FederationConfig, + federation_worker_config: FederationWorkerConfig, stop: CancellationToken, stats_sender: UnboundedSender, ) -> Result<(), anyhow::Error> { - let mut pool = context.pool(); - let state = FederationQueueState::load(&mut pool, instance.id).await?; + let pool = config.to_request_data().inner_pool().clone(); + let state = FederationQueueState::load(&mut DbPool::Pool(&pool), instance.id).await?; + let (report_send_result, receive_send_result) = + tokio::sync::mpsc::unbounded_channel::(); let mut worker = InstanceWorker { + inbox_collector: RealCommunityInboxCollector::new_real( + pool.clone(), + instance.id, + instance.domain.clone(), + ), + federation_worker_config, instance, - site_loaded: false, - site: None, - followed_communities: HashMap::new(), stop, - context, + federation_lib_config: config, stats_sender, - last_full_communities_fetch: Utc.timestamp_nanos(0), - last_incremental_communities_fetch: Utc.timestamp_nanos(0), state, last_state_insert: Utc.timestamp_nanos(0), + pool, + receive_send_result, + report_send_result, + successfuls: BinaryHeap::::new(), + in_flight: 0, }; + worker.loop_until_stopped().await } /// loop fetch new activities from db and send them to the inboxes of the given instances /// this worker only returns if (a) there is an internal error or (b) the cancellation token is /// cancelled (graceful exit) - pub(crate) async fn loop_until_stopped(&mut self) -> Result<(), anyhow::Error> { - debug!("Starting federation worker for {}", self.instance.domain); - let save_state_every = chrono::Duration::from_std(SAVE_STATE_EVERY_TIME).expect("not negative"); - - self.update_communities().await?; + async fn loop_until_stopped(&mut self) -> Result<()> { self.initial_fail_sleep().await?; + let (mut last_sent_id, mut newest_id) = self.get_latest_ids().await?; + while !self.stop.is_cancelled() { - self.loop_batch().await?; - if self.stop.is_cancelled() { - break; + // check if we need to wait for a send to finish before sending the next one + // we wait if (a) the last request failed, only if a request is already in flight (not at the + // start of the loop) or (b) if we have too many successfuls in memory or (c) if we have + // too many in flight + let need_wait_for_event = (self.in_flight != 0 && self.state.fail_count > 0) + || self.successfuls.len() >= MAX_SUCCESSFULS + || i64::from(self.in_flight) >= self.federation_worker_config.concurrent_sends_per_instance; + if need_wait_for_event || self.receive_send_result.len() > MIN_ACTIVITY_SEND_RESULTS_TO_HANDLE + { + // if len() > 0 then this does not block and allows us to write to db more often + // if len is 0 then this means we wait for something to change our above conditions, + // which can only happen by an event sent into the channel + self.handle_send_results().await?; + // handle_send_results does not guarantee that we are now in a condition where we want to + // send a new one, so repeat this check until the if no longer applies + continue; } - if (Utc::now() - self.last_state_insert) > save_state_every { - self.save_and_send_state().await?; + + // send a new activity if there is one + self.inbox_collector.update_communities().await?; + let next_id_to_send = ActivityId(last_sent_id.0 + 1); + { + // sanity check: calculate next id to send based on the last id and the in flight requests + let expected_next_id = self.state.last_successful_id.map(|last_successful_id| { + last_successful_id.0 + (self.successfuls.len() as i64) + i64::from(self.in_flight) + 1 + }); + // compare to next id based on incrementing + if expected_next_id != Some(next_id_to_send.0) { + anyhow::bail!( + "{}: next id to send is not as expected: {:?} != {:?}", + self.instance.domain, + expected_next_id, + next_id_to_send + ) + } } - self.update_communities().await?; + + if next_id_to_send > newest_id { + // lazily fetch latest id only if we have cought up + newest_id = self.get_latest_ids().await?.1; + if next_id_to_send > newest_id { + if next_id_to_send > ActivityId(newest_id.0 + 1) { + tracing::error!( + "{}: next send id {} is higher than latest id {}+1 in database (did the db get cleared?)", + self.instance.domain, + next_id_to_send.0, + newest_id.0 + ); + } + // no more work to be done, wait before rechecking + tokio::select! { + () = sleep(*WORK_FINISHED_RECHECK_DELAY) => {}, + () = self.stop.cancelled() => { + tracing::debug!("cancelled worker loop while waiting for new work") + } + } + continue; + } + } + self.in_flight += 1; + last_sent_id = next_id_to_send; + self.spawn_send_if_needed(next_id_to_send).await?; } - // final update of state in db + tracing::debug!("cancelled worker loop after send"); + + // final update of state in db on shutdown self.save_and_send_state().await?; Ok(()) } @@ -144,18 +204,27 @@ impl InstanceWorker { return Ok(()); } let remaining = required - elapsed; + tracing::debug!( + "{}: fail-sleeping for {:?} before starting queue", + self.instance.domain, + remaining + ); tokio::select! { () = sleep(remaining) => {}, - () = self.stop.cancelled() => {} + () = self.stop.cancelled() => { + tracing::debug!("cancelled worker loop during initial fail sleep") + } } } Ok(()) } - /// send out a batch of CHECK_SAVE_STATE_EVERY_IT activities - async fn loop_batch(&mut self) -> Result<()> { - let latest_id = get_latest_activity_id(&mut self.context.pool()).await?; - let mut id = if let Some(id) = self.state.last_successful_id { - id + + /// return the last successfully sent id and the newest activity id in the database + /// sets last_successful_id in database if it's the first time this instance is seen + async fn get_latest_ids(&mut self) -> Result<(ActivityId, ActivityId)> { + let latest_id = get_latest_activity_id(&mut self.pool()).await?; + let last = if let Some(last) = self.state.last_successful_id { + last } else { // this is the initial creation (instance first seen) of the federation queue for this // instance @@ -166,203 +235,542 @@ impl InstanceWorker { self.save_and_send_state().await?; latest_id }; - if id >= latest_id { - if id > latest_id { - tracing::error!( - "{}: last successful id {} is higher than latest id {} in database (did the db get cleared?)", - self.instance.domain, - id.0, - latest_id.0 - ); - } - // no more work to be done, wait before rechecking - tokio::select! { - () = sleep(*WORK_FINISHED_RECHECK_DELAY) => {}, - () = self.stop.cancelled() => {} - } - return Ok(()); - } - let mut processed_activities = 0; - while id < latest_id - && processed_activities < CHECK_SAVE_STATE_EVERY_IT - && !self.stop.is_cancelled() - { - id = ActivityId(id.0 + 1); - processed_activities += 1; - let Some(ele) = get_activity_cached(&mut self.context.pool(), id) - .await - .context("failed reading activity from db")? - else { - debug!("{}: {:?} does not exist", self.instance.domain, id); - self.state.last_successful_id = Some(id); - continue; - }; - if let Err(e) = self.send_retry_loop(&ele.0, &ele.1).await { - warn!( - "sending {} errored internally, skipping activity: {:?}", - ele.0.ap_id, e - ); - } - if self.stop.is_cancelled() { + Ok((last, latest_id)) + } + + async fn handle_send_results(&mut self) -> Result<(), anyhow::Error> { + let mut force_write = false; + let mut events = Vec::new(); + // Wait for at least one event but if there's multiple handle them all. + // We need to listen to the cancel event here as well in order to prevent a hang on shutdown: + // If the SendRetryTask gets cancelled, it immediately exits without reporting any state. + // So if the worker is waiting for a send result and all SendRetryTask gets cancelled, this recv + // could hang indefinitely otherwise. The tasks will also drop their handle of + // report_send_result which would cause the recv_many method to return 0 elements, but since + // InstanceWorker holds a copy of the send result channel as well, that won't happen. + tokio::select! { + _ = self.receive_send_result.recv_many(&mut events, 1000) => {}, + () = self.stop.cancelled() => { + tracing::debug!("cancelled worker loop while waiting for send results"); return Ok(()); } - // send success! - self.state.last_successful_id = Some(id); - self.state.last_successful_published_time = Some(ele.0.published); - self.state.fail_count = 0; + } + for event in events { + match event { + SendActivityResult::Success(s) => { + self.in_flight -= 1; + if !s.was_skipped { + self.state.fail_count = 0; + self.mark_instance_alive().await?; + } + self.successfuls.push(s); + } + SendActivityResult::Failure { fail_count, .. } => { + if fail_count > self.state.fail_count { + // override fail count - if multiple activities are currently sending this value may get + // conflicting info but that's fine. + // This needs to be this way, all alternatives would be worse. The reason is that if 10 + // simultaneous requests fail within a 1s period, we don't want the next retry to be + // exponentially 2**10 s later. Any amount of failures within a fail-sleep period should + // only count as one failure. + + self.state.fail_count = fail_count; + self.state.last_retry = Some(Utc::now()); + force_write = true; + } + } + } + } + self.pop_successfuls_and_write(force_write).await?; + Ok(()) + } + async fn mark_instance_alive(&mut self) -> Result<()> { + // Activity send successful, mark instance as alive if it hasn't been updated in a while. + let updated = self.instance.updated.unwrap_or(self.instance.published); + if updated.add(Days::new(1)) < Utc::now() { + self.instance.updated = Some(Utc::now()); + + let form = InstanceForm::builder() + .domain(self.instance.domain.clone()) + .updated(Some(naive_now())) + .build(); + Instance::update(&mut self.pool(), self.instance.id, form).await?; + } + Ok(()) + } + /// Checks that sequential activities `last_successful_id + 1`, `last_successful_id + 2` etc have + /// been sent successfully. In that case updates `last_successful_id` and saves the state to the + /// database if the time since the last save is greater than `SAVE_STATE_EVERY_TIME`. + async fn pop_successfuls_and_write(&mut self, force_write: bool) -> Result<()> { + let Some(mut last_id) = self.state.last_successful_id else { + tracing::warn!( + "{} should be impossible: last successful id is None", + self.instance.domain + ); + return Ok(()); + }; + tracing::debug!( + "{} last: {:?}, next: {:?}, currently in successfuls: {:?}", + self.instance.domain, + last_id, + self.successfuls.peek(), + self.successfuls.iter() + ); + while self + .successfuls + .peek() + .map(|a| a.activity_id == ActivityId(last_id.0 + 1)) + .unwrap_or(false) + { + let next = self + .successfuls + .pop() + .context("peek above ensures pop has value")?; + last_id = next.activity_id; + self.state.last_successful_id = Some(next.activity_id); + self.state.last_successful_published_time = next.published; + } + + let save_state_every = chrono::Duration::from_std(SAVE_STATE_EVERY_TIME).expect("not negative"); + if force_write || (Utc::now() - self.last_state_insert) > save_state_every { + self.save_and_send_state().await?; } Ok(()) } - // this function will return successfully when (a) send succeeded or (b) worker cancelled - // and will return an error if an internal error occurred (send errors cause an infinite loop) - async fn send_retry_loop( - &mut self, - activity: &SentActivity, - object: &SharedInboxActivities, - ) -> Result<()> { + /// we collect the relevant inboxes in the main instance worker task, and only spawn the send task + /// if we have inboxes to send to this limits CPU usage and reduces overhead for the (many) + /// cases where we don't have any inboxes + async fn spawn_send_if_needed(&mut self, activity_id: ActivityId) -> Result<()> { + let Some(ele) = get_activity_cached(&mut self.pool(), activity_id) + .await + .context("failed reading activity from db")? + else { + tracing::debug!("{}: {:?} does not exist", self.instance.domain, activity_id); + self + .report_send_result + .send(SendActivityResult::Success(SendSuccessInfo { + activity_id, + published: None, + was_skipped: true, + }))?; + return Ok(()); + }; + let activity = &ele.0; let inbox_urls = self + .inbox_collector .get_inbox_urls(activity) .await .context("failed figuring out inbox urls")?; if inbox_urls.is_empty() { - trace!("{}: {:?} no inboxes", self.instance.domain, activity.id); - self.state.last_successful_id = Some(activity.id); - self.state.last_successful_published_time = Some(activity.published); + // this is the case when the activity is not relevant to this receiving instance (e.g. no user + // subscribed to the relevant community) + tracing::debug!("{}: {:?} no inboxes", self.instance.domain, activity.id); + self + .report_send_result + .send(SendActivityResult::Success(SendSuccessInfo { + activity_id, + // it would be valid here to either return None or Some(activity.published). The published + // time is only used for stats pages that track federation delay. None can be a bit + // misleading because if you look at / chart the published time for federation from a + // large to a small instance that's only subscribed to a few small communities, + // then it will show the last published time as a days ago even though + // federation is up to date. + published: Some(activity.published), + was_skipped: true, + }))?; return Ok(()); } - let Some(actor_apub_id) = &activity.actor_apub_id else { - return Ok(()); // activity was inserted before persistent queue was activated - }; - let actor = get_actor_cached(&mut self.context.pool(), activity.actor_type, actor_apub_id) - .await - .context("failed getting actor instance (was it marked deleted / removed?)")?; - - let object = WithContext::new(object.clone(), FEDERATION_CONTEXT.deref().clone()); - let inbox_urls = inbox_urls.into_iter().collect(); - let requests = - SendActivityTask::prepare(&object, actor.as_ref(), inbox_urls, &self.context).await?; - for task in requests { - // usually only one due to shared inbox - trace!("sending out {}", task); - while let Err(e) = task.sign_and_send(&self.context).await { - self.state.fail_count += 1; - self.state.last_retry = Some(Utc::now()); - let retry_delay: Duration = federate_retry_sleep_duration(self.state.fail_count); - info!( - "{}: retrying {:?} attempt {} with delay {retry_delay:.2?}. ({e})", - self.instance.domain, activity.id, self.state.fail_count + let initial_fail_count = self.state.fail_count; + let data = self.federation_lib_config.to_request_data(); + let stop = self.stop.clone(); + let domain = self.instance.domain.clone(); + let mut report = self.report_send_result.clone(); + tokio::spawn(async move { + let res = SendRetryTask { + activity: &ele.0, + object: &ele.1, + inbox_urls, + report: &mut report, + initial_fail_count, + domain, + context: data, + stop, + } + .send_retry_loop() + .await; + if let Err(e) = res { + tracing::warn!( + "sending {} errored internally, skipping activity: {:?}", + ele.0.ap_id, + e ); - self.save_and_send_state().await?; - tokio::select! { - () = sleep(retry_delay) => {}, - () = self.stop.cancelled() => { - // save state to db and exit - return Ok(()); - } - } + // An error in this location means there is some deeper internal issue with the activity, + // for example the actor can't be loaded or similar. These issues are probably not + // solveable by retrying and would cause the federation for this instance to permanently be + // stuck in a retry loop. So we log the error and skip the activity (by reporting success to + // the worker) + report + .send(SendActivityResult::Success(SendSuccessInfo { + activity_id, + published: None, + was_skipped: true, + })) + .ok(); } - - // Activity send successful, mark instance as alive if it hasn't been updated in a while. - let updated = self.instance.updated.unwrap_or(self.instance.published); - if updated.add(Days::new(1)) < Utc::now() { - self.instance.updated = Some(Utc::now()); - - let form = InstanceForm::builder() - .domain(self.instance.domain.clone()) - .updated(Some(naive_now())) - .build(); - Instance::update(&mut self.context.pool(), self.instance.id, form).await?; - } - } + }); Ok(()) } - /// get inbox urls of sending the given activity to the given instance - /// most often this will return 0 values (if instance doesn't care about the activity) - /// or 1 value (the shared inbox) - /// > 1 values only happens for non-lemmy software - async fn get_inbox_urls(&mut self, activity: &SentActivity) -> Result> { - let mut inbox_urls: HashSet = HashSet::new(); - - if activity.send_all_instances { - if !self.site_loaded { - self.site = Site::read_from_instance_id(&mut self.context.pool(), self.instance.id).await?; - self.site_loaded = true; - } - if let Some(site) = &self.site { - // Nutomic: Most non-lemmy software wont have a site row. That means it cant handle these - // activities. So handling it like this is fine. - inbox_urls.insert(site.inbox_url.inner().clone()); - } - } - if let Some(t) = &activity.send_community_followers_of { - if let Some(urls) = self.followed_communities.get(t) { - inbox_urls.extend(urls.iter().cloned()); - } - } - inbox_urls.extend( - activity - .send_inboxes - .iter() - .filter_map(std::option::Option::as_ref) - .filter(|&u| (u.domain() == Some(&self.instance.domain))) - .map(|u| u.inner().clone()), - ); - Ok(inbox_urls) - } - - async fn update_communities(&mut self) -> Result<()> { - if (Utc::now() - self.last_full_communities_fetch) > *FOLLOW_REMOVALS_RECHECK_DELAY { - // process removals every hour - (self.followed_communities, self.last_full_communities_fetch) = self - .get_communities(self.instance.id, Utc.timestamp_nanos(0)) - .await?; - self.last_incremental_communities_fetch = self.last_full_communities_fetch; - } - if (Utc::now() - self.last_incremental_communities_fetch) > *FOLLOW_ADDITIONS_RECHECK_DELAY { - // process additions every minute - let (news, time) = self - .get_communities(self.instance.id, self.last_incremental_communities_fetch) - .await?; - self.followed_communities.extend(news); - self.last_incremental_communities_fetch = time; - } - Ok(()) - } - - /// get a list of local communities with the remote inboxes on the given instance that cares about - /// them - async fn get_communities( - &mut self, - instance_id: InstanceId, - last_fetch: DateTime, - ) -> Result<(HashMap>, DateTime)> { - let new_last_fetch = - Utc::now() - chrono::TimeDelta::try_seconds(10).expect("TimeDelta out of bounds"); // update to time before fetch to ensure overlap. subtract 10s to ensure overlap even if - // published date is not exact - Ok(( - CommunityFollowerView::get_instance_followed_community_inboxes( - &mut self.context.pool(), - instance_id, - last_fetch, - ) - .await? - .into_iter() - .fold(HashMap::new(), |mut map, (c, u)| { - map.entry(c).or_default().insert(u.into()); - map - }), - new_last_fetch, - )) - } async fn save_and_send_state(&mut self) -> Result<()> { + tracing::debug!("{}: saving and sending state", self.instance.domain); self.last_state_insert = Utc::now(); - FederationQueueState::upsert(&mut self.context.pool(), &self.state).await?; + FederationQueueState::upsert(&mut self.pool(), &self.state).await?; self.stats_sender.send(FederationQueueStateWithDomain { state: self.state.clone(), domain: self.instance.domain.clone(), })?; Ok(()) } + + fn pool(&self) -> DbPool<'_> { + DbPool::Pool(&self.pool) + } +} + +#[cfg(test)] +#[allow(clippy::unwrap_used)] +#[allow(clippy::indexing_slicing)] +mod test { + + use super::*; + use activitypub_federation::{ + http_signatures::generate_actor_keypair, + protocol::context::WithContext, + }; + use actix_web::{dev::ServerHandle, web, App, HttpResponse, HttpServer}; + use lemmy_api_common::utils::{generate_inbox_url, generate_shared_inbox_url}; + use lemmy_db_schema::{ + newtypes::DbUrl, + source::{ + activity::{ActorType, SentActivity, SentActivityForm}, + person::{Person, PersonInsertForm}, + }, + traits::Crud, + }; + use lemmy_utils::error::LemmyResult; + use reqwest::StatusCode; + use serde_json::{json, Value}; + use serial_test::serial; + use test_context::{test_context, AsyncTestContext}; + use tokio::{ + spawn, + sync::mpsc::{error::TryRecvError, unbounded_channel, UnboundedReceiver}, + }; + use tracing_test::traced_test; + use url::Url; + + struct Data { + context: activitypub_federation::config::Data, + instance: Instance, + person: Person, + stats_receiver: UnboundedReceiver, + inbox_receiver: UnboundedReceiver, + cancel: CancellationToken, + cleaned_up: bool, + wait_stop_server: ServerHandle, + is_concurrent: bool, + } + + impl Data { + async fn init() -> LemmyResult { + let context = LemmyContext::init_test_federation_config().await; + let instance = Instance::read_or_create(&mut context.pool(), "localhost".to_string()).await?; + + let actor_keypair = generate_actor_keypair()?; + let actor_id: DbUrl = Url::parse("http://local.com/u/alice")?.into(); + let person_form = PersonInsertForm { + actor_id: Some(actor_id.clone()), + private_key: (Some(actor_keypair.private_key)), + inbox_url: Some(generate_inbox_url(&actor_id)?), + shared_inbox_url: Some(generate_shared_inbox_url(context.settings())?), + ..PersonInsertForm::new("alice".to_string(), actor_keypair.public_key, instance.id) + }; + let person = Person::create(&mut context.pool(), &person_form).await?; + + let cancel = CancellationToken::new(); + let (stats_sender, stats_receiver) = unbounded_channel(); + let (inbox_sender, inbox_receiver) = unbounded_channel(); + + // listen for received activities in background + let wait_stop_server = listen_activities(inbox_sender)?; + + let concurrent_sends_per_instance = std::env::var("LEMMY_TEST_FEDERATION_CONCURRENT_SENDS") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(10); + + let fed_config = FederationWorkerConfig { + concurrent_sends_per_instance, + }; + spawn(InstanceWorker::init_and_loop( + instance.clone(), + context.clone(), + fed_config, + cancel.clone(), + stats_sender, + )); + // wait for startup + sleep(*WORK_FINISHED_RECHECK_DELAY).await; + + Ok(Self { + context: context.to_request_data(), + instance, + person, + stats_receiver, + inbox_receiver, + cancel, + wait_stop_server, + cleaned_up: false, + is_concurrent: concurrent_sends_per_instance > 1, + }) + } + + async fn cleanup(&mut self) -> LemmyResult<()> { + if self.cleaned_up { + return Ok(()); + } + self.cleaned_up = true; + self.cancel.cancel(); + sleep(*WORK_FINISHED_RECHECK_DELAY).await; + Instance::delete_all(&mut self.context.pool()).await?; + Person::delete(&mut self.context.pool(), self.person.id).await?; + self.wait_stop_server.stop(true).await; + Ok(()) + } + } + + /// In order to guarantee that the webserver is stopped via the cleanup function, + /// we implement a test context. + impl AsyncTestContext for Data { + async fn setup() -> Data { + Data::init().await.unwrap() + } + async fn teardown(mut self) { + self.cleanup().await.unwrap() + } + } + + #[test_context(Data)] + #[tokio::test] + #[traced_test] + #[serial] + async fn test_stats(data: &mut Data) -> LemmyResult<()> { + tracing::debug!("hello world"); + + // first receive at startup + let rcv = data.stats_receiver.recv().await.unwrap(); + tracing::debug!("received first stats"); + assert_eq!(data.instance.id, rcv.state.instance_id); + + let sent = send_activity(data.person.actor_id.clone(), &data.context, true).await?; + tracing::debug!("sent activity"); + // receive for successfully sent activity + let inbox_rcv = data.inbox_receiver.recv().await.unwrap(); + let parsed_activity = serde_json::from_str::>(&inbox_rcv)?; + assert_eq!(&sent.data, parsed_activity.inner()); + tracing::debug!("received activity"); + + let rcv = data.stats_receiver.recv().await.unwrap(); + assert_eq!(data.instance.id, rcv.state.instance_id); + assert_eq!(Some(sent.id), rcv.state.last_successful_id); + tracing::debug!("received second stats"); + + data.cleanup().await?; + + // it also sends state on shutdown + let rcv = data.stats_receiver.try_recv(); + assert!(rcv.is_ok()); + + // nothing further received + let rcv = data.stats_receiver.try_recv(); + assert_eq!(Some(TryRecvError::Disconnected), rcv.err()); + let inbox_rcv = data.inbox_receiver.try_recv(); + assert_eq!(Some(TryRecvError::Disconnected), inbox_rcv.err()); + + Ok(()) + } + + #[test_context(Data)] + #[tokio::test] + #[traced_test] + #[serial] + async fn test_send_40(data: &mut Data) -> LemmyResult<()> { + tracing::debug!("hello world"); + + // first receive at startup + let rcv = data.stats_receiver.recv().await.unwrap(); + tracing::debug!("received first stats"); + assert_eq!(data.instance.id, rcv.state.instance_id); + // assert_eq!(Some(ActivityId(0)), rcv.state.last_successful_id); + // let last_id_before = rcv.state.last_successful_id.unwrap(); + let mut sent = Vec::new(); + for _ in 0..40 { + sent.push(send_activity(data.person.actor_id.clone(), &data.context, false).await?); + } + sleep(2 * *WORK_FINISHED_RECHECK_DELAY).await; + tracing::debug!("sent activity"); + compare_sent_with_receive(data, sent).await?; + + Ok(()) + } + + #[test_context(Data)] + #[tokio::test] + #[traced_test] + #[serial] + /// this test sends 15 activities, waits and checks they have all been received, then sends 50, + /// etc + async fn test_send_15_20_30(data: &mut Data) -> LemmyResult<()> { + tracing::debug!("hello world"); + + // first receive at startup + let rcv = data.stats_receiver.recv().await.unwrap(); + tracing::debug!("received first stats"); + assert_eq!(data.instance.id, rcv.state.instance_id); + // assert_eq!(Some(ActivityId(0)), rcv.state.last_successful_id); + // let last_id_before = rcv.state.last_successful_id.unwrap(); + let counts = vec![15, 20, 35]; + for count in counts { + tracing::debug!("sending {} activities", count); + let mut sent = Vec::new(); + for _ in 0..count { + sent.push(send_activity(data.person.actor_id.clone(), &data.context, false).await?); + } + sleep(2 * *WORK_FINISHED_RECHECK_DELAY).await; + tracing::debug!("sent activity"); + compare_sent_with_receive(data, sent).await?; + } + + Ok(()) + } + + #[test_context(Data)] + #[tokio::test] + #[serial] + async fn test_update_instance(data: &mut Data) -> LemmyResult<()> { + let form = InstanceForm::builder() + .domain(data.instance.domain.clone()) + .updated(None) + .build(); + Instance::update(&mut data.context.pool(), data.instance.id, form).await?; + + send_activity(data.person.actor_id.clone(), &data.context, true).await?; + data.inbox_receiver.recv().await.unwrap(); + + let instance = + Instance::read_or_create(&mut data.context.pool(), data.instance.domain.clone()).await?; + + assert!(instance.updated.is_some()); + + data.cleanup().await?; + + Ok(()) + } + + fn listen_activities(inbox_sender: UnboundedSender) -> LemmyResult { + let run = HttpServer::new(move || { + App::new() + .app_data(actix_web::web::Data::new(inbox_sender.clone())) + .route( + "/inbox", + web::post().to( + |inbox_sender: actix_web::web::Data>, body: String| async move { + tracing::debug!("received activity: {:?}", body); + inbox_sender.send(body.clone()).unwrap(); + HttpResponse::new(StatusCode::OK) + }, + ), + ) + }) + .bind(("127.0.0.1", 8085))? + .run(); + let handle = run.handle(); + tokio::spawn(async move { + run.await.unwrap(); + /*select! { + _ = run => {}, + _ = cancel.cancelled() => { } + }*/ + }); + Ok(handle) + } + + async fn send_activity( + actor_id: DbUrl, + context: &LemmyContext, + wait: bool, + ) -> LemmyResult { + // create outgoing activity + let data = json!({ + "actor": "http://ds9.lemmy.ml/u/lemmy_alpha", + "object": "http://ds9.lemmy.ml/comment/1", + "audience": "https://enterprise.lemmy.ml/c/tenforward", + "type": "Like", + "id": format!("http://ds9.lemmy.ml/activities/like/{}", uuid::Uuid::new_v4()), + }); + let form = SentActivityForm { + ap_id: Url::parse(&format!( + "http://local.com/activity/{}", + uuid::Uuid::new_v4() + ))? + .into(), + data, + sensitive: false, + send_inboxes: vec![Some(Url::parse("http://localhost:8085/inbox")?.into())], + send_all_instances: false, + send_community_followers_of: None, + actor_type: ActorType::Person, + actor_apub_id: actor_id, + }; + let sent = SentActivity::create(&mut context.pool(), form).await?; + + if wait { + sleep(*WORK_FINISHED_RECHECK_DELAY * 2).await; + } + + Ok(sent) + } + async fn compare_sent_with_receive(data: &mut Data, mut sent: Vec) -> Result<()> { + let check_order = !data.is_concurrent; // allow out-of order receiving when running parallel + let mut received = Vec::new(); + for _ in 0..sent.len() { + let inbox_rcv = data.inbox_receiver.recv().await.unwrap(); + let parsed_activity = serde_json::from_str::>(&inbox_rcv)?; + received.push(parsed_activity); + } + if !check_order { + // sort by id + received.sort_by(|a, b| { + a.inner()["id"] + .as_str() + .unwrap() + .cmp(b.inner()["id"].as_str().unwrap()) + }); + sent.sort_by(|a, b| { + a.data["id"] + .as_str() + .unwrap() + .cmp(b.data["id"].as_str().unwrap()) + }); + } + // receive for successfully sent activity + for i in 0..sent.len() { + let sent_activity = &sent[i]; + let received_activity = received[i].inner(); + assert_eq!(&sent_activity.data, received_activity); + tracing::debug!("received activity"); + } + Ok(()) + } } diff --git a/crates/utils/src/settings/structs.rs b/crates/utils/src/settings/structs.rs index 75aa56a88..dcacc37b6 100644 --- a/crates/utils/src/settings/structs.rs +++ b/crates/utils/src/settings/structs.rs @@ -43,12 +43,8 @@ pub struct Settings { #[default(None)] #[doku(skip)] pub opentelemetry_url: Option, - /// The number of activitypub federation workers that can be in-flight concurrently - #[default(0)] - pub worker_count: usize, - /// The number of activitypub federation retry workers that can be in-flight concurrently - #[default(0)] - pub retry_count: usize, + #[default(Default::default())] + pub federation: FederationWorkerConfig, // Prometheus configuration. #[default(None)] #[doku(example = "Some(Default::default())")] @@ -237,3 +233,14 @@ pub struct PrometheusConfig { #[doku(example = "10002")] pub port: i32, } + +#[derive(Debug, Deserialize, Serialize, Clone, SmartDefault, Document)] +#[serde(default)] +// named federation"worker"config to disambiguate from the activitypub library configuration +pub struct FederationWorkerConfig { + /// Limit to the number of concurrent outgoing federation requests per target instance. + /// Set this to a higher value than 1 (e.g. 6) only if you have a huge instance (>10 activities + /// per second) and if a receiving instance is not keeping up. + #[default(1)] + pub concurrent_sends_per_instance: i64, +} diff --git a/scripts/test.sh b/scripts/test.sh index 9bb6acaa8..04cc94f9d 100755 --- a/scripts/test.sh +++ b/scripts/test.sh @@ -14,6 +14,7 @@ source scripts/start_dev_db.sh # so to load the config we need to traverse to the repo root export LEMMY_CONFIG_LOCATION=../../config/config.hjson export RUST_BACKTRACE=1 +export LEMMY_TEST_FAST_FEDERATION=1 # by default, the persistent federation queue has delays in the scale of 30s-5min if [ -n "$PACKAGE" ]; then diff --git a/src/lib.rs b/src/lib.rs index 74c05deaa..3c3633b75 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -233,6 +233,7 @@ pub async fn start_lemmy_server(args: CmdArgs) -> LemmyResult<()> { process_count: args.federate_process_count, }, cfg, + SETTINGS.federation.clone(), ) }); let mut interrupt = tokio::signal::unix::signal(SignalKind::interrupt())?;