move federation concurrent config to config file

This commit is contained in:
phiresky 2024-04-15 19:41:18 +02:00
parent e719bafc9b
commit 5e986ef5dd
6 changed files with 62 additions and 55 deletions

View file

@ -106,10 +106,11 @@
port: 8536 port: 8536
# Whether the site is available over TLS. Needs to be true for federation to work. # Whether the site is available over TLS. Needs to be true for federation to work.
tls_enabled: true tls_enabled: true
# The number of activitypub federation workers that can be in-flight concurrently federation: {
worker_count: 0 # Limit to the number of concurrent outgoing federation requests per target instance.
# The number of activitypub federation retry workers that can be in-flight concurrently # Set this to a higher value than 1 (e.g. 6) only if you have a huge instance (>10 activities per second) and if a receiving instance is not keeping up.
retry_count: 0 concurrent_sends_per_instance: 1
}
prometheus: { prometheus: {
bind: "127.0.0.1" bind: "127.0.0.1"
port: 10002 port: 10002

View file

@ -1,7 +1,11 @@
use crate::{util::CancellableTask, worker::InstanceWorker}; use crate::{util::CancellableTask, worker::InstanceWorker};
use activitypub_federation::config::FederationConfig; use activitypub_federation::config::FederationConfig;
use chrono::{Local, Timelike}; use chrono::{Local, Timelike};
use lemmy_api_common::{context::LemmyContext, federate_retry_sleep_duration}; use lemmy_api_common::{
context::LemmyContext,
federate_retry_sleep_duration,
lemmy_utils::settings::structs::FederationWorkerConfig,
};
use lemmy_db_schema::{ use lemmy_db_schema::{
newtypes::InstanceId, newtypes::InstanceId,
source::{federation_queue_state::FederationQueueState, instance::Instance}, source::{federation_queue_state::FederationQueueState, instance::Instance},
@ -36,7 +40,8 @@ pub struct Opts {
async fn start_stop_federation_workers( async fn start_stop_federation_workers(
opts: Opts, opts: Opts,
pool: ActualDbPool, pool: ActualDbPool,
federation_config: FederationConfig<LemmyContext>, federation_lib_config: FederationConfig<LemmyContext>,
federation_worker_config: FederationWorkerConfig,
cancel: CancellationToken, cancel: CancellationToken,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let mut workers = HashMap::<InstanceId, CancellableTask>::new(); let mut workers = HashMap::<InstanceId, CancellableTask>::new();
@ -45,7 +50,9 @@ async fn start_stop_federation_workers(
let exit_print = tokio::spawn(receive_print_stats(pool.clone(), stats_receiver)); let exit_print = tokio::spawn(receive_print_stats(pool.clone(), stats_receiver));
let pool2 = &mut DbPool::Pool(&pool); let pool2 = &mut DbPool::Pool(&pool);
let process_index = opts.process_index - 1; let process_index = opts.process_index - 1;
let local_domain = federation_config.settings().get_hostname_without_port()?; let local_domain = federation_lib_config
.settings()
.get_hostname_without_port()?;
loop { loop {
let mut total_count = 0; let mut total_count = 0;
let mut dead_count = 0; let mut dead_count = 0;
@ -73,15 +80,19 @@ async fn start_stop_federation_workers(
continue; continue;
} }
// create new worker // create new worker
let config = federation_config.clone(); let config = federation_lib_config.clone();
let stats_sender = stats_sender.clone(); let stats_sender = stats_sender.clone();
let federation_worker_config = federation_worker_config.clone();
workers.insert( workers.insert(
instance.id, instance.id,
CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |stop| { CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |stop| {
let instance = instance.clone(); InstanceWorker::init_and_loop(
let config = config.clone(); instance.clone(),
let stats_sender = stats_sender.clone(); config.clone(),
async move { InstanceWorker::init_and_loop(instance, config, stop, stats_sender).await } federation_worker_config.clone(),
stop,
stats_sender.clone(),
)
}), }),
); );
} else if !should_federate { } else if !should_federate {
@ -117,12 +128,16 @@ pub fn start_stop_federation_workers_cancellable(
opts: Opts, opts: Opts,
pool: ActualDbPool, pool: ActualDbPool,
config: FederationConfig<LemmyContext>, config: FederationConfig<LemmyContext>,
federation_worker_config: FederationWorkerConfig,
) -> CancellableTask { ) -> CancellableTask {
CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |stop| { CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |stop| {
let opts = opts.clone(); start_stop_federation_workers(
let pool = pool.clone(); opts.clone(),
let config = config.clone(); pool.clone(),
async move { start_stop_federation_workers(opts, pool, config, stop).await } config.clone(),
federation_worker_config.clone(),
stop,
)
}) })
} }

View file

@ -1,4 +1,4 @@
use crate::util::{get_activity_cached, get_actor_cached}; use crate::util::get_actor_cached;
use activitypub_federation::{ use activitypub_federation::{
activity_sending::SendActivityTask, activity_sending::SendActivityTask,
config::Data, config::Data,
@ -10,7 +10,7 @@ use lemmy_api_common::{context::LemmyContext, federate_retry_sleep_duration};
use lemmy_apub::{activity_lists::SharedInboxActivities, FEDERATION_CONTEXT}; use lemmy_apub::{activity_lists::SharedInboxActivities, FEDERATION_CONTEXT};
use lemmy_db_schema::{newtypes::ActivityId, source::activity::SentActivity}; use lemmy_db_schema::{newtypes::ActivityId, source::activity::SentActivity};
use reqwest::Url; use reqwest::Url;
use std::{ops::Deref, sync::Arc, time::Duration}; use std::ops::Deref;
use tokio::{sync::mpsc::UnboundedSender, time::sleep}; use tokio::{sync::mpsc::UnboundedSender, time::sleep};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;

View file

@ -1,38 +1,25 @@
use crate::{ use crate::{
inboxes::CommunityInboxCollector, inboxes::CommunityInboxCollector,
send::{SendActivityResult, SendRetryTask, SendSuccessInfo}, send::{SendActivityResult, SendRetryTask, SendSuccessInfo},
util::{ util::{get_activity_cached, get_latest_activity_id, WORK_FINISHED_RECHECK_DELAY},
get_activity_cached,
get_actor_cached,
get_latest_activity_id,
WORK_FINISHED_RECHECK_DELAY,
},
};
use activitypub_federation::{
activity_sending::SendActivityTask,
config::{Data, FederationConfig},
protocol::context::WithContext,
}; };
use activitypub_federation::config::FederationConfig;
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use chrono::{DateTime, Days, TimeZone, Utc}; use chrono::{DateTime, Days, TimeZone, Utc};
use lemmy_api_common::{context::LemmyContext, federate_retry_sleep_duration}; use lemmy_api_common::{
use lemmy_apub::{activity_lists::SharedInboxActivities, FEDERATION_CONTEXT}; context::LemmyContext,
federate_retry_sleep_duration,
lemmy_utils::settings::structs::FederationWorkerConfig,
};
use lemmy_db_schema::{ use lemmy_db_schema::{
newtypes::ActivityId, newtypes::ActivityId,
source::{ source::{
activity::SentActivity,
federation_queue_state::FederationQueueState, federation_queue_state::FederationQueueState,
instance::{Instance, InstanceForm}, instance::{Instance, InstanceForm},
}, },
utils::{naive_now, ActualDbPool, DbPool}, utils::{naive_now, ActualDbPool, DbPool},
}; };
use once_cell::sync::Lazy; use std::{collections::BinaryHeap, ops::Add, time::Duration};
use reqwest::Url;
use std::{
collections::BinaryHeap,
ops::{Add, Deref},
time::Duration,
};
use tokio::{ use tokio::{
sync::mpsc::{self, UnboundedSender}, sync::mpsc::{self, UnboundedSender},
time::sleep, time::sleep,
@ -42,19 +29,14 @@ use tokio_util::sync::CancellationToken;
/// Save state to db after this time has passed since the last state (so if the server crashes or is SIGKILLed, less than X seconds of activities are resent) /// Save state to db after this time has passed since the last state (so if the server crashes or is SIGKILLed, less than X seconds of activities are resent)
static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(60); static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(60);
static CONCURRENT_SENDS: Lazy<i64> = Lazy::new(|| {
std::env::var("LEMMY_FEDERATION_CONCURRENT_SENDS_PER_INSTANCE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(8)
});
/// Maximum number of successful sends to allow out of order /// Maximum number of successful sends to allow out of order
const MAX_SUCCESSFULS: usize = 1000; const MAX_SUCCESSFULS: usize = 1000;
pub(crate) struct InstanceWorker { pub(crate) struct InstanceWorker {
instance: Instance, instance: Instance,
stop: CancellationToken, stop: CancellationToken,
config: FederationConfig<LemmyContext>, federation_lib_config: FederationConfig<LemmyContext>,
federation_worker_config: FederationWorkerConfig,
stats_sender: UnboundedSender<(String, FederationQueueState)>, stats_sender: UnboundedSender<(String, FederationQueueState)>,
state: FederationQueueState, state: FederationQueueState,
last_state_insert: DateTime<Utc>, last_state_insert: DateTime<Utc>,
@ -66,6 +48,7 @@ impl InstanceWorker {
pub(crate) async fn init_and_loop( pub(crate) async fn init_and_loop(
instance: Instance, instance: Instance,
config: FederationConfig<LemmyContext>, config: FederationConfig<LemmyContext>,
federation_worker_config: FederationWorkerConfig,
stop: CancellationToken, stop: CancellationToken,
stats_sender: UnboundedSender<(String, FederationQueueState)>, stats_sender: UnboundedSender<(String, FederationQueueState)>,
) -> Result<(), anyhow::Error> { ) -> Result<(), anyhow::Error> {
@ -77,9 +60,10 @@ impl InstanceWorker {
instance.id, instance.id,
instance.domain.clone(), instance.domain.clone(),
), ),
federation_worker_config,
instance, instance,
stop, stop,
config, federation_lib_config: config,
stats_sender, stats_sender,
state, state,
last_state_insert: Utc.timestamp_nanos(0), last_state_insert: Utc.timestamp_nanos(0),
@ -108,7 +92,7 @@ impl InstanceWorker {
// or (b) if we have too many successfuls in memory or (c) if we have too many in flight // or (b) if we have too many successfuls in memory or (c) if we have too many in flight
let need_wait_for_event = (in_flight != 0 && self.state.fail_count > 0) let need_wait_for_event = (in_flight != 0 && self.state.fail_count > 0)
|| successfuls.len() >= MAX_SUCCESSFULS || successfuls.len() >= MAX_SUCCESSFULS
|| in_flight >= *CONCURRENT_SENDS; || in_flight >= self.federation_worker_config.concurrent_sends_per_instance;
if need_wait_for_event || receive_send_result.len() > 4 { if need_wait_for_event || receive_send_result.len() > 4 {
// if len() > 0 then this does not block and allows us to write to db more often // if len() > 0 then this does not block and allows us to write to db more often
// if len is 0 then this means we wait for something to change our above conditions, // if len is 0 then this means we wait for something to change our above conditions,
@ -312,7 +296,7 @@ impl InstanceWorker {
return Ok(()); return Ok(());
} }
let initial_fail_count = self.state.fail_count; let initial_fail_count = self.state.fail_count;
let data = self.config.to_request_data(); let data = self.federation_lib_config.to_request_data();
let stop = self.stop.clone(); let stop = self.stop.clone();
let domain = self.instance.domain.clone(); let domain = self.instance.domain.clone();
tokio::spawn(async move { tokio::spawn(async move {

View file

@ -42,12 +42,8 @@ pub struct Settings {
#[default(None)] #[default(None)]
#[doku(skip)] #[doku(skip)]
pub opentelemetry_url: Option<Url>, pub opentelemetry_url: Option<Url>,
/// The number of activitypub federation workers that can be in-flight concurrently #[default(Default::default())]
#[default(0)] pub federation: FederationWorkerConfig,
pub worker_count: usize,
/// The number of activitypub federation retry workers that can be in-flight concurrently
#[default(0)]
pub retry_count: usize,
// Prometheus configuration. // Prometheus configuration.
#[default(None)] #[default(None)]
#[doku(example = "Some(Default::default())")] #[doku(example = "Some(Default::default())")]
@ -234,3 +230,13 @@ pub struct PrometheusConfig {
#[doku(example = "10002")] #[doku(example = "10002")]
pub port: i32, pub port: i32,
} }
#[derive(Debug, Deserialize, Serialize, Clone, SmartDefault, Document)]
#[serde(default)]
// named federation"worker"config to disambiguate from the activitypub library configuration
pub struct FederationWorkerConfig {
/// Limit to the number of concurrent outgoing federation requests per target instance.
/// Set this to a higher value than 1 (e.g. 6) only if you have a huge instance (>10 activities per second) and if a receiving instance is not keeping up.
#[default(1)]
pub concurrent_sends_per_instance: i64,
}

View file

@ -213,6 +213,7 @@ pub async fn start_lemmy_server(args: CmdArgs) -> LemmyResult<()> {
}, },
pool.clone(), pool.clone(),
federation_config.clone(), federation_config.clone(),
SETTINGS.federation.clone(),
) )
}); });
let mut interrupt = tokio::signal::unix::signal(SignalKind::interrupt())?; let mut interrupt = tokio::signal::unix::signal(SignalKind::interrupt())?;