From a08642f81377e8ef01f91f5d49f531e71fd62d70 Mon Sep 17 00:00:00 2001
From: phiresky <phireskyde+git@gmail.com>
Date: Sun, 21 Jul 2024 17:50:50 +0200
Subject: [PATCH] federation: parallel sending per instance (#4623)

* federation: parallel sending

* federation: some comments

* lint and set force_write true when a request fails

* inbox_urls return vec

* split inbox functions into separate file

* cleanup

* extract sending task code to separate file

* move federation concurrent config to config file

* off by one issue

* improve msg

* fix both permanent stopping of federation queues and multiple creation of the same federation queues

* fix after merge

* lint fix

* Update crates/federate/src/send.rs

Co-authored-by: dullbananas <dull.bananas0@gmail.com>

* comment about reverse ordering

* remove crashable, comment

* comment

* move comment

* run federation tests twice

* fix test run

* prettier

* fix config default

* upgrade rust to 1.78 to fix diesel cli

* fix clippy

* delay

* add debug to make localhost urls not valid in ap crate, add some debug logs

* federation tests: ensure server stop after test and random activity id

* ci fix

* add test to federate 100 events

* fix send 100 test

* different data every time so activities are distinguishable

* allow out of order receives in test

* lint

* comment about https://github.com/LemmyNet/lemmy/pull/4623#discussion_r1565437391

* move sender for clarity, add comment

* move more things to members

* update test todo comment, use same env var as worker test but default to 1

* remove else below continue

* some more cleanup

* handle todo about smooth exit

* add federate inboxes collector tests

* lint

* actor max length

* don't reset fail count if activity skipped

* fix some comments

* reuse vars

* format

* Update .woodpecker.yml

* fix recheck time

* fix inboxes tests under fast mode

* format

* make i32 and ugly casts

* clippy

---------

Co-authored-by: dullbananas <dull.bananas0@gmail.com>
---
 .woodpecker.yml                      |   1 +
 Cargo.lock                           | 120 ++++
 api_tests/src/follow.spec.ts         |  67 +-
 api_tests/src/post.spec.ts           |  30 +-
 api_tests/src/user.spec.ts           |   6 +-
 config/defaults.hjson                |  10 +-
 crates/api_common/src/context.rs     |  11 +-
 crates/db_schema/src/newtypes.rs     |   2 +-
 crates/federate/Cargo.toml           |   7 +
 crates/federate/src/inboxes.rs       | 572 ++++++++++++++++
 crates/federate/src/lib.rs           |  56 +-
 crates/federate/src/send.rs          | 148 +++++
 crates/federate/src/util.rs          |  33 +-
 crates/federate/src/worker.rs        | 932 +++++++++++++++++++--------
 crates/utils/src/settings/structs.rs |  19 +-
 scripts/test.sh                      |   1 +
 src/lib.rs                           |   1 +
 17 files changed, 1686 insertions(+), 330 deletions(-)
 create mode 100644 crates/federate/src/inboxes.rs
 create mode 100644 crates/federate/src/send.rs

diff --git a/.woodpecker.yml b/.woodpecker.yml
index fe11411d86..d12f123261 100644
--- a/.woodpecker.yml
+++ b/.woodpecker.yml
@@ -181,6 +181,7 @@ steps:
       LEMMY_DATABASE_URL: postgres://lemmy:password@database:5432/lemmy
       RUST_BACKTRACE: "1"
       CARGO_HOME: .cargo_home
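+      # decrease federation queue recheck delays (only intended for the federation tests)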
+      LEMMY_TEST_FAST_FEDERATION: "1"
     commands:
       - export LEMMY_CONFIG_LOCATION=../../config/config.hjson
       - cargo test --workspace --no-fail-fast
diff --git a/Cargo.lock b/Cargo.lock
index b15baa9039..e1b90a9f0b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1674,6 +1674,12 @@ dependencies = [
  "syn 1.0.109",
 ]
 
+[[package]]
+name = "downcast"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1"
+
 [[package]]
 name = "downcast-rs"
 version = "1.2.1"
@@ -1915,6 +1921,12 @@ dependencies = [
  "percent-encoding",
 ]
 
+[[package]]
+name = "fragile"
+version = "2.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa"
+
 [[package]]
 name = "fs2"
 version = "0.4.3"
@@ -2980,7 +2992,9 @@ name = "lemmy_federate"
 version = "0.19.5"
 dependencies = [
  "activitypub_federation",
+ "actix-web",
  "anyhow",
+ "async-trait",
  "chrono",
  "diesel",
  "diesel-async",
@@ -2990,14 +3004,19 @@ dependencies = [
  "lemmy_db_schema",
  "lemmy_db_views_actor",
  "lemmy_utils",
+ "mockall",
  "moka",
  "once_cell",
  "reqwest 0.11.27",
  "serde_json",
  "serial_test",
+ "test-context",
  "tokio",
  "tokio-util",
  "tracing",
+ "tracing-test",
+ "url",
+ "uuid",
 ]
 
 [[package]]
@@ -3475,6 +3494,33 @@ version = "1.12.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "c9be0862c1b3f26a88803c4a49de6889c10e608b3ee9344e6ef5b45fb37ad3d1"
 
+[[package]]
+name = "mockall"
+version = "0.12.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "43766c2b5203b10de348ffe19f7e54564b64f3d6018ff7648d1e2d6d3a0f0a48"
+dependencies = [
+ "cfg-if",
+ "downcast",
+ "fragile",
+ "lazy_static",
+ "mockall_derive",
+ "predicates",
+ "predicates-tree",
+]
+
+[[package]]
+name = "mockall_derive"
+version = "0.12.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "af7cbce79ec385a1d4f54baa90a76401eb15d9cab93685f62e7e9f942aa00ae2"
+dependencies = [
+ "cfg-if",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.66",
+]
+
 [[package]]
 name = "moka"
 version = "0.12.8"
@@ -4273,6 +4319,32 @@ version = "0.1.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "925383efa346730478fb4838dbe9137d2a47675ad789c546d150a6e1dd4ab31c"
 
+[[package]]
+name = "predicates"
+version = "3.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "68b87bfd4605926cdfefc1c3b5f8fe560e3feca9d5552cf68c466d3d8236c7e8"
+dependencies = [
+ "anstyle",
+ "predicates-core",
+]
+
+[[package]]
+name = "predicates-core"
+version = "1.0.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b794032607612e7abeb4db69adb4e33590fa6cf1149e95fd7cb00e634b92f174"
+
+[[package]]
+name = "predicates-tree"
+version = "1.0.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "368ba315fb8c5052ab692e68a0eefec6ec57b23a36959c14496f0b0df2c0cecf"
+dependencies = [
+ "predicates-core",
+ "termtree",
+]
+
 [[package]]
 name = "pretty_assertions"
 version = "1.4.0"
@@ -5676,6 +5748,33 @@ dependencies = [
  "winapi-util",
 ]
 
+[[package]]
+name = "termtree"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"
+
+[[package]]
+name = "test-context"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6676ab8513edfd2601a108621103fdb45cac9098305ca25ec93f7023b06b05d9"
+dependencies = [
+ "futures",
+ "test-context-macros",
+]
+
+[[package]]
+name = "test-context-macros"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "78ea17a2dc368aeca6f554343ced1b1e31f76d63683fa8016e5844bd7a5144a1"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.66",
+]
+
 [[package]]
 name = "thiserror"
 version = "1.0.62"
@@ -6299,6 +6398,27 @@ dependencies = [
  "tracing-serde",
 ]
 
+[[package]]
+name = "tracing-test"
+version = "0.2.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "557b891436fe0d5e0e363427fc7f217abf9ccd510d5136549847bdcbcd011d68"
+dependencies = [
+ "tracing-core",
+ "tracing-subscriber",
+ "tracing-test-macro",
+]
+
+[[package]]
+name = "tracing-test-macro"
+version = "0.2.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568"
+dependencies = [
+ "quote",
+ "syn 2.0.66",
+]
+
 [[package]]
 name = "triomphe"
 version = "0.1.11"
diff --git a/api_tests/src/follow.spec.ts b/api_tests/src/follow.spec.ts
index 161c7f0453..22fdfa305d 100644
--- a/api_tests/src/follow.spec.ts
+++ b/api_tests/src/follow.spec.ts
@@ -11,6 +11,7 @@ import {
   betaUrl,
   registerUser,
   unfollows,
+  delay,
 } from "./shared";
 
 beforeAll(setupLogins);
@@ -21,39 +22,48 @@ test("Follow local community", async () => {
   let user = await registerUser(beta, betaUrl);
 
   let community = (await resolveBetaCommunity(user)).community!;
-  expect(community.counts.subscribers).toBe(1);
-  expect(community.counts.subscribers_local).toBe(1);
   let follow = await followCommunity(user, true, community.community.id);
 
   // Make sure the follow response went through
   expect(follow.community_view.community.local).toBe(true);
   expect(follow.community_view.subscribed).toBe("Subscribed");
-  expect(follow.community_view.counts.subscribers).toBe(2);
-  expect(follow.community_view.counts.subscribers_local).toBe(2);
+  expect(follow.community_view.counts.subscribers).toBe(
+    community.counts.subscribers + 1,
+  );
+  expect(follow.community_view.counts.subscribers_local).toBe(
+    community.counts.subscribers_local + 1,
+  );
 
   // Test an unfollow
   let unfollow = await followCommunity(user, false, community.community.id);
   expect(unfollow.community_view.subscribed).toBe("NotSubscribed");
-  expect(unfollow.community_view.counts.subscribers).toBe(1);
-  expect(unfollow.community_view.counts.subscribers_local).toBe(1);
+  expect(unfollow.community_view.counts.subscribers).toBe(
+    community.counts.subscribers,
+  );
+  expect(unfollow.community_view.counts.subscribers_local).toBe(
+    community.counts.subscribers_local,
+  );
 });
 
 test("Follow federated community", async () => {
   // It takes about 1 second for the community aggregates to federate
-  let betaCommunity = (
+  await delay(2000); // if this is the second test run, we don't have a way to wait for the correct number of subscribers
+  const betaCommunityInitial = (
     await waitUntil(
       () => resolveBetaCommunity(alpha),
-      c =>
-        c.community?.counts.subscribers === 1 &&
-        c.community.counts.subscribers_local === 0,
+      c => !!c.community && c.community?.counts.subscribers >= 1,
     )
   ).community;
-  if (!betaCommunity) {
+  if (!betaCommunityInitial) {
     throw "Missing beta community";
   }
-  let follow = await followCommunity(alpha, true, betaCommunity.community.id);
+  let follow = await followCommunity(
+    alpha,
+    true,
+    betaCommunityInitial.community.id,
+  );
   expect(follow.community_view.subscribed).toBe("Pending");
-  betaCommunity = (
+  const betaCommunity = (
     await waitUntil(
       () => resolveBetaCommunity(alpha),
       c => c.community?.subscribed === "Subscribed",
@@ -64,20 +74,24 @@ test("Follow federated community", async () => {
   expect(betaCommunity?.community.local).toBe(false);
   expect(betaCommunity?.community.name).toBe("main");
   expect(betaCommunity?.subscribed).toBe("Subscribed");
-  expect(betaCommunity?.counts.subscribers_local).toBe(1);
+  expect(betaCommunity?.counts.subscribers_local).toBe(
+    betaCommunityInitial.counts.subscribers_local + 1,
+  );
 
   // check that unfollow was federated
   let communityOnBeta1 = await resolveBetaCommunity(beta);
-  expect(communityOnBeta1.community?.counts.subscribers).toBe(2);
-  expect(communityOnBeta1.community?.counts.subscribers_local).toBe(1);
+  expect(communityOnBeta1.community?.counts.subscribers).toBe(
+    betaCommunityInitial.counts.subscribers + 1,
+  );
 
   // Check it from local
   let site = await getSite(alpha);
   let remoteCommunityId = site.my_user?.follows.find(
-    c => c.community.local == false,
+    c =>
+      c.community.local == false &&
+      c.community.id === betaCommunityInitial.community.id,
   )?.community.id;
   expect(remoteCommunityId).toBeDefined();
-  expect(site.my_user?.follows.length).toBe(2);
 
   if (!remoteCommunityId) {
     throw "Missing remote community id";
@@ -89,10 +103,21 @@ test("Follow federated community", async () => {
 
   // Make sure you are unsubbed locally
   let siteUnfollowCheck = await getSite(alpha);
-  expect(siteUnfollowCheck.my_user?.follows.length).toBe(1);
+  expect(
+    siteUnfollowCheck.my_user?.follows.find(
+      c => c.community.id === betaCommunityInitial.community.id,
+    ),
+  ).toBe(undefined);
 
   // check that unfollow was federated
-  let communityOnBeta2 = await resolveBetaCommunity(beta);
-  expect(communityOnBeta2.community?.counts.subscribers).toBe(1);
+  let communityOnBeta2 = await waitUntil(
+    () => resolveBetaCommunity(beta),
+    c =>
+      c.community?.counts.subscribers ===
+      betaCommunityInitial.counts.subscribers,
+  );
+  expect(communityOnBeta2.community?.counts.subscribers).toBe(
+    betaCommunityInitial.counts.subscribers,
+  );
   expect(communityOnBeta2.community?.counts.subscribers_local).toBe(1);
 });
diff --git a/api_tests/src/post.spec.ts b/api_tests/src/post.spec.ts
index fe17bd979f..6b5c8d812e 100644
--- a/api_tests/src/post.spec.ts
+++ b/api_tests/src/post.spec.ts
@@ -52,17 +52,23 @@ beforeAll(async () => {
 
 afterAll(unfollows);
 
-async function assertPostFederation(postOne: PostView, postTwo: PostView) {
+async function assertPostFederation(
+  postOne: PostView,
+  postTwo: PostView,
+  waitForMeta = true,
+) {
   // Link metadata is generated in background task and may not be ready yet at this time,
   // so wait for it explicitly. For removed posts we cant refetch anything.
-  postOne = await waitForPost(beta, postOne.post, res => {
-    return res === null || res?.post.embed_title !== null;
-  });
-  postTwo = await waitForPost(
-    beta,
-    postTwo.post,
-    res => res === null || res?.post.embed_title !== null,
-  );
+  if (waitForMeta) {
+    postOne = await waitForPost(beta, postOne.post, res => {
+      return res === null || !!res?.post.embed_title;
+    });
+    postTwo = await waitForPost(
+      beta,
+      postTwo.post,
+      res => res === null || !!res?.post.embed_title,
+    );
+  }
 
   expect(postOne?.post.ap_id).toBe(postTwo?.post.ap_id);
   expect(postOne?.post.name).toBe(postTwo?.post.name);
@@ -408,7 +414,11 @@ test("Remove a post from admin and community on same instance", async () => {
     p => p?.post_view.post.removed ?? false,
   );
   expect(alphaPost?.post_view.post.removed).toBe(true);
-  await assertPostFederation(alphaPost.post_view, removePostRes.post_view);
+  await assertPostFederation(
+    alphaPost.post_view,
+    removePostRes.post_view,
+    false,
+  );
 
   // Undelete
   let undeletedPost = await removePost(beta, false, betaPost.post);
diff --git a/api_tests/src/user.spec.ts b/api_tests/src/user.spec.ts
index d008dcdc3d..2edcf54ea8 100644
--- a/api_tests/src/user.spec.ts
+++ b/api_tests/src/user.spec.ts
@@ -131,7 +131,11 @@ test("Requests with invalid auth should be treated as unauthenticated", async ()
 });
 
 test("Create user with Arabic name", async () => {
-  let user = await registerUser(alpha, alphaUrl, "تجريب");
+  let user = await registerUser(
+    alpha,
+    alphaUrl,
+    "تجريب" + Math.random().toString().slice(2, 10), // less than actor_name_max_length
+  );
 
   let site = await getSite(user);
   expect(site.my_user).toBeDefined();
diff --git a/config/defaults.hjson b/config/defaults.hjson
index e8a70ebae2..4bce48b5f8 100644
--- a/config/defaults.hjson
+++ b/config/defaults.hjson
@@ -108,10 +108,12 @@
   port: 8536
   # Whether the site is available over TLS. Needs to be true for federation to work.
   tls_enabled: true
-  # The number of activitypub federation workers that can be in-flight concurrently
-  worker_count: 0
-  # The number of activitypub federation retry workers that can be in-flight concurrently
-  retry_count: 0
+  federation: {
+    # Limit on the number of concurrent outgoing federation requests per target instance.
+    # Set this to a higher value than 1 (e.g. 6) only if you have a huge instance (>10 activities
+    # per second) and if a receiving instance is not keeping up.
+    concurrent_sends_per_instance: 1
+  }
   prometheus: {
     bind: "127.0.0.1"
     port: 10002
diff --git a/crates/api_common/src/context.rs b/crates/api_common/src/context.rs
index f4ac41db1f..334983b209 100644
--- a/crates/api_common/src/context.rs
+++ b/crates/api_common/src/context.rs
@@ -55,7 +55,7 @@ impl LemmyContext {
   /// Initialize a context for use in tests which blocks federation network calls.
   ///
   /// Do not use this in production code.
-  pub async fn init_test_context() -> Data<LemmyContext> {
+  pub async fn init_test_federation_config() -> FederationConfig<LemmyContext> {
     // call this to run migrations
     let pool = build_db_pool_for_tests().await;
 
@@ -70,14 +70,19 @@ impl LemmyContext {
     let rate_limit_cell = RateLimitCell::with_test_config();
 
     let context = LemmyContext::create(pool, client, secret, rate_limit_cell.clone());
-    let config = FederationConfig::builder()
+
+    FederationConfig::builder()
       .domain(context.settings().hostname.clone())
       .app_data(context)
+      .debug(true)
       // Dont allow any network fetches
       .http_fetch_limit(0)
       .build()
       .await
-      .expect("build federation config");
+      .expect("build federation config")
+  }
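+
+  /// Initialize request data for use in tests, based on the test federation config above.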
+  pub async fn init_test_context() -> Data<LemmyContext> {
+    let config = Self::init_test_federation_config().await;
     config.to_request_data()
   }
 }
diff --git a/crates/db_schema/src/newtypes.rs b/crates/db_schema/src/newtypes.rs
index c5c9e8e846..9aeaa52663 100644
--- a/crates/db_schema/src/newtypes.rs
+++ b/crates/db_schema/src/newtypes.rs
@@ -107,7 +107,7 @@ pub struct PrivateMessageReportId(i32);
 #[cfg_attr(feature = "full", derive(DieselNewType, TS))]
 #[cfg_attr(feature = "full", ts(export))]
 /// The site id.
-pub struct SiteId(i32);
+pub struct SiteId(pub i32);
 
 #[derive(Debug, Copy, Clone, Hash, Eq, PartialEq, Serialize, Deserialize, Default)]
 #[cfg_attr(feature = "full", derive(DieselNewType, TS))]
diff --git a/crates/federate/Cargo.toml b/crates/federate/Cargo.toml
index 2405d3af0e..b8b4389014 100644
--- a/crates/federate/Cargo.toml
+++ b/crates/federate/Cargo.toml
@@ -34,6 +34,13 @@ tokio = { workspace = true, features = ["full"] }
 tracing.workspace = true
 moka.workspace = true
 tokio-util = "0.7.11"
+async-trait.workspace = true
 
 [dev-dependencies]
 serial_test = { workspace = true }
+url.workspace = true
+actix-web.workspace = true
+tracing-test = "0.2.5"
+uuid.workspace = true
+test-context = "0.3.0"
+mockall = "0.12.1"
diff --git a/crates/federate/src/inboxes.rs b/crates/federate/src/inboxes.rs
new file mode 100644
index 0000000000..9869e52700
--- /dev/null
+++ b/crates/federate/src/inboxes.rs
@@ -0,0 +1,572 @@
+use crate::util::LEMMY_TEST_FAST_FEDERATION;
+use anyhow::Result;
+use async_trait::async_trait;
+use chrono::{DateTime, TimeZone, Utc};
+use lemmy_db_schema::{
+  newtypes::{CommunityId, DbUrl, InstanceId},
+  source::{activity::SentActivity, site::Site},
+  utils::{ActualDbPool, DbPool},
+};
+use lemmy_db_views_actor::structs::CommunityFollowerView;
+use once_cell::sync::Lazy;
+use reqwest::Url;
+use std::collections::{HashMap, HashSet};
+
+/// interval with which new additions to community_followers are queried.
+///
+/// The first time some user on an instance follows a specific remote community (or, more precisely:
+/// the first time a (followed_community_id, follower_inbox_url) tuple appears), this delay limits
+/// the maximum time until the follow actually results in activities from that community id being
+/// sent to that inbox url. This delay must currently not be too small because the DB load is
+/// fairly high: inboxes are stored for every person instead of in a separate list of
+/// shared_inboxes, and every instance queue is fully separate.
+/// (see https://github.com/LemmyNet/lemmy/issues/3958)
+static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy<chrono::TimeDelta> = Lazy::new(|| {
+  if *LEMMY_TEST_FAST_FEDERATION {
+    chrono::TimeDelta::try_seconds(1).expect("TimeDelta out of bounds")
+  } else {
+    chrono::TimeDelta::try_minutes(2).expect("TimeDelta out of bounds")
+  }
+});
+/// The same as FOLLOW_ADDITIONS_RECHECK_DELAY, but triggering when the last person on an instance
+/// unfollows a specific remote community. This is expected to happen pretty rarely and updating it
+/// in a timely manner is not too important.
+static FOLLOW_REMOVALS_RECHECK_DELAY: Lazy<chrono::TimeDelta> =
+  Lazy::new(|| chrono::TimeDelta::try_hours(1).expect("TimeDelta out of bounds"));
+
+#[async_trait]
+pub trait DataSource: Send + Sync {
+  async fn read_site_from_instance_id(
+    &self,
+    instance_id: InstanceId,
+  ) -> Result<Option<Site>, diesel::result::Error>;
+  async fn get_instance_followed_community_inboxes(
+    &self,
+    instance_id: InstanceId,
+    last_fetch: DateTime<Utc>,
+  ) -> Result<Vec<(CommunityId, DbUrl)>, diesel::result::Error>;
+}
+pub struct DbDataSource {
+  pool: ActualDbPool,
+}
+
+impl DbDataSource {
+  pub fn new(pool: ActualDbPool) -> Self {
+    Self { pool }
+  }
+}
+
+#[async_trait]
+impl DataSource for DbDataSource {
+  async fn read_site_from_instance_id(
+    &self,
+    instance_id: InstanceId,
+  ) -> Result<Option<Site>, diesel::result::Error> {
+    Site::read_from_instance_id(&mut DbPool::Pool(&self.pool), instance_id).await
+  }
+
+  async fn get_instance_followed_community_inboxes(
+    &self,
+    instance_id: InstanceId,
+    last_fetch: DateTime<Utc>,
+  ) -> Result<Vec<(CommunityId, DbUrl)>, diesel::result::Error> {
+    CommunityFollowerView::get_instance_followed_community_inboxes(
+      &mut DbPool::Pool(&self.pool),
+      instance_id,
+      last_fetch,
+    )
+    .await
+  }
+}
+
+pub(crate) struct CommunityInboxCollector<T: DataSource> {
+  // load site lazily because if an instance is first seen due to being on allowlist,
+  // the corresponding row in `site` may not exist yet since that is only added once
+  // `fetch_instance_actor_for_object` is called.
+  // (this should be unlikely to be relevant outside of the federation tests)
+  site_loaded: bool,
+  site: Option<Site>,
+  followed_communities: HashMap<CommunityId, HashSet<Url>>,
+  last_full_communities_fetch: DateTime<Utc>,
+  last_incremental_communities_fetch: DateTime<Utc>,
+  instance_id: InstanceId,
+  domain: String,
+  pub(crate) data_source: T,
+}
+
+pub type RealCommunityInboxCollector = CommunityInboxCollector<DbDataSource>;
+
+impl<T: DataSource> CommunityInboxCollector<T> {
+  pub fn new_real(
+    pool: ActualDbPool,
+    instance_id: InstanceId,
+    domain: String,
+  ) -> RealCommunityInboxCollector {
+    CommunityInboxCollector::new(DbDataSource::new(pool), instance_id, domain)
+  }
+  pub fn new(
+    data_source: T,
+    instance_id: InstanceId,
+    domain: String,
+  ) -> CommunityInboxCollector<T> {
+    CommunityInboxCollector {
+      data_source,
+      site_loaded: false,
+      site: None,
+      followed_communities: HashMap::new(),
+      last_full_communities_fetch: Utc.timestamp_nanos(0),
+      last_incremental_communities_fetch: Utc.timestamp_nanos(0),
+      instance_id,
+      domain,
+    }
+  }
+  /// Get the inbox urls for sending the given activity to the given instance.
+  /// Most often this will return 0 values (if the instance doesn't care about the activity)
+  /// or 1 value (the shared inbox).
+  /// More than 1 value only happens for non-lemmy software.
+  pub async fn get_inbox_urls(&mut self, activity: &SentActivity) -> Result<Vec<Url>> {
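+    // collect into a HashSet so that inboxes appearing in multiple sources are deduplicated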
+    let mut inbox_urls: HashSet<Url> = HashSet::new();
+
+    if activity.send_all_instances {
+      if !self.site_loaded {
+        self.site = self
+          .data_source
+          .read_site_from_instance_id(self.instance_id)
+          .await?;
+        self.site_loaded = true;
+      }
+      if let Some(site) = &self.site {
+        // Nutomic: Most non-lemmy software wont have a site row. That means it cant handle these
+        // activities. So handling it like this is fine.
+        inbox_urls.insert(site.inbox_url.inner().clone());
+      }
+    }
+    if let Some(t) = &activity.send_community_followers_of {
+      if let Some(urls) = self.followed_communities.get(t) {
+        inbox_urls.extend(urls.iter().cloned());
+      }
+    }
+    inbox_urls.extend(
+      activity
+        .send_inboxes
+        .iter()
+        .filter_map(std::option::Option::as_ref)
+        // a similar filter also happens within the activitypub-federation crate. but that filter
+        // happens much later - by doing it here, we can ensure that in the happy case, this
+        // function returns 0 urls which means the system doesn't have to create a tokio
+        // task for sending at all (since that task has a fair amount of overhead)
+        .filter(|&u| (u.domain() == Some(&self.domain)))
+        .map(|u| u.inner().clone()),
+    );
+    tracing::trace!(
+      "get_inbox_urls: {:?}, send_inboxes: {:?}",
+      inbox_urls,
+      activity.send_inboxes
+    );
+    Ok(inbox_urls.into_iter().collect())
+  }
+
+  pub async fn update_communities(&mut self) -> Result<()> {
+    if (Utc::now() - self.last_full_communities_fetch) > *FOLLOW_REMOVALS_RECHECK_DELAY {
+      tracing::debug!("{}: fetching full list of communities", self.domain);
+      // process removals every hour
+      (self.followed_communities, self.last_full_communities_fetch) = self
+        .get_communities(self.instance_id, Utc.timestamp_nanos(0))
+        .await?;
+      self.last_incremental_communities_fetch = self.last_full_communities_fetch;
+    }
+    if (Utc::now() - self.last_incremental_communities_fetch) > *FOLLOW_ADDITIONS_RECHECK_DELAY {
+      // process additions every FOLLOW_ADDITIONS_RECHECK_DELAY (2 minutes)
+      let (news, time) = self
+        .get_communities(self.instance_id, self.last_incremental_communities_fetch)
+        .await?;
+      if !news.is_empty() {
+        tracing::debug!(
+          "{}: fetched {} incremental new followed communities",
+          self.domain,
+          news.len()
+        );
+      }
+      self.followed_communities.extend(news);
+      self.last_incremental_communities_fetch = time;
+    }
+    Ok(())
+  }
+
+  /// Get a list of local communities with the remote inboxes on the given instance that care about
+  /// them.
+  async fn get_communities(
+    &mut self,
+    instance_id: InstanceId,
+    last_fetch: DateTime<Utc>,
+  ) -> Result<(HashMap<CommunityId, HashSet<Url>>, DateTime<Utc>)> {
+    // update to time before fetch to ensure overlap. subtract some time to ensure overlap even if
+    // published date is not exact
+    let new_last_fetch = Utc::now() - *FOLLOW_ADDITIONS_RECHECK_DELAY / 2;
+
+    let inboxes = self
+      .data_source
+      .get_instance_followed_community_inboxes(instance_id, last_fetch)
+      .await?;
+
+    let map: HashMap<CommunityId, HashSet<Url>> =
+      inboxes.into_iter().fold(HashMap::new(), |mut map, (c, u)| {
+        map.entry(c).or_default().insert(u.into());
+        map
+      });
+
+    Ok((map, new_last_fetch))
+  }
+}
+
+#[cfg(test)]
+#[allow(clippy::unwrap_used)]
+#[allow(clippy::indexing_slicing)]
+mod tests {
+  use super::*;
+  use lemmy_db_schema::{
+    newtypes::{ActivityId, CommunityId, InstanceId, SiteId},
+    source::activity::{ActorType, SentActivity},
+  };
+  use mockall::{mock, predicate::*};
+  use serde_json::json;
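+  // mockall's `mock!` macro generates a `MockDataSource` type implementing `DataSource`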
+  mock! {
+      DataSource {}
+      #[async_trait]
+      impl DataSource for DataSource {
+          async fn read_site_from_instance_id(&self, instance_id: InstanceId) -> Result<Option<Site>, diesel::result::Error>;
+          async fn get_instance_followed_community_inboxes(
+              &self,
+              instance_id: InstanceId,
+              last_fetch: DateTime<Utc>,
+          ) -> Result<Vec<(CommunityId, DbUrl)>, diesel::result::Error>;
+      }
+  }
+
+  fn setup_collector() -> CommunityInboxCollector<MockDataSource> {
+    let mock_data_source = MockDataSource::new();
+    let instance_id = InstanceId(1);
+    let domain = "example.com".to_string();
+    CommunityInboxCollector::new(mock_data_source, instance_id, domain)
+  }
+
+  #[tokio::test]
+  async fn test_get_inbox_urls_empty() {
+    let mut collector = setup_collector();
+    let activity = SentActivity {
+      id: ActivityId(1),
+      ap_id: Url::parse("https://example.com/activities/1")
+        .unwrap()
+        .into(),
+      data: json!({}),
+      sensitive: false,
+      published: Utc::now(),
+      send_inboxes: vec![],
+      send_community_followers_of: None,
+      send_all_instances: false,
+      actor_type: ActorType::Person,
+      actor_apub_id: None,
+    };
+
+    let result = collector.get_inbox_urls(&activity).await.unwrap();
+    assert!(result.is_empty());
+  }
+
+  #[tokio::test]
+  async fn test_get_inbox_urls_send_all_instances() {
+    let mut collector = setup_collector();
+    let site_inbox = Url::parse("https://example.com/inbox").unwrap();
+    let site = Site {
+      id: SiteId(1),
+      name: "Test Site".to_string(),
+      sidebar: None,
+      published: Utc::now(),
+      updated: None,
+      icon: None,
+      banner: None,
+      description: None,
+      actor_id: Url::parse("https://example.com/site").unwrap().into(),
+      last_refreshed_at: Utc::now(),
+      inbox_url: site_inbox.clone().into(),
+      private_key: None,
+      public_key: "test_key".to_string(),
+      instance_id: InstanceId(1),
+      content_warning: None,
+    };
+
+    collector
+      .data_source
+      .expect_read_site_from_instance_id()
+      .return_once(move |_| Ok(Some(site)));
+
+    let activity = SentActivity {
+      id: ActivityId(1),
+      ap_id: Url::parse("https://example.com/activities/1")
+        .unwrap()
+        .into(),
+      data: json!({}),
+      sensitive: false,
+      published: Utc::now(),
+      send_inboxes: vec![],
+      send_community_followers_of: None,
+      send_all_instances: true,
+      actor_type: ActorType::Person,
+      actor_apub_id: None,
+    };
+
+    let result = collector.get_inbox_urls(&activity).await.unwrap();
+    assert_eq!(result.len(), 1);
+    assert_eq!(result[0], site_inbox);
+  }
+
+  #[tokio::test]
+  async fn test_get_inbox_urls_community_followers() {
+    let mut collector = setup_collector();
+    let community_id = CommunityId(1);
+    let url1 = "https://follower1.example.com/inbox";
+    let url2 = "https://follower2.example.com/inbox";
+
+    collector
+      .data_source
+      .expect_get_instance_followed_community_inboxes()
+      .return_once(move |_, _| {
+        Ok(vec![
+          (community_id, Url::parse(url1).unwrap().into()),
+          (community_id, Url::parse(url2).unwrap().into()),
+        ])
+      });
+
+    collector.update_communities().await.unwrap();
+
+    let activity = SentActivity {
+      id: ActivityId(1),
+      ap_id: Url::parse("https://example.com/activities/1")
+        .unwrap()
+        .into(),
+      data: json!({}),
+      sensitive: false,
+      published: Utc::now(),
+      send_inboxes: vec![],
+      send_community_followers_of: Some(community_id),
+      send_all_instances: false,
+      actor_type: ActorType::Person,
+      actor_apub_id: None,
+    };
+
+    let result = collector.get_inbox_urls(&activity).await.unwrap();
+    assert_eq!(result.len(), 2);
+    assert!(result.contains(&Url::parse(url1).unwrap()));
+    assert!(result.contains(&Url::parse(url2).unwrap()));
+  }
+
+  #[tokio::test]
+  async fn test_get_inbox_urls_send_inboxes() {
+    let mut collector = setup_collector();
+    collector.domain = "example.com".to_string();
+    let inbox_user_1 = Url::parse("https://example.com/user1/inbox").unwrap();
+    let inbox_user_2 = Url::parse("https://example.com/user2/inbox").unwrap();
+    let other_domain_inbox = Url::parse("https://other-domain.com/user3/inbox").unwrap();
+    let activity = SentActivity {
+      id: ActivityId(1),
+      ap_id: Url::parse("https://example.com/activities/1")
+        .unwrap()
+        .into(),
+      data: json!({}),
+      sensitive: false,
+      published: Utc::now(),
+      send_inboxes: vec![
+        Some(inbox_user_1.clone().into()),
+        Some(inbox_user_2.clone().into()),
+        Some(other_domain_inbox.clone().into()),
+      ],
+      send_community_followers_of: None,
+      send_all_instances: false,
+      actor_type: ActorType::Person,
+      actor_apub_id: None,
+    };
+
+    let result = collector.get_inbox_urls(&activity).await.unwrap();
+    assert_eq!(result.len(), 2);
+    assert!(result.contains(&inbox_user_1));
+    assert!(result.contains(&inbox_user_2));
+    assert!(!result.contains(&other_domain_inbox));
+  }
+
+  #[tokio::test]
+  async fn test_get_inbox_urls_combined() {
+    let mut collector = setup_collector();
+    collector.domain = "example.com".to_string();
+    let community_id = CommunityId(1);
+
+    let site_inbox = Url::parse("https://example.com/site_inbox").unwrap();
+    let site = Site {
+      id: SiteId(1),
+      name: "Test Site".to_string(),
+      sidebar: None,
+      published: Utc::now(),
+      updated: None,
+      icon: None,
+      banner: None,
+      description: None,
+      actor_id: Url::parse("https://example.com/site").unwrap().into(),
+      last_refreshed_at: Utc::now(),
+      inbox_url: site_inbox.clone().into(),
+      private_key: None,
+      public_key: "test_key".to_string(),
+      instance_id: InstanceId(1),
+      content_warning: None,
+    };
+
+    collector
+      .data_source
+      .expect_read_site_from_instance_id()
+      .return_once(move |_| Ok(Some(site)));
+
+    let subdomain_inbox = "https://follower.example.com/inbox";
+    collector
+      .data_source
+      .expect_get_instance_followed_community_inboxes()
+      .return_once(move |_, _| {
+        Ok(vec![(
+          community_id,
+          Url::parse(subdomain_inbox).unwrap().into(),
+        )])
+      });
+
+    collector.update_communities().await.unwrap();
+    let user1_inbox = Url::parse("https://example.com/user1/inbox").unwrap();
+    let user2_inbox = Url::parse("https://other-domain.com/user2/inbox").unwrap();
+    let activity = SentActivity {
+      id: ActivityId(1),
+      ap_id: Url::parse("https://example.com/activities/1")
+        .unwrap()
+        .into(),
+      data: json!({}),
+      sensitive: false,
+      published: Utc::now(),
+      send_inboxes: vec![
+        Some(user1_inbox.clone().into()),
+        Some(user2_inbox.clone().into()),
+      ],
+      send_community_followers_of: Some(community_id),
+      send_all_instances: true,
+      actor_type: ActorType::Person,
+      actor_apub_id: None,
+    };
+
+    let result = collector.get_inbox_urls(&activity).await.unwrap();
+    assert_eq!(result.len(), 3);
+    assert!(result.contains(&site_inbox));
+    assert!(result.contains(&Url::parse(subdomain_inbox).unwrap()));
+    assert!(result.contains(&user1_inbox));
+    assert!(!result.contains(&user2_inbox));
+  }
+
+  #[tokio::test]
+  async fn test_update_communities() {
+    let mut collector = setup_collector();
+    let community_id1 = CommunityId(1);
+    let community_id2 = CommunityId(2);
+    let community_id3 = CommunityId(3);
+
+    let user1_inbox_str = "https://follower1.example.com/inbox";
+    let user1_inbox = Url::parse(user1_inbox_str).unwrap();
+    let user2_inbox_str = "https://follower2.example.com/inbox";
+    let user2_inbox = Url::parse(user2_inbox_str).unwrap();
+    let user3_inbox_str = "https://follower3.example.com/inbox";
+    let user3_inbox = Url::parse(user3_inbox_str).unwrap();
+
+    collector
+      .data_source
+      .expect_get_instance_followed_community_inboxes()
+      .times(2)
+      .returning(move |_, last_fetch| {
+        if last_fetch == Utc.timestamp_nanos(0) {
+          Ok(vec![
+            (community_id1, Url::parse(user1_inbox_str).unwrap().into()),
+            (community_id2, Url::parse(user2_inbox_str).unwrap().into()),
+          ])
+        } else {
+          Ok(vec![(
+            community_id3,
+            Url::parse(user3_inbox_str).unwrap().into(),
+          )])
+        }
+      });
+
+    // First update
+    collector.update_communities().await.unwrap();
+    assert_eq!(collector.followed_communities.len(), 2);
+    assert!(collector.followed_communities[&community_id1].contains(&user1_inbox));
+    assert!(collector.followed_communities[&community_id2].contains(&user2_inbox));
+
+    // Simulate time passing
+    collector.last_full_communities_fetch = Utc::now() - chrono::TimeDelta::try_minutes(3).unwrap();
+    collector.last_incremental_communities_fetch =
+      Utc::now() - chrono::TimeDelta::try_minutes(3).unwrap();
+
+    // Second update (incremental)
+    collector.update_communities().await.unwrap();
+    assert_eq!(collector.followed_communities.len(), 3);
+    assert!(collector.followed_communities[&community_id1].contains(&user1_inbox));
+    assert!(collector.followed_communities[&community_id3].contains(&user3_inbox));
+    assert!(collector.followed_communities[&community_id2].contains(&user2_inbox));
+  }
+
+  #[tokio::test]
+  async fn test_get_inbox_urls_no_duplicates() {
+    let mut collector = setup_collector();
+    collector.domain = "example.com".to_string();
+    let community_id = CommunityId(1);
+    let site_inbox = Url::parse("https://example.com/site_inbox").unwrap();
+    let site_inbox_clone = site_inbox.clone();
+    let site = Site {
+      id: SiteId(1),
+      name: "Test Site".to_string(),
+      sidebar: None,
+      published: Utc::now(),
+      updated: None,
+      icon: None,
+      banner: None,
+      description: None,
+      actor_id: Url::parse("https://example.com/site").unwrap().into(),
+      last_refreshed_at: Utc::now(),
+      inbox_url: site_inbox.clone().into(),
+      private_key: None,
+      public_key: "test_key".to_string(),
+      instance_id: InstanceId(1),
+      content_warning: None,
+    };
+
+    collector
+      .data_source
+      .expect_read_site_from_instance_id()
+      .return_once(move |_| Ok(Some(site)));
+
+    collector
+      .data_source
+      .expect_get_instance_followed_community_inboxes()
+      .return_once(move |_, _| Ok(vec![(community_id, site_inbox_clone.into())]));
+
+    collector.update_communities().await.unwrap();
+
+    let activity = SentActivity {
+      id: ActivityId(1),
+      ap_id: Url::parse("https://example.com/activities/1")
+        .unwrap()
+        .into(),
+      data: json!({}),
+      sensitive: false,
+      published: Utc::now(),
+      send_inboxes: vec![Some(site_inbox.into())],
+      send_community_followers_of: Some(community_id),
+      send_all_instances: true,
+      actor_type: ActorType::Person,
+      actor_apub_id: None,
+    };
+
+    let result = collector.get_inbox_urls(&activity).await.unwrap();
+    assert_eq!(result.len(), 1);
+    assert!(result.contains(&Url::parse("https://example.com/site_inbox").unwrap()));
+  }
+}
diff --git a/crates/federate/src/lib.rs b/crates/federate/src/lib.rs
index 21b9229b59..66c0a28727 100644
--- a/crates/federate/src/lib.rs
+++ b/crates/federate/src/lib.rs
@@ -1,6 +1,9 @@
 use crate::{util::CancellableTask, worker::InstanceWorker};
 use activitypub_federation::config::FederationConfig;
-use lemmy_api_common::context::LemmyContext;
+use lemmy_api_common::{
+  context::LemmyContext,
+  lemmy_utils::settings::structs::FederationWorkerConfig,
+};
 use lemmy_db_schema::{newtypes::InstanceId, source::instance::Instance};
 use lemmy_utils::error::LemmyResult;
 use stats::receive_print_stats;
@@ -14,6 +17,8 @@ use tokio_util::sync::CancellationToken;
 use tracing::info;
 use util::FederationQueueStateWithDomain;
 
+mod inboxes;
+mod send;
 mod stats;
 mod util;
 mod worker;
@@ -38,10 +43,15 @@ pub struct SendManager {
   context: FederationConfig<LemmyContext>,
   stats_sender: UnboundedSender<FederationQueueStateWithDomain>,
   exit_print: JoinHandle<()>,
+  federation_worker_config: FederationWorkerConfig,
 }
 
 impl SendManager {
-  fn new(opts: Opts, context: FederationConfig<LemmyContext>) -> Self {
+  fn new(
+    opts: Opts,
+    context: FederationConfig<LemmyContext>,
+    federation_worker_config: FederationWorkerConfig,
+  ) -> Self {
     assert!(opts.process_count > 0);
     assert!(opts.process_index > 0);
     assert!(opts.process_index <= opts.process_count);
@@ -56,14 +66,20 @@ impl SendManager {
         stats_receiver,
       )),
       context,
+      federation_worker_config,
     }
   }
 
-  pub fn run(opts: Opts, context: FederationConfig<LemmyContext>) -> CancellableTask {
+  pub fn run(
+    opts: Opts,
+    context: FederationConfig<LemmyContext>,
+    config: FederationWorkerConfig,
+  ) -> CancellableTask {
     CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |cancel| {
       let opts = opts.clone();
+      let config = config.clone();
       let context = context.clone();
-      let mut manager = Self::new(opts, context);
+      let mut manager = Self::new(opts, context, config);
       async move {
         let result = manager.do_loop(cancel).await;
         // the loop function will only return if there is (a) an internal error (e.g. db connection
@@ -120,22 +136,21 @@ impl SendManager {
           // create new worker
           let context = self.context.clone();
           let stats_sender = self.stats_sender.clone();
+          let federation_worker_config = self.federation_worker_config.clone();
+
           self.workers.insert(
             instance.id,
             CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |stop| {
-              // if the instance worker ends unexpectedly due to internal/db errors, this lambda is rerun by cancellabletask.
+              // if the instance worker ends unexpectedly due to internal/db errors, this lambda is
+              // rerun by cancellabletask.
               let instance = instance.clone();
-              let req_data = context.to_request_data();
-              let stats_sender = stats_sender.clone();
-              async move {
-                InstanceWorker::init_and_loop(
-                  instance,
-                  req_data,
-                  stop,
-                  stats_sender,
-                )
-                .await
-              }
+              InstanceWorker::init_and_loop(
+                instance,
+                context.clone(),
+                federation_worker_config.clone(),
+                stop,
+                stats_sender.clone(),
+              )
             }),
           );
         } else if !should_federate {
@@ -214,7 +229,14 @@ mod test {
         .app_data(context.clone())
         .build()
         .await?;
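+      // allow tests to increase the per-instance send concurrency via env var, defaulting to 1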
+      let concurrent_sends_per_instance = std::env::var("LEMMY_TEST_FEDERATION_CONCURRENT_SENDS")
+        .ok()
+        .and_then(|s| s.parse().ok())
+        .unwrap_or(1);
 
+      let federation_worker_config = FederationWorkerConfig {
+        concurrent_sends_per_instance,
+      };
       let pool = &mut context.pool();
       let instances = vec![
         Instance::read_or_create(pool, "alpha.com".to_string()).await?,
@@ -222,7 +244,7 @@ mod test {
         Instance::read_or_create(pool, "gamma.com".to_string()).await?,
       ];
 
-      let send_manager = SendManager::new(opts, federation_config);
+      let send_manager = SendManager::new(opts, federation_config, federation_worker_config);
       Ok(Self {
         send_manager,
         context,
diff --git a/crates/federate/src/send.rs b/crates/federate/src/send.rs
new file mode 100644
index 0000000000..01d620eb0d
--- /dev/null
+++ b/crates/federate/src/send.rs
@@ -0,0 +1,148 @@
+use crate::util::get_actor_cached;
+use activitypub_federation::{
+  activity_sending::SendActivityTask,
+  config::Data,
+  protocol::context::WithContext,
+};
+use anyhow::{Context, Result};
+use chrono::{DateTime, Utc};
+use lemmy_api_common::{context::LemmyContext, federate_retry_sleep_duration};
+use lemmy_apub::{activity_lists::SharedInboxActivities, FEDERATION_CONTEXT};
+use lemmy_db_schema::{newtypes::ActivityId, source::activity::SentActivity};
+use reqwest::Url;
+use std::ops::Deref;
+use tokio::{sync::mpsc::UnboundedSender, time::sleep};
+use tokio_util::sync::CancellationToken;
+
+#[derive(Debug, Eq)]
+pub(crate) struct SendSuccessInfo {
+  pub activity_id: ActivityId,
+  pub published: Option<DateTime<Utc>>,
+  // true if the activity was skipped because the target instance is not interested in this
+  // activity
+  pub was_skipped: bool,
+}
+impl PartialEq for SendSuccessInfo {
+  fn eq(&self, other: &Self) -> bool {
+    self.activity_id == other.activity_id
+  }
+}
+/// order backwards because the binary heap is a max heap, and we need the smallest element to be on
+/// top
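+/// (achieved by reversing the comparison in the `Ord` impl below, so the max-heap
+/// yields the element with the smallest activity_id first)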
+impl PartialOrd for SendSuccessInfo {
+  fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+    Some(self.cmp(other))
+  }
+}
+impl Ord for SendSuccessInfo {
+  fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+    other.activity_id.cmp(&self.activity_id)
+  }
+}
+
+/// Represents the result of sending an activity.
+///
+/// This enum is used to communicate the outcome of a send operation from a send task
+/// to the main instance worker. It's designed to maintain a clean separation between
+/// the send task and the main thread, allowing the send.rs file to be self-contained
+/// and easier to understand.
+///
+/// The use of a channel for communication (rather than shared atomic variables) was chosen
+/// because:
+/// 1. It keeps the send task cleanly separated with no direct interaction with the main thread.
+/// 2. The failure event needs to be transferred to the main task for database updates anyway.
+/// 3. The main fail_count should only be updated under certain conditions, which are best handled
+///    in the main task.
+/// 4. It maintains consistency in how data is communicated (all via channels rather than a mix of
+///    channels and atomics).
+/// 5. It simplifies concurrency management and makes the flow of data more predictable.
+pub(crate) enum SendActivityResult {
+  Success(SendSuccessInfo),
+  Failure { fail_count: i32 },
+}
+/// Represents a task for retrying to send an activity.
+///
+/// This struct encapsulates all the necessary information and resources for attempting
+/// to send an activity to multiple inbox URLs, with built-in retry logic.
+pub(crate) struct SendRetryTask<'a> {
+  pub activity: &'a SentActivity,
+  pub object: &'a SharedInboxActivities,
+  /// Must not be empty at this point
+  pub inbox_urls: Vec<Url>,
+  /// Channel to report results back to the main instance worker
+  pub report: &'a mut UnboundedSender<SendActivityResult>,
+  /// The first request will be sent immediately, but subsequent requests will be delayed
+  /// according to the number of previous fails + 1
+  ///
+  /// This is a read-only immutable variable that is passed only one way, from the main
+  /// thread to each send task. It allows the task to determine how long to sleep initially
+  /// if the request fails.
+  pub initial_fail_count: i32,
+  /// For logging purposes
+  pub domain: String,
+  pub context: Data<LemmyContext>,
+  pub stop: CancellationToken,
+}
+
+impl<'a> SendRetryTask<'a> {
+  // This function returns Ok when either (a) the send succeeded or (b) the worker was cancelled.
+  // It returns an error only if an internal error occurred (send failures are retried indefinitely).
+  pub async fn send_retry_loop(self) -> Result<()> {
+    let SendRetryTask {
+      activity,
+      object,
+      inbox_urls,
+      report,
+      initial_fail_count,
+      domain,
+      context,
+      stop,
+    } = self;
+    debug_assert!(!inbox_urls.is_empty());
+
+    let pool = &mut context.pool();
+    let Some(actor_apub_id) = &activity.actor_apub_id else {
+      return Err(anyhow::anyhow!("activity is from before lemmy 0.19"));
+    };
+    let actor = get_actor_cached(pool, activity.actor_type, actor_apub_id)
+      .await
+      .context("failed getting actor instance (was it marked deleted / removed?)")?;
+
+    let object = WithContext::new(object.clone(), FEDERATION_CONTEXT.deref().clone());
+    let requests = SendActivityTask::prepare(&object, actor.as_ref(), inbox_urls, &context).await?;
+    for task in requests {
+      // usually only one due to shared inbox
+      tracing::debug!("sending out {}", task);
+      let mut fail_count = initial_fail_count;
+      while let Err(e) = task.sign_and_send(&context).await {
+        fail_count += 1;
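+        // report the failure so the main worker can persist the updated retry state to the DB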
+        report.send(SendActivityResult::Failure {
+          fail_count,
+          // activity_id: activity.id,
+        })?;
+        let retry_delay = federate_retry_sleep_duration(fail_count);
+        tracing::info!(
+          "{}: retrying {:?} attempt {} with delay {retry_delay:.2?}. ({e})",
+          domain,
+          activity.id,
+          fail_count
+        );
+        tokio::select! {
+          () = sleep(retry_delay) => {},
+          () = stop.cancelled() => {
+            // cancel sending without reporting any result.
+            // the InstanceWorker needs to be careful to not hang on receive of that
+            // channel when cancelled (see handle_send_results)
+            return Ok(());
+          }
+        }
+      }
+    }
+    report.send(SendActivityResult::Success(SendSuccessInfo {
+      activity_id: activity.id,
+      published: Some(activity.published),
+      was_skipped: false,
+    }))?;
+    Ok(())
+  }
+}
diff --git a/crates/federate/src/util.rs b/crates/federate/src/util.rs
index 60361c3c92..afbe957a5d 100644
--- a/crates/federate/src/util.rs
+++ b/crates/federate/src/util.rs
@@ -1,7 +1,6 @@
 use anyhow::{anyhow, Context, Result};
 use diesel::prelude::*;
 use diesel_async::RunQueryDsl;
-use lemmy_api_common::lemmy_utils::CACHE_DURATION_FEDERATION;
 use lemmy_apub::{
   activity_lists::SharedInboxActivities,
   fetcher::{site_or_community_or_user::SiteOrCommunityOrUser, user_or_community::UserOrCommunity},
@@ -28,19 +27,28 @@ use tokio_util::sync::CancellationToken;
 
 /// Decrease the delays of the federation queue.
 /// Should only be used for federation tests since it significantly increases CPU and DB load of the
-/// federation queue.
+/// federation queue. This is intentionally a separate flag from other flags like debug_assertions,
+/// since this is an invasive change that we only need rarely.
 pub(crate) static LEMMY_TEST_FAST_FEDERATION: Lazy<bool> = Lazy::new(|| {
   std::env::var("LEMMY_TEST_FAST_FEDERATION")
     .map(|s| !s.is_empty())
     .unwrap_or(false)
 });
 
-/// Recheck for new federation work every n seconds.
+/// Recheck for new federation work every n seconds within each InstanceWorker.
 ///
 /// When the queue is processed faster than new activities are added and it reaches the current time
 /// with an empty batch, this is the delay the queue waits before it checks if new activities have
 /// been added to the sent_activities table. This delay is only applied if no federated activity
-/// happens during sending activities of the last batch.
+/// happens during sending activities of the last batch, which means on high-activity instances it
+/// may never be used. This means that it does not affect the maximum throughput of the queue.
+///
+/// This is thus the interval with which tokio wakes up each of the
+/// InstanceWorkers to check for new work, if the queue previously was empty.
+/// If the delay is too short, the workers (one per federated instance) will wake up too
+/// often and consume a lot of CPU. If the delay is long, then activities on low-traffic instances
+/// will on average take delay/2 seconds to federate.
 pub(crate) static WORK_FINISHED_RECHECK_DELAY: Lazy<Duration> = Lazy::new(|| {
   if *LEMMY_TEST_FAST_FEDERATION {
     Duration::from_millis(100)
@@ -49,6 +57,21 @@ pub(crate) static WORK_FINISHED_RECHECK_DELAY: Lazy<Duration> = Lazy::new(|| {
   }
 });
 
+/// Cache the latest activity id for a certain duration.
+///
+/// This cache is common to all the instance workers and ensures that the DB query to find
+/// max(activity_id) is run at most once per cache duration.
+pub(crate) static CACHE_DURATION_LATEST_ID: Lazy<Duration> = Lazy::new(|| {
+  if *LEMMY_TEST_FAST_FEDERATION {
+    // in test mode, we use the same cache duration as the recheck delay so that data is fresh
+    // when a recheck happens, which reduces the time the tests take
+    *WORK_FINISHED_RECHECK_DELAY
+  } else {
+    // in normal mode, we limit the query to one per second
+    Duration::from_secs(1)
+  }
+});
+
 /// A task that will be run in an infinite loop, unless it is cancelled.
 /// If the task exits without being cancelled, an error will be logged and the task will be
 /// restarted.
@@ -174,7 +197,7 @@ pub(crate) async fn get_activity_cached(
 pub(crate) async fn get_latest_activity_id(pool: &mut DbPool<'_>) -> Result<ActivityId> {
   static CACHE: Lazy<Cache<(), ActivityId>> = Lazy::new(|| {
     Cache::builder()
-      .time_to_live(CACHE_DURATION_FEDERATION)
+      .time_to_live(*CACHE_DURATION_LATEST_ID)
       .build()
   });
   CACHE
diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs
index 25c9278aa4..28f21dcc28 100644
--- a/crates/federate/src/worker.rs
+++ b/crates/federate/src/worker.rs
@@ -1,132 +1,192 @@
-use crate::util::{
-  get_activity_cached,
-  get_actor_cached,
-  get_latest_activity_id,
-  FederationQueueStateWithDomain,
-  LEMMY_TEST_FAST_FEDERATION,
-  WORK_FINISHED_RECHECK_DELAY,
-};
-use activitypub_federation::{
-  activity_sending::SendActivityTask,
-  config::Data,
-  protocol::context::WithContext,
+use crate::{
+  inboxes::RealCommunityInboxCollector,
+  send::{SendActivityResult, SendRetryTask, SendSuccessInfo},
+  util::{
+    get_activity_cached,
+    get_latest_activity_id,
+    FederationQueueStateWithDomain,
+    WORK_FINISHED_RECHECK_DELAY,
+  },
 };
+use activitypub_federation::config::FederationConfig;
 use anyhow::{Context, Result};
 use chrono::{DateTime, Days, TimeZone, Utc};
-use lemmy_api_common::{context::LemmyContext, federate_retry_sleep_duration};
-use lemmy_apub::{activity_lists::SharedInboxActivities, FEDERATION_CONTEXT};
+use lemmy_api_common::{
+  context::LemmyContext,
+  federate_retry_sleep_duration,
+  lemmy_utils::settings::structs::FederationWorkerConfig,
+};
 use lemmy_db_schema::{
-  newtypes::{ActivityId, CommunityId, InstanceId},
+  newtypes::ActivityId,
   source::{
-    activity::SentActivity,
     federation_queue_state::FederationQueueState,
     instance::{Instance, InstanceForm},
-    site::Site,
   },
-  utils::naive_now,
+  utils::{naive_now, ActualDbPool, DbPool},
 };
-use lemmy_db_views_actor::structs::CommunityFollowerView;
-use once_cell::sync::Lazy;
-use reqwest::Url;
-use std::{
-  collections::{HashMap, HashSet},
-  ops::{Add, Deref},
-  time::Duration,
+use std::{collections::BinaryHeap, ops::Add, time::Duration};
+use tokio::{
+  sync::mpsc::{self, UnboundedSender},
+  time::sleep,
 };
-use tokio::{sync::mpsc::UnboundedSender, time::sleep};
 use tokio_util::sync::CancellationToken;
-use tracing::{debug, info, trace, warn};
 
-/// Check whether to save state to db every n sends if there's no failures (during failures state is
-/// saved after every attempt). This determines the batch size for loop_batch. After a batch ends
-/// and SAVE_STATE_EVERY_TIME has passed, the federation_queue_state is updated in the DB.
-static CHECK_SAVE_STATE_EVERY_IT: i64 = 100;
 /// Save state to db after this time has passed since the last state (so if the server crashes or is
 /// SIGKILLed, less than X seconds of activities are resent)
+#[cfg(not(test))]
 static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(60);
-/// interval with which new additions to community_followers are queried.
+#[cfg(test)]
+/// in test mode, we want it to save state and send it to print_stats after every send
+static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(0);
+/// Maximum number of successful sends to allow out of order
+const MAX_SUCCESSFULS: usize = 1000;
+
+/// in prod mode, try to collect multiple send results at the same time to reduce load
+#[cfg(not(test))]
+static MIN_ACTIVITY_SEND_RESULTS_TO_HANDLE: usize = 4;
+#[cfg(test)]
+static MIN_ACTIVITY_SEND_RESULTS_TO_HANDLE: usize = 0;
+
 ///
-/// The first time some user on an instance follows a specific remote community (or, more precisely:
-/// the first time a (followed_community_id, follower_inbox_url) tuple appears), this delay limits
-/// the maximum time until the follow actually results in activities from that community id being
-/// sent to that inbox url. This delay currently needs to not be too small because the DB load is
-/// currently fairly high because of the current structure of storing inboxes for every person, not
-/// having a separate list of shared_inboxes, and the architecture of having every instance queue be
-/// fully separate. (see https://github.com/LemmyNet/lemmy/issues/3958)
-static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy<chrono::TimeDelta> = Lazy::new(|| {
-  if *LEMMY_TEST_FAST_FEDERATION {
-    chrono::TimeDelta::try_seconds(1).expect("TimeDelta out of bounds")
-  } else {
-    chrono::TimeDelta::try_minutes(2).expect("TimeDelta out of bounds")
-  }
-});
-/// The same as FOLLOW_ADDITIONS_RECHECK_DELAY, but triggering when the last person on an instance
-/// unfollows a specific remote community. This is expected to happen pretty rarely and updating it
-/// in a timely manner is not too important.
-static FOLLOW_REMOVALS_RECHECK_DELAY: Lazy<chrono::TimeDelta> =
-  Lazy::new(|| chrono::TimeDelta::try_hours(1).expect("TimeDelta out of bounds"));
+/// SendManager --(has many)--> InstanceWorker --(has many)--> SendRetryTask
+///      |                            |                               |
+/// -----|------create worker -> loop activities--create task-> send activity
+///      |                            |                             vvvv
+///      |                            |                           fail or success
+///      |                            |           <-report result--   |
+///      |               <---order and aggregate results---           |
+///      |   <---send stats---        |                               |
+/// filter and print stats            |                               |
 pub(crate) struct InstanceWorker {
   instance: Instance,
-  // load site lazily because if an instance is first seen due to being on allowlist,
-  // the corresponding row in `site` may not exist yet since that is only added once
-  // `fetch_instance_actor_for_object` is called.
-  // (this should be unlikely to be relevant outside of the federation tests)
-  site_loaded: bool,
-  site: Option<Site>,
-  followed_communities: HashMap<CommunityId, HashSet<Url>>,
   stop: CancellationToken,
-  context: Data<LemmyContext>,
-  stats_sender: UnboundedSender<FederationQueueStateWithDomain>,
-  last_full_communities_fetch: DateTime<Utc>,
-  last_incremental_communities_fetch: DateTime<Utc>,
+  federation_lib_config: FederationConfig<LemmyContext>,
+  federation_worker_config: FederationWorkerConfig,
   state: FederationQueueState,
   last_state_insert: DateTime<Utc>,
+  pool: ActualDbPool,
+  inbox_collector: RealCommunityInboxCollector,
+  // regularly send stats back to the SendManager
+  stats_sender: UnboundedSender<FederationQueueStateWithDomain>,
+  // each HTTP send will report back to this channel concurrently
+  receive_send_result: mpsc::UnboundedReceiver<SendActivityResult>,
+  // this part of the channel is cloned and passed to the SendRetryTasks
+  report_send_result: mpsc::UnboundedSender<SendActivityResult>,
+  // activities that have been successfully sent but
+  // whose ids are not yet contiguous with last_successful_id and thus can't be written to the database yet
+  successfuls: BinaryHeap<SendSuccessInfo>,
+  // number of activities that currently have a task spawned to send them
+  in_flight: i32,
 }
 
 impl InstanceWorker {
   pub(crate) async fn init_and_loop(
     instance: Instance,
-    context: Data<LemmyContext>,
+    config: FederationConfig<LemmyContext>,
+    federation_worker_config: FederationWorkerConfig,
     stop: CancellationToken,
     stats_sender: UnboundedSender<FederationQueueStateWithDomain>,
   ) -> Result<(), anyhow::Error> {
-    let mut pool = context.pool();
-    let state = FederationQueueState::load(&mut pool, instance.id).await?;
+    let pool = config.to_request_data().inner_pool().clone();
+    let state = FederationQueueState::load(&mut DbPool::Pool(&pool), instance.id).await?;
+    let (report_send_result, receive_send_result) =
+      tokio::sync::mpsc::unbounded_channel::<SendActivityResult>();
     let mut worker = InstanceWorker {
+      inbox_collector: RealCommunityInboxCollector::new_real(
+        pool.clone(),
+        instance.id,
+        instance.domain.clone(),
+      ),
+      federation_worker_config,
       instance,
-      site_loaded: false,
-      site: None,
-      followed_communities: HashMap::new(),
       stop,
-      context,
+      federation_lib_config: config,
       stats_sender,
-      last_full_communities_fetch: Utc.timestamp_nanos(0),
-      last_incremental_communities_fetch: Utc.timestamp_nanos(0),
       state,
       last_state_insert: Utc.timestamp_nanos(0),
+      pool,
+      receive_send_result,
+      report_send_result,
+      successfuls: BinaryHeap::<SendSuccessInfo>::new(),
+      in_flight: 0,
     };
+
     worker.loop_until_stopped().await
   }
   /// loop fetch new activities from db and send them to the inboxes of the given instances
   /// this worker only returns if (a) there is an internal error or (b) the cancellation token is
   /// cancelled (graceful exit)
-  pub(crate) async fn loop_until_stopped(&mut self) -> Result<(), anyhow::Error> {
-    debug!("Starting federation worker for {}", self.instance.domain);
-    let save_state_every = chrono::Duration::from_std(SAVE_STATE_EVERY_TIME).expect("not negative");
-
-    self.update_communities().await?;
+  async fn loop_until_stopped(&mut self) -> Result<()> {
     self.initial_fail_sleep().await?;
+    let (mut last_sent_id, mut newest_id) = self.get_latest_ids().await?;
+
     while !self.stop.is_cancelled() {
-      self.loop_batch().await?;
-      if self.stop.is_cancelled() {
-        break;
+      // check if we need to wait for a send to finish before sending the next one
+      // we wait if (a) the last request failed and a request is already in flight (i.e. not at the
+      // start of the loop), (b) we have too many successfuls in memory, or (c) we have too many
+      // requests in flight
+      let need_wait_for_event = (self.in_flight != 0 && self.state.fail_count > 0)
+        || self.successfuls.len() >= MAX_SUCCESSFULS
+        || i64::from(self.in_flight) >= self.federation_worker_config.concurrent_sends_per_instance;
+      if need_wait_for_event || self.receive_send_result.len() > MIN_ACTIVITY_SEND_RESULTS_TO_HANDLE
+      {
+        // if len() > 0 then this does not block and allows us to write to db more often
+        // if len is 0 then we wait for something to change the above conditions,
+        // which can only happen via an event sent into the channel
+        self.handle_send_results().await?;
+        // handle_send_results does not guarantee that we are now in a condition where we want to
+        // send a new one, so repeat this check until the condition no longer applies
+        continue;
       }
-      if (Utc::now() - self.last_state_insert) > save_state_every {
-        self.save_and_send_state().await?;
+
+      // send a new activity if there is one
+      self.inbox_collector.update_communities().await?;
+      let next_id_to_send = ActivityId(last_sent_id.0 + 1);
+      {
+        // sanity check: calculate next id to send based on the last id and the in flight requests
+        let expected_next_id = self.state.last_successful_id.map(|last_successful_id| {
+          last_successful_id.0 + (self.successfuls.len() as i64) + i64::from(self.in_flight) + 1
+        });
+        // compare to next id based on incrementing
+        if expected_next_id != Some(next_id_to_send.0) {
+          anyhow::bail!(
+            "{}: next id to send is not as expected: {:?} != {:?}",
+            self.instance.domain,
+            expected_next_id,
+            next_id_to_send
+          )
+        }
       }
-      self.update_communities().await?;
+
+      if next_id_to_send > newest_id {
+        // lazily fetch the latest id only if we have caught up
+        newest_id = self.get_latest_ids().await?.1;
+        if next_id_to_send > newest_id {
+          if next_id_to_send > ActivityId(newest_id.0 + 1) {
+            tracing::error!(
+              "{}: next send id {} is higher than latest id {}+1 in database (did the db get cleared?)",
+              self.instance.domain,
+              next_id_to_send.0,
+              newest_id.0
+            );
+          }
+          // no more work to be done, wait before rechecking
+          tokio::select! {
+            () = sleep(*WORK_FINISHED_RECHECK_DELAY) => {},
+            () = self.stop.cancelled() => {
+              tracing::debug!("cancelled worker loop while waiting for new work")
+            }
+          }
+          continue;
+        }
+      }
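+      // there is a new activity and we are below the concurrency limits, so spawn a task to send it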
+      self.in_flight += 1;
+      last_sent_id = next_id_to_send;
+      self.spawn_send_if_needed(next_id_to_send).await?;
     }
-    // final update of state in db
+    tracing::debug!("cancelled worker loop after send");
+
+    // final update of state in db on shutdown
     self.save_and_send_state().await?;
     Ok(())
   }
@@ -144,18 +204,27 @@ impl InstanceWorker {
         return Ok(());
       }
       let remaining = required - elapsed;
+      tracing::debug!(
+        "{}: fail-sleeping for {:?} before starting queue",
+        self.instance.domain,
+        remaining
+      );
       tokio::select! {
         () = sleep(remaining) => {},
-        () = self.stop.cancelled() => {}
+        () = self.stop.cancelled() => {
+          tracing::debug!("cancelled worker loop during initial fail sleep")
+        }
       }
     }
     Ok(())
   }
-  /// send out a batch of CHECK_SAVE_STATE_EVERY_IT activities
-  async fn loop_batch(&mut self) -> Result<()> {
-    let latest_id = get_latest_activity_id(&mut self.context.pool()).await?;
-    let mut id = if let Some(id) = self.state.last_successful_id {
-      id
+
+  /// returns the last successfully sent id and the newest activity id in the database
+  /// sets last_successful_id in the database if this is the first time this instance is seen
+  async fn get_latest_ids(&mut self) -> Result<(ActivityId, ActivityId)> {
+    let latest_id = get_latest_activity_id(&mut self.pool()).await?;
+    let last = if let Some(last) = self.state.last_successful_id {
+      last
     } else {
       // this is the initial creation (instance first seen) of the federation queue for this
       // instance
@@ -166,203 +235,542 @@ impl InstanceWorker {
       self.save_and_send_state().await?;
       latest_id
     };
-    if id >= latest_id {
-      if id > latest_id {
-        tracing::error!(
-          "{}: last successful id {} is higher than latest id {} in database (did the db get cleared?)",
-          self.instance.domain,
-          id.0,
-          latest_id.0
-        );
-      }
-      // no more work to be done, wait before rechecking
-      tokio::select! {
-        () = sleep(*WORK_FINISHED_RECHECK_DELAY) => {},
-        () = self.stop.cancelled() => {}
-      }
-      return Ok(());
-    }
-    let mut processed_activities = 0;
-    while id < latest_id
-      && processed_activities < CHECK_SAVE_STATE_EVERY_IT
-      && !self.stop.is_cancelled()
-    {
-      id = ActivityId(id.0 + 1);
-      processed_activities += 1;
-      let Some(ele) = get_activity_cached(&mut self.context.pool(), id)
-        .await
-        .context("failed reading activity from db")?
-      else {
-        debug!("{}: {:?} does not exist", self.instance.domain, id);
-        self.state.last_successful_id = Some(id);
-        continue;
-      };
-      if let Err(e) = self.send_retry_loop(&ele.0, &ele.1).await {
-        warn!(
-          "sending {} errored internally, skipping activity: {:?}",
-          ele.0.ap_id, e
-        );
-      }
-      if self.stop.is_cancelled() {
+    Ok((last, latest_id))
+  }
+
+  async fn handle_send_results(&mut self) -> Result<(), anyhow::Error> {
+    let mut force_write = false;
+    let mut events = Vec::new();
+    // Wait for at least one event, but if there are multiple, handle them all.
+    // We need to listen to the cancel event here as well in order to prevent a hang on shutdown:
+    // If a SendRetryTask gets cancelled, it immediately exits without reporting any state.
+    // So if the worker is waiting for a send result and all SendRetryTasks get cancelled, this recv
+    // could otherwise hang indefinitely. The tasks would also drop their handles of
+    // report_send_result, which would cause recv_many to return 0 elements, but since
+    // InstanceWorker holds a copy of the send result channel as well, that won't happen.
+    tokio::select! {
+      _ = self.receive_send_result.recv_many(&mut events, 1000) => {},
+      () = self.stop.cancelled() => {
+        tracing::debug!("cancelled worker loop while waiting for send results");
         return Ok(());
       }
-      // send success!
-      self.state.last_successful_id = Some(id);
-      self.state.last_successful_published_time = Some(ele.0.published);
-      self.state.fail_count = 0;
+    }
+    for event in events {
+      match event {
+        SendActivityResult::Success(s) => {
+          self.in_flight -= 1;
+          if !s.was_skipped {
+            self.state.fail_count = 0;
+            self.mark_instance_alive().await?;
+          }
+          self.successfuls.push(s);
+        }
+        SendActivityResult::Failure { fail_count, .. } => {
+          if fail_count > self.state.fail_count {
+            // override fail count - if multiple activities are currently sending, this value may get
+            // conflicting info, but that's fine.
+            // This needs to be this way; all alternatives would be worse. The reason is that if 10
+            // simultaneous requests fail within a 1s period, we don't want the next retry to be
+            // exponentially 2**10 s later. Any number of failures within a fail-sleep period should
+            // only count as one failure.
+
+            self.state.fail_count = fail_count;
+            self.state.last_retry = Some(Utc::now());
+            force_write = true;
+          }
+        }
+      }
+    }
+    self.pop_successfuls_and_write(force_write).await?;
+    Ok(())
+  }
+  async fn mark_instance_alive(&mut self) -> Result<()> {
+    // Activity send successful, mark instance as alive if it hasn't been updated in a while.
+    let updated = self.instance.updated.unwrap_or(self.instance.published);
+    if updated.add(Days::new(1)) < Utc::now() {
+      self.instance.updated = Some(Utc::now());
+
+      let form = InstanceForm::builder()
+        .domain(self.instance.domain.clone())
+        .updated(Some(naive_now()))
+        .build();
+      Instance::update(&mut self.pool(), self.instance.id, form).await?;
+    }
+    Ok(())
+  }
+  /// Checks whether the sequential activities `last_successful_id + 1`, `last_successful_id + 2` etc.
+  /// have been sent successfully. If so, updates `last_successful_id` and saves the state to the
+  /// database if the time since the last save is greater than `SAVE_STATE_EVERY_TIME`.
+  async fn pop_successfuls_and_write(&mut self, force_write: bool) -> Result<()> {
+    let Some(mut last_id) = self.state.last_successful_id else {
+      tracing::warn!(
+        "{} should be impossible: last successful id is None",
+        self.instance.domain
+      );
+      return Ok(());
+    };
+    tracing::debug!(
+      "{} last: {:?}, next: {:?}, currently in successfuls: {:?}",
+      self.instance.domain,
+      last_id,
+      self.successfuls.peek(),
+      self.successfuls.iter()
+    );
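+    // note: the BinaryHeap is expected to yield the smallest activity id first (SendSuccessInfo uses a reversed ordering for this, see send.rs)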
+    while self
+      .successfuls
+      .peek()
+      .map(|a| a.activity_id == ActivityId(last_id.0 + 1))
+      .unwrap_or(false)
+    {
+      let next = self
+        .successfuls
+        .pop()
+        .context("peek above ensures pop has value")?;
+      last_id = next.activity_id;
+      self.state.last_successful_id = Some(next.activity_id);
+      self.state.last_successful_published_time = next.published;
+    }
+
+    let save_state_every = chrono::Duration::from_std(SAVE_STATE_EVERY_TIME).expect("not negative");
+    if force_write || (Utc::now() - self.last_state_insert) > save_state_every {
+      self.save_and_send_state().await?;
     }
     Ok(())
   }
 
-  // this function will return successfully when (a) send succeeded or (b) worker cancelled
-  // and will return an error if an internal error occurred (send errors cause an infinite loop)
-  async fn send_retry_loop(
-    &mut self,
-    activity: &SentActivity,
-    object: &SharedInboxActivities,
-  ) -> Result<()> {
+  /// we collect the relevant inboxes in the main instance worker task, and only spawn the send task
+  /// if we have inboxes to send to. This limits CPU usage and reduces overhead for the (many)
+  /// cases where we don't have any inboxes
+  async fn spawn_send_if_needed(&mut self, activity_id: ActivityId) -> Result<()> {
+    let Some(ele) = get_activity_cached(&mut self.pool(), activity_id)
+      .await
+      .context("failed reading activity from db")?
+    else {
+      tracing::debug!("{}: {:?} does not exist", self.instance.domain, activity_id);
+      self
+        .report_send_result
+        .send(SendActivityResult::Success(SendSuccessInfo {
+          activity_id,
+          published: None,
+          was_skipped: true,
+        }))?;
+      return Ok(());
+    };
+    let activity = &ele.0;
     let inbox_urls = self
+      .inbox_collector
       .get_inbox_urls(activity)
       .await
       .context("failed figuring out inbox urls")?;
     if inbox_urls.is_empty() {
-      trace!("{}: {:?} no inboxes", self.instance.domain, activity.id);
-      self.state.last_successful_id = Some(activity.id);
-      self.state.last_successful_published_time = Some(activity.published);
+      // this is the case when the activity is not relevant to this receiving instance (e.g. no user
+      // subscribed to the relevant community)
+      tracing::debug!("{}: {:?} no inboxes", self.instance.domain, activity.id);
+      self
+        .report_send_result
+        .send(SendActivityResult::Success(SendSuccessInfo {
+          activity_id,
+          // it would be valid here to either return None or Some(activity.published). The published
+          // time is only used for stats pages that track federation delay. None can be a bit
+          // misleading because if you look at / chart the published time for federation from a
+          // large to a small instance that's only subscribed to a few small communities,
+          // then it will show the last published time as days ago even though
+          // federation is up to date.
+          published: Some(activity.published),
+          was_skipped: true,
+        }))?;
       return Ok(());
     }
-    let Some(actor_apub_id) = &activity.actor_apub_id else {
-      return Ok(()); // activity was inserted before persistent queue was activated
-    };
-    let actor = get_actor_cached(&mut self.context.pool(), activity.actor_type, actor_apub_id)
-      .await
-      .context("failed getting actor instance (was it marked deleted / removed?)")?;
-
-    let object = WithContext::new(object.clone(), FEDERATION_CONTEXT.deref().clone());
-    let inbox_urls = inbox_urls.into_iter().collect();
-    let requests =
-      SendActivityTask::prepare(&object, actor.as_ref(), inbox_urls, &self.context).await?;
-    for task in requests {
-      // usually only one due to shared inbox
-      trace!("sending out {}", task);
-      while let Err(e) = task.sign_and_send(&self.context).await {
-        self.state.fail_count += 1;
-        self.state.last_retry = Some(Utc::now());
-        let retry_delay: Duration = federate_retry_sleep_duration(self.state.fail_count);
-        info!(
-          "{}: retrying {:?} attempt {} with delay {retry_delay:.2?}. ({e})",
-          self.instance.domain, activity.id, self.state.fail_count
+    let initial_fail_count = self.state.fail_count;
+    let data = self.federation_lib_config.to_request_data();
+    let stop = self.stop.clone();
+    let domain = self.instance.domain.clone();
+    let mut report = self.report_send_result.clone();
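+    // spawn the actual HTTP send as a separate task so this worker loop can continue and queue further activities concurrently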
+    tokio::spawn(async move {
+      let res = SendRetryTask {
+        activity: &ele.0,
+        object: &ele.1,
+        inbox_urls,
+        report: &mut report,
+        initial_fail_count,
+        domain,
+        context: data,
+        stop,
+      }
+      .send_retry_loop()
+      .await;
+      if let Err(e) = res {
+        tracing::warn!(
+          "sending {} errored internally, skipping activity: {:?}",
+          ele.0.ap_id,
+          e
         );
-        self.save_and_send_state().await?;
-        tokio::select! {
-          () = sleep(retry_delay) => {},
-          () = self.stop.cancelled() => {
-            // save state to db and exit
-            return Ok(());
-          }
-        }
+        // An error in this location means there is some deeper internal issue with the activity,
+        // for example the actor can't be loaded or similar. These issues are probably not
+        // solvable by retrying and would cause the federation for this instance to permanently be
+        // stuck in a retry loop. So we log the error and skip the activity (by reporting success to
+        // the worker)
+        report
+          .send(SendActivityResult::Success(SendSuccessInfo {
+            activity_id,
+            published: None,
+            was_skipped: true,
+          }))
+          .ok();
       }
-
-      // Activity send successful, mark instance as alive if it hasn't been updated in a while.
-      let updated = self.instance.updated.unwrap_or(self.instance.published);
-      if updated.add(Days::new(1)) < Utc::now() {
-        self.instance.updated = Some(Utc::now());
-
-        let form = InstanceForm::builder()
-          .domain(self.instance.domain.clone())
-          .updated(Some(naive_now()))
-          .build();
-        Instance::update(&mut self.context.pool(), self.instance.id, form).await?;
-      }
-    }
+    });
     Ok(())
   }
 
-  /// get inbox urls of sending the given activity to the given instance
-  /// most often this will return 0 values (if instance doesn't care about the activity)
-  /// or 1 value (the shared inbox)
-  /// > 1 values only happens for non-lemmy software
-  async fn get_inbox_urls(&mut self, activity: &SentActivity) -> Result<HashSet<Url>> {
-    let mut inbox_urls: HashSet<Url> = HashSet::new();
-
-    if activity.send_all_instances {
-      if !self.site_loaded {
-        self.site = Site::read_from_instance_id(&mut self.context.pool(), self.instance.id).await?;
-        self.site_loaded = true;
-      }
-      if let Some(site) = &self.site {
-        // Nutomic: Most non-lemmy software wont have a site row. That means it cant handle these
-        // activities. So handling it like this is fine.
-        inbox_urls.insert(site.inbox_url.inner().clone());
-      }
-    }
-    if let Some(t) = &activity.send_community_followers_of {
-      if let Some(urls) = self.followed_communities.get(t) {
-        inbox_urls.extend(urls.iter().cloned());
-      }
-    }
-    inbox_urls.extend(
-      activity
-        .send_inboxes
-        .iter()
-        .filter_map(std::option::Option::as_ref)
-        .filter(|&u| (u.domain() == Some(&self.instance.domain)))
-        .map(|u| u.inner().clone()),
-    );
-    Ok(inbox_urls)
-  }
-
-  async fn update_communities(&mut self) -> Result<()> {
-    if (Utc::now() - self.last_full_communities_fetch) > *FOLLOW_REMOVALS_RECHECK_DELAY {
-      // process removals every hour
-      (self.followed_communities, self.last_full_communities_fetch) = self
-        .get_communities(self.instance.id, Utc.timestamp_nanos(0))
-        .await?;
-      self.last_incremental_communities_fetch = self.last_full_communities_fetch;
-    }
-    if (Utc::now() - self.last_incremental_communities_fetch) > *FOLLOW_ADDITIONS_RECHECK_DELAY {
-      // process additions every minute
-      let (news, time) = self
-        .get_communities(self.instance.id, self.last_incremental_communities_fetch)
-        .await?;
-      self.followed_communities.extend(news);
-      self.last_incremental_communities_fetch = time;
-    }
-    Ok(())
-  }
-
-  /// get a list of local communities with the remote inboxes on the given instance that cares about
-  /// them
-  async fn get_communities(
-    &mut self,
-    instance_id: InstanceId,
-    last_fetch: DateTime<Utc>,
-  ) -> Result<(HashMap<CommunityId, HashSet<Url>>, DateTime<Utc>)> {
-    let new_last_fetch =
-      Utc::now() - chrono::TimeDelta::try_seconds(10).expect("TimeDelta out of bounds"); // update to time before fetch to ensure overlap. subtract 10s to ensure overlap even if
-                                                                                         // published date is not exact
-    Ok((
-      CommunityFollowerView::get_instance_followed_community_inboxes(
-        &mut self.context.pool(),
-        instance_id,
-        last_fetch,
-      )
-      .await?
-      .into_iter()
-      .fold(HashMap::new(), |mut map, (c, u)| {
-        map.entry(c).or_default().insert(u.into());
-        map
-      }),
-      new_last_fetch,
-    ))
-  }
   async fn save_and_send_state(&mut self) -> Result<()> {
+    tracing::debug!("{}: saving and sending state", self.instance.domain);
     self.last_state_insert = Utc::now();
-    FederationQueueState::upsert(&mut self.context.pool(), &self.state).await?;
+    FederationQueueState::upsert(&mut self.pool(), &self.state).await?;
     self.stats_sender.send(FederationQueueStateWithDomain {
       state: self.state.clone(),
       domain: self.instance.domain.clone(),
     })?;
     Ok(())
   }
+
+  fn pool(&self) -> DbPool<'_> {
+    DbPool::Pool(&self.pool)
+  }
+}
+
+#[cfg(test)]
+#[allow(clippy::unwrap_used)]
+#[allow(clippy::indexing_slicing)]
+mod test {
+
+  use super::*;
+  use activitypub_federation::{
+    http_signatures::generate_actor_keypair,
+    protocol::context::WithContext,
+  };
+  use actix_web::{dev::ServerHandle, web, App, HttpResponse, HttpServer};
+  use lemmy_api_common::utils::{generate_inbox_url, generate_shared_inbox_url};
+  use lemmy_db_schema::{
+    newtypes::DbUrl,
+    source::{
+      activity::{ActorType, SentActivity, SentActivityForm},
+      person::{Person, PersonInsertForm},
+    },
+    traits::Crud,
+  };
+  use lemmy_utils::error::LemmyResult;
+  use reqwest::StatusCode;
+  use serde_json::{json, Value};
+  use serial_test::serial;
+  use test_context::{test_context, AsyncTestContext};
+  use tokio::{
+    spawn,
+    sync::mpsc::{error::TryRecvError, unbounded_channel, UnboundedReceiver},
+  };
+  use tracing_test::traced_test;
+  use url::Url;
+
+  struct Data {
+    context: activitypub_federation::config::Data<LemmyContext>,
+    instance: Instance,
+    person: Person,
+    stats_receiver: UnboundedReceiver<FederationQueueStateWithDomain>,
+    inbox_receiver: UnboundedReceiver<String>,
+    cancel: CancellationToken,
+    cleaned_up: bool,
+    wait_stop_server: ServerHandle,
+    is_concurrent: bool,
+  }
+
+  impl Data {
+    async fn init() -> LemmyResult<Self> {
+      let context = LemmyContext::init_test_federation_config().await;
+      let instance = Instance::read_or_create(&mut context.pool(), "localhost".to_string()).await?;
+
+      let actor_keypair = generate_actor_keypair()?;
+      let actor_id: DbUrl = Url::parse("http://local.com/u/alice")?.into();
+      let person_form = PersonInsertForm {
+        actor_id: Some(actor_id.clone()),
+        private_key: (Some(actor_keypair.private_key)),
+        inbox_url: Some(generate_inbox_url(&actor_id)?),
+        shared_inbox_url: Some(generate_shared_inbox_url(context.settings())?),
+        ..PersonInsertForm::new("alice".to_string(), actor_keypair.public_key, instance.id)
+      };
+      let person = Person::create(&mut context.pool(), &person_form).await?;
+
+      let cancel = CancellationToken::new();
+      let (stats_sender, stats_receiver) = unbounded_channel();
+      let (inbox_sender, inbox_receiver) = unbounded_channel();
+
+      // listen for received activities in background
+      let wait_stop_server = listen_activities(inbox_sender)?;
+
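+      // tests default to concurrent sending; set LEMMY_TEST_FEDERATION_CONCURRENT_SENDS=1 to exercise the strictly ordered path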
+      let concurrent_sends_per_instance = std::env::var("LEMMY_TEST_FEDERATION_CONCURRENT_SENDS")
+        .ok()
+        .and_then(|s| s.parse().ok())
+        .unwrap_or(10);
+
+      let fed_config = FederationWorkerConfig {
+        concurrent_sends_per_instance,
+      };
+      spawn(InstanceWorker::init_and_loop(
+        instance.clone(),
+        context.clone(),
+        fed_config,
+        cancel.clone(),
+        stats_sender,
+      ));
+      // wait for startup
+      sleep(*WORK_FINISHED_RECHECK_DELAY).await;
+
+      Ok(Self {
+        context: context.to_request_data(),
+        instance,
+        person,
+        stats_receiver,
+        inbox_receiver,
+        cancel,
+        wait_stop_server,
+        cleaned_up: false,
+        is_concurrent: concurrent_sends_per_instance > 1,
+      })
+    }
+
+    async fn cleanup(&mut self) -> LemmyResult<()> {
+      if self.cleaned_up {
+        return Ok(());
+      }
+      self.cleaned_up = true;
+      self.cancel.cancel();
+      sleep(*WORK_FINISHED_RECHECK_DELAY).await;
+      Instance::delete_all(&mut self.context.pool()).await?;
+      Person::delete(&mut self.context.pool(), self.person.id).await?;
+      self.wait_stop_server.stop(true).await;
+      Ok(())
+    }
+  }
+
+  /// In order to guarantee that the webserver is stopped via the cleanup function,
+  /// we implement a test context.
+  impl AsyncTestContext for Data {
+    async fn setup() -> Data {
+      Data::init().await.unwrap()
+    }
+    async fn teardown(mut self) {
+      self.cleanup().await.unwrap()
+    }
+  }
+
+  #[test_context(Data)]
+  #[tokio::test]
+  #[traced_test]
+  #[serial]
+  async fn test_stats(data: &mut Data) -> LemmyResult<()> {
+    tracing::debug!("hello world");
+
+    // first receive at startup
+    let rcv = data.stats_receiver.recv().await.unwrap();
+    tracing::debug!("received first stats");
+    assert_eq!(data.instance.id, rcv.state.instance_id);
+
+    let sent = send_activity(data.person.actor_id.clone(), &data.context, true).await?;
+    tracing::debug!("sent activity");
+    // receive for successfully sent activity
+    let inbox_rcv = data.inbox_receiver.recv().await.unwrap();
+    let parsed_activity = serde_json::from_str::<WithContext<Value>>(&inbox_rcv)?;
+    assert_eq!(&sent.data, parsed_activity.inner());
+    tracing::debug!("received activity");
+
+    let rcv = data.stats_receiver.recv().await.unwrap();
+    assert_eq!(data.instance.id, rcv.state.instance_id);
+    assert_eq!(Some(sent.id), rcv.state.last_successful_id);
+    tracing::debug!("received second stats");
+
+    data.cleanup().await?;
+
+    // it also sends state on shutdown
+    let rcv = data.stats_receiver.try_recv();
+    assert!(rcv.is_ok());
+
+    // nothing further received
+    let rcv = data.stats_receiver.try_recv();
+    assert_eq!(Some(TryRecvError::Disconnected), rcv.err());
+    let inbox_rcv = data.inbox_receiver.try_recv();
+    assert_eq!(Some(TryRecvError::Disconnected), inbox_rcv.err());
+
+    Ok(())
+  }
+
+  #[test_context(Data)]
+  #[tokio::test]
+  #[traced_test]
+  #[serial]
+  async fn test_send_40(data: &mut Data) -> LemmyResult<()> {
+    tracing::debug!("hello world");
+
+    // first receive at startup
+    let rcv = data.stats_receiver.recv().await.unwrap();
+    tracing::debug!("received first stats");
+    assert_eq!(data.instance.id, rcv.state.instance_id);
+    // assert_eq!(Some(ActivityId(0)), rcv.state.last_successful_id);
+    // let last_id_before = rcv.state.last_successful_id.unwrap();
+    let mut sent = Vec::new();
+    for _ in 0..40 {
+      sent.push(send_activity(data.person.actor_id.clone(), &data.context, false).await?);
+    }
+    sleep(2 * *WORK_FINISHED_RECHECK_DELAY).await;
+    tracing::debug!("sent activity");
+    compare_sent_with_receive(data, sent).await?;
+
+    Ok(())
+  }
+
+  #[test_context(Data)]
+  #[tokio::test]
+  #[traced_test]
+  #[serial]
+  /// this test sends a batch of 15 activities, waits and checks they have all been received, then
+  /// repeats with batches of 20 and 35
+  async fn test_send_15_20_30(data: &mut Data) -> LemmyResult<()> {
+    tracing::debug!("hello world");
+
+    // first receive at startup
+    let rcv = data.stats_receiver.recv().await.unwrap();
+    tracing::debug!("received first stats");
+    assert_eq!(data.instance.id, rcv.state.instance_id);
+    // assert_eq!(Some(ActivityId(0)), rcv.state.last_successful_id);
+    // let last_id_before = rcv.state.last_successful_id.unwrap();
+    let counts = vec![15, 20, 35];
+    for count in counts {
+      tracing::debug!("sending {} activities", count);
+      let mut sent = Vec::new();
+      for _ in 0..count {
+        sent.push(send_activity(data.person.actor_id.clone(), &data.context, false).await?);
+      }
+      sleep(2 * *WORK_FINISHED_RECHECK_DELAY).await;
+      tracing::debug!("sent activity");
+      compare_sent_with_receive(data, sent).await?;
+    }
+
+    Ok(())
+  }
+
+  #[test_context(Data)]
+  #[tokio::test]
+  #[serial]
+  async fn test_update_instance(data: &mut Data) -> LemmyResult<()> {
+    let form = InstanceForm::builder()
+      .domain(data.instance.domain.clone())
+      .updated(None)
+      .build();
+    Instance::update(&mut data.context.pool(), data.instance.id, form).await?;
+
+    send_activity(data.person.actor_id.clone(), &data.context, true).await?;
+    data.inbox_receiver.recv().await.unwrap();
+
+    let instance =
+      Instance::read_or_create(&mut data.context.pool(), data.instance.domain.clone()).await?;
+
+    assert!(instance.updated.is_some());
+
+    data.cleanup().await?;
+
+    Ok(())
+  }
+
+  fn listen_activities(inbox_sender: UnboundedSender<String>) -> LemmyResult<ServerHandle> {
+    let run = HttpServer::new(move || {
+      App::new()
+        .app_data(actix_web::web::Data::new(inbox_sender.clone()))
+        .route(
+          "/inbox",
+          web::post().to(
+            |inbox_sender: actix_web::web::Data<UnboundedSender<String>>, body: String| async move {
+              tracing::debug!("received activity: {:?}", body);
+              inbox_sender.send(body.clone()).unwrap();
+              HttpResponse::new(StatusCode::OK)
+            },
+          ),
+        )
+    })
+    .bind(("127.0.0.1", 8085))?
+    .run();
+    let handle = run.handle();
+    tokio::spawn(async move {
+      run.await.unwrap();
+    });
+    Ok(handle)
+  }
+
+  async fn send_activity(
+    actor_id: DbUrl,
+    context: &LemmyContext,
+    wait: bool,
+  ) -> LemmyResult<SentActivity> {
+    // create outgoing activity
+    let data = json!({
+      "actor": "http://ds9.lemmy.ml/u/lemmy_alpha",
+      "object": "http://ds9.lemmy.ml/comment/1",
+      "audience": "https://enterprise.lemmy.ml/c/tenforward",
+      "type": "Like",
+      "id": format!("http://ds9.lemmy.ml/activities/like/{}", uuid::Uuid::new_v4()),
+    });
+    let form = SentActivityForm {
+      ap_id: Url::parse(&format!(
+        "http://local.com/activity/{}",
+        uuid::Uuid::new_v4()
+      ))?
+      .into(),
+      data,
+      sensitive: false,
+      send_inboxes: vec![Some(Url::parse("http://localhost:8085/inbox")?.into())],
+      send_all_instances: false,
+      send_community_followers_of: None,
+      actor_type: ActorType::Person,
+      actor_apub_id: actor_id,
+    };
+    let sent = SentActivity::create(&mut context.pool(), form).await?;
+
+    if wait {
+      sleep(*WORK_FINISHED_RECHECK_DELAY * 2).await;
+    }
+
+    Ok(sent)
+  }
+  async fn compare_sent_with_receive(data: &mut Data, mut sent: Vec<SentActivity>) -> Result<()> {
+    let check_order = !data.is_concurrent; // allow out-of-order receiving when running in parallel
+    let mut received = Vec::new();
+    for _ in 0..sent.len() {
+      let inbox_rcv = data.inbox_receiver.recv().await.unwrap();
+      let parsed_activity = serde_json::from_str::<WithContext<Value>>(&inbox_rcv)?;
+      received.push(parsed_activity);
+    }
+    if !check_order {
+      // sort by id
+      received.sort_by(|a, b| {
+        a.inner()["id"]
+          .as_str()
+          .unwrap()
+          .cmp(b.inner()["id"].as_str().unwrap())
+      });
+      sent.sort_by(|a, b| {
+        a.data["id"]
+          .as_str()
+          .unwrap()
+          .cmp(b.data["id"].as_str().unwrap())
+      });
+    }
+    // receive for successfully sent activity
+    for i in 0..sent.len() {
+      let sent_activity = &sent[i];
+      let received_activity = received[i].inner();
+      assert_eq!(&sent_activity.data, received_activity);
+      tracing::debug!("received activity");
+    }
+    Ok(())
+  }
 }
diff --git a/crates/utils/src/settings/structs.rs b/crates/utils/src/settings/structs.rs
index 75aa56a887..dcacc37b64 100644
--- a/crates/utils/src/settings/structs.rs
+++ b/crates/utils/src/settings/structs.rs
@@ -43,12 +43,8 @@ pub struct Settings {
   #[default(None)]
   #[doku(skip)]
   pub opentelemetry_url: Option<Url>,
-  /// The number of activitypub federation workers that can be in-flight concurrently
-  #[default(0)]
-  pub worker_count: usize,
-  /// The number of activitypub federation retry workers that can be in-flight concurrently
-  #[default(0)]
-  pub retry_count: usize,
+  #[default(Default::default())]
+  pub federation: FederationWorkerConfig,
   // Prometheus configuration.
   #[default(None)]
   #[doku(example = "Some(Default::default())")]
@@ -237,3 +233,14 @@ pub struct PrometheusConfig {
   #[doku(example = "10002")]
   pub port: i32,
 }
+
+#[derive(Debug, Deserialize, Serialize, Clone, SmartDefault, Document)]
+#[serde(default)]
+// named federation"worker"config to disambiguate from the activitypub library configuration
+pub struct FederationWorkerConfig {
+  /// Limit to the number of concurrent outgoing federation requests per target instance.
+  /// Set this to a higher value than 1 (e.g. 6) only if you have a huge instance (>10 activities
+  /// per second) and if a receiving instance is not keeping up.
+  #[default(1)]
+  pub concurrent_sends_per_instance: i64,
+}
diff --git a/scripts/test.sh b/scripts/test.sh
index 9bb6acaa84..04cc94f9d8 100755
--- a/scripts/test.sh
+++ b/scripts/test.sh
@@ -14,6 +14,7 @@ source scripts/start_dev_db.sh
 # so to load the config we need to traverse to the repo root
 export LEMMY_CONFIG_LOCATION=../../config/config.hjson
 export RUST_BACKTRACE=1
+export LEMMY_TEST_FAST_FEDERATION=1 # by default, the persistent federation queue has delays in the scale of 30s-5min
 
 if [ -n "$PACKAGE" ];
 then
diff --git a/src/lib.rs b/src/lib.rs
index 74c05deaab..3c3633b753 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -233,6 +233,7 @@ pub async fn start_lemmy_server(args: CmdArgs) -> LemmyResult<()> {
         process_count: args.federate_process_count,
       },
       cfg,
+      SETTINGS.federation.clone(),
     )
   });
   let mut interrupt = tokio::signal::unix::signal(SignalKind::interrupt())?;