diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index e2118eea7..0a6a478c9 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -47,16 +47,29 @@ static MIN_ACTIVITY_SEND_RESULTS_TO_HANDLE: usize = 4; #[cfg(test)] static MIN_ACTIVITY_SEND_RESULTS_TO_HANDLE: usize = 0; +/// +/// SendManager --(has many)--> InstanceWorker --(has many)--> SendRetryTask +/// | | | +/// -----|------create worker -> loop activities--create task-> send activity +/// | | vvvv +/// | | fail or success +/// | | <-report result-- | +/// | <---order and aggrate results--- | +/// | <---send stats--- | | +/// filter and print stats | | pub(crate) struct InstanceWorker { instance: Instance, stop: CancellationToken, - stats_sender: UnboundedSender, federation_lib_config: FederationConfig, federation_worker_config: FederationWorkerConfig, state: FederationQueueState, last_state_insert: DateTime, pool: ActualDbPool, inbox_collector: CommunityInboxCollector, + // regularily send stats back to the SendManager + stats_sender: UnboundedSender, + // each HTTP send will report back to this channel concurrently + receive_send_result: mpsc::UnboundedReceiver, } impl InstanceWorker { @@ -69,6 +82,8 @@ impl InstanceWorker { ) -> Result<(), anyhow::Error> { let pool = config.to_request_data().inner_pool().clone(); let state = FederationQueueState::load(&mut DbPool::Pool(&pool), instance.id).await?; + let (report_send_result, receive_send_result) = + tokio::sync::mpsc::unbounded_channel::(); let mut worker = InstanceWorker { inbox_collector: CommunityInboxCollector::new( pool.clone(), @@ -83,13 +98,18 @@ impl InstanceWorker { state, last_state_insert: Utc.timestamp_nanos(0), pool, + receive_send_result, }; - worker.loop_until_stopped().await + + worker.loop_until_stopped(report_send_result).await } /// loop fetch new activities from db and send them to the inboxes of the given instances /// this worker only returns if (a) there is an internal error or (b) the cancellation token is /// cancelled (graceful exit) - async fn loop_until_stopped(&mut self) -> Result<()> { + async fn loop_until_stopped( + &mut self, + report_send_result: UnboundedSender, + ) -> Result<()> { self.initial_fail_sleep().await?; let (mut last_sent_id, mut newest_id) = self.get_latest_ids().await?; @@ -99,9 +119,6 @@ impl InstanceWorker { // number of activities that currently have a task spawned to send it let mut in_flight: i64 = 0; - // each HTTP send will report back to this channel concurrently - let (report_send_result, mut receive_send_result) = - tokio::sync::mpsc::unbounded_channel::(); while !self.stop.is_cancelled() { // check if we need to wait for a send to finish before sending the next one // we wait if (a) the last request failed, only if a request is already in flight (not at the @@ -110,12 +127,13 @@ impl InstanceWorker { let need_wait_for_event = (in_flight != 0 && self.state.fail_count > 0) || successfuls.len() >= MAX_SUCCESSFULS || in_flight >= self.federation_worker_config.concurrent_sends_per_instance; - if need_wait_for_event || receive_send_result.len() > MIN_ACTIVITY_SEND_RESULTS_TO_HANDLE { + if need_wait_for_event || self.receive_send_result.len() > MIN_ACTIVITY_SEND_RESULTS_TO_HANDLE + { // if len() > 0 then this does not block and allows us to write to db more often // if len is 0 then this means we wait for something to change our above conditions, // which can only happen by an event sent into the channel self - .handle_send_results(&mut receive_send_result, &mut successfuls, &mut in_flight) + .handle_send_results(&mut successfuls, &mut in_flight) .await?; // handle_send_results does not guarantee that we are now in a condition where we want to // send a new one, so repeat this check until the if no longer applies @@ -223,14 +241,13 @@ impl InstanceWorker { async fn handle_send_results( &mut self, - receive_inbox_result: &mut mpsc::UnboundedReceiver, successfuls: &mut BinaryHeap, in_flight: &mut i64, ) -> Result<(), anyhow::Error> { let mut force_write = false; let mut events = Vec::new(); // wait for at least one event but if there's multiple handle them all - receive_inbox_result.recv_many(&mut events, 1000).await; + self.receive_send_result.recv_many(&mut events, 1000).await; for event in events { match event { SendActivityResult::Success(s) => {