From 491daabaf2c1eeeb59857433a41a1c484b384148 Mon Sep 17 00:00:00 2001
From: phiresky
Date: Sat, 13 Apr 2024 23:48:32 +0200
Subject: [PATCH] federation: some comments

---
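Note on the reversed Ord/PartialOrd impls in the first hunk: both compare
`other` against `self`, which turns std's max-heap `BinaryHeap` into a
min-heap, so the worker can always pop the successfully sent activity with
the LOWEST id (the only one whose state may be written to the database).
A minimal standalone sketch of the same trick, with an illustrative `Entry`
type standing in for the actual `SendSuccessInfo`:

    use std::collections::BinaryHeap;

    #[derive(PartialEq, Eq)]
    struct Entry {
      activity_id: i64,
    }

    impl PartialOrd for Entry {
      fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
      }
    }

    impl Ord for Entry {
      // reversed on purpose: the heap's "greatest" element is the smallest id
      fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        other.activity_id.cmp(&self.activity_id)
      }
    }

    fn main() {
      let mut heap = BinaryHeap::new();
      heap.push(Entry { activity_id: 3 });
      heap.push(Entry { activity_id: 1 });
      heap.push(Entry { activity_id: 2 });
      // pops in ascending id order: 1, 2, 3
      while let Some(e) = heap.pop() {
        println!("{}", e.activity_id);
      }
    }

Wrapping the key in std::cmp::Reverse would give the same ordering without
hand-written impls.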
 crates/federate/src/worker.rs | 67 ++++++++++++++++++++++-------------
 1 file changed, 43 insertions(+), 24 deletions(-)

diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs
index 0dbf1c391..58896750a 100644
--- a/crates/federate/src/worker.rs
+++ b/crates/federate/src/worker.rs
@@ -93,15 +93,14 @@ struct SendSuccessInfo {
   was_skipped: bool,
 }
 impl PartialOrd for SendSuccessInfo {
-    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
-        other.activity_id.partial_cmp(&self.activity_id)
-    }
+  fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+    other.activity_id.partial_cmp(&self.activity_id)
+  }
 }
 impl Ord for SendSuccessInfo {
-    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
-        other.activity_id.cmp(&self.activity_id)
-    }
-
+  fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+    other.activity_id.cmp(&self.activity_id)
+  }
 }
 enum SendActivityResult {
   Success(SendSuccessInfo),
@@ -115,13 +114,11 @@ impl InstanceWorker {
   pub(crate) async fn init_and_loop(
     instance: Instance,
     config: FederationConfig<LemmyContext>,
-    // pool: ActualDbPool, // in theory there's a ref to the pool in context, but i couldn't get that to work wrt lifetimes
     stop: CancellationToken,
     stats_sender: UnboundedSender<(String, FederationQueueState)>,
   ) -> Result<(), anyhow::Error> {
-    let state =
-      FederationQueueState::load(&mut config.to_request_data().pool(), instance.id).await?;
     let pool = config.to_request_data().inner_pool().clone();
+    let state = FederationQueueState::load(&mut DbPool::Pool(&pool), instance.id).await?;
     let mut worker = InstanceWorker {
       instance,
       site_loaded: false,
@@ -147,28 +144,43 @@ impl InstanceWorker {
     // activities that have been successfully sent but
     // that are not the lowest number and thus can't be written to the database yet
     let mut successfuls = BinaryHeap::<SendSuccessInfo>::new();
+    // number of activities that currently have a task spawned to send it
     let mut in_flight: i64 = 0;
 
-    let (report_inbox_result, mut receive_inbox_result) =
+    // each HTTP send will report back to this channel concurrently
+    let (report_send_result, mut receive_send_result) =
       tokio::sync::mpsc::unbounded_channel::<SendActivityResult>();
     while !self.stop.is_cancelled() {
       // check if we need to wait for a send to finish before sending the next one
+      // we wait if (a) the last request failed, only if a request is already in flight (not at the start of the loop)
+      // or (b) if we have too many successfuls in memory or (c) if we have too many in flight
       let need_wait_for_event = (in_flight != 0 && self.state.fail_count > 0)
-        || successfuls.len() > MAX_SUCCESSFULS
+        || successfuls.len() >= MAX_SUCCESSFULS
         || in_flight >= *CONCURRENT_SENDS;
-      if need_wait_for_event || receive_inbox_result.len() > 4 {
+      if need_wait_for_event || receive_send_result.len() > 4 {
+        // if len() > 0 then this does not block and allows us to write to db more often
+        // if len is 0 then this means we wait for something to change our above conditions,
+        // which can only happen by an event sent into the channel
         self
-          .handle_send_results(&mut receive_inbox_result, &mut successfuls, &mut in_flight)
+          .handle_send_results(&mut receive_send_result, &mut successfuls, &mut in_flight)
           .await?;
+        // handle_send_results does not guarantee that we are now in a condition where we want to send a new one,
+        // so repeat this check until the if no longer applies
+        continue;
       } else {
+        // send a new activity if there is one
        self.update_communities().await?;
-        let last_successful_id = self
-          .state
-          .last_successful_id
-          .map(|e| e.0)
-          .expect("set above");
-        let next_id = ActivityId(last_successful_id + (successfuls.len() as i64) + in_flight + 1);
+        let next_id = {
+          // calculate next id to send based on the last id and the in flight requests
+          let last_successful_id = self
+            .state
+            .last_successful_id
+            .map(|e| e.0)
+            .expect("set above");
+          ActivityId(last_successful_id + (successfuls.len() as i64) + in_flight + 1)
+        };
         if next_id > latest_id {
+          // lazily fetch latest id only if we have caught up
           latest_id = self.get_latest_id().await?;
           if next_id > latest_id {
             // no more work to be done, wait before rechecking
@@ -181,7 +193,7 @@ impl InstanceWorker {
         }
         in_flight += 1;
         self
-          .spawn_send_if_needed(next_id, report_inbox_result.clone())
+          .spawn_send_if_needed(next_id, report_send_result.clone())
           .await?;
       }
     }
@@ -348,11 +360,12 @@ impl InstanceWorker {
     let stop = self.stop.clone();
     let domain = self.instance.domain.clone();
     tokio::spawn(async move {
+      let mut report = report;
       if let Err(e) = InstanceWorker::send_retry_loop(
         &ele.0,
         &ele.1,
         inbox_urls,
-        report,
+        &mut report,
         initial_fail_count,
         domain,
         data,
@@ -365,6 +378,11 @@ impl InstanceWorker {
           ele.0.ap_id, e
         );
+        report.send(SendActivityResult::Success(SendSuccessInfo {
+          activity_id,
+          published: None,
+          was_skipped: true,
+        })).ok();
       }
     });
     Ok(())
   }
@@ -376,7 +394,7 @@ impl InstanceWorker {
     activity: &SentActivity,
     object: &SharedInboxActivities,
     inbox_urls: Vec<Url>,
-    report: UnboundedSender<SendActivityResult>,
+    report: &mut UnboundedSender<SendActivityResult>,
     initial_fail_count: i32,
     domain: String,
     context: Data<LemmyContext>,
@@ -384,7 +402,7 @@ impl InstanceWorker {
   ) -> Result<()> {
     let pool = &mut context.pool();
     let Some(actor_apub_id) = &activity.actor_apub_id else {
-      return Ok(()); // activity was inserted before persistent queue was activated
+      return Err(anyhow::anyhow!("activity is from before lemmy 0.19"));
     };
     let actor = get_actor_cached(pool, activity.actor_type, actor_apub_id)
       .await
@@ -413,6 +431,7 @@ impl InstanceWorker {
         () = sleep(retry_delay) => {},
         () = stop.cancelled() => {
           // save state to db and exit
+          // TODO: do we need to report state here to prevent hang on exit?
           return Ok(());
         }
       }
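
Note on the new fallback report in the spawned send task: every spawned send
has to report back exactly once, success or failure, because the main loop's
`in_flight` accounting and the sequential-id logic both depend on it; an
internal error that never reported would stall the queue. A minimal sketch of
that report-exactly-once pattern, with plain integer ids standing in for the
real activity types (assumes tokio with the "full" feature set):

    use tokio::sync::mpsc::unbounded_channel;

    #[tokio::main]
    async fn main() {
      // Ok(id) models a successful send, Err(id) an internal error
      // that is reported as "skipped" instead of being silently dropped
      let (report, mut receive) = unbounded_channel::<Result<i64, i64>>();
      for id in 1..=3 {
        let report = report.clone();
        tokio::spawn(async move {
          // stand-in for the HTTP send; pretend id 2 fails internally
          let result = if id == 2 { Err(id) } else { Ok(id) };
          // report in every case; .ok() because the receiver may be gone
          report.send(result).ok();
        });
      }
      drop(report); // close the channel so recv() ends after all reports
      while let Some(r) = receive.recv().await {
        match r {
          Ok(id) => println!("sent {id}"),
          Err(id) => println!("skipped {id}"),
        }
      }
    }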