move sender for clarity, add comment

This commit is contained in:
phiresky 2024-07-15 15:57:13 +02:00
parent dda528bd75
commit 694c293f1c

View file

@ -47,16 +47,29 @@ static MIN_ACTIVITY_SEND_RESULTS_TO_HANDLE: usize = 4;
#[cfg(test)] #[cfg(test)]
static MIN_ACTIVITY_SEND_RESULTS_TO_HANDLE: usize = 0; static MIN_ACTIVITY_SEND_RESULTS_TO_HANDLE: usize = 0;
///
/// SendManager --(has many)--> InstanceWorker --(has many)--> SendRetryTask
/// | | |
/// -----|------create worker -> loop activities--create task-> send activity
/// | | vvvv
/// | | fail or success
/// | | <-report result-- |
/// | <---order and aggrate results--- |
/// | <---send stats--- | |
/// filter and print stats | |
pub(crate) struct InstanceWorker { pub(crate) struct InstanceWorker {
instance: Instance, instance: Instance,
stop: CancellationToken, stop: CancellationToken,
stats_sender: UnboundedSender<FederationQueueStateWithDomain>,
federation_lib_config: FederationConfig<LemmyContext>, federation_lib_config: FederationConfig<LemmyContext>,
federation_worker_config: FederationWorkerConfig, federation_worker_config: FederationWorkerConfig,
state: FederationQueueState, state: FederationQueueState,
last_state_insert: DateTime<Utc>, last_state_insert: DateTime<Utc>,
pool: ActualDbPool, pool: ActualDbPool,
inbox_collector: CommunityInboxCollector, inbox_collector: CommunityInboxCollector,
// regularily send stats back to the SendManager
stats_sender: UnboundedSender<FederationQueueStateWithDomain>,
// each HTTP send will report back to this channel concurrently
receive_send_result: mpsc::UnboundedReceiver<SendActivityResult>,
} }
impl InstanceWorker { impl InstanceWorker {
@ -69,6 +82,8 @@ impl InstanceWorker {
) -> Result<(), anyhow::Error> { ) -> Result<(), anyhow::Error> {
let pool = config.to_request_data().inner_pool().clone(); let pool = config.to_request_data().inner_pool().clone();
let state = FederationQueueState::load(&mut DbPool::Pool(&pool), instance.id).await?; let state = FederationQueueState::load(&mut DbPool::Pool(&pool), instance.id).await?;
let (report_send_result, receive_send_result) =
tokio::sync::mpsc::unbounded_channel::<SendActivityResult>();
let mut worker = InstanceWorker { let mut worker = InstanceWorker {
inbox_collector: CommunityInboxCollector::new( inbox_collector: CommunityInboxCollector::new(
pool.clone(), pool.clone(),
@ -83,13 +98,18 @@ impl InstanceWorker {
state, state,
last_state_insert: Utc.timestamp_nanos(0), last_state_insert: Utc.timestamp_nanos(0),
pool, pool,
receive_send_result,
}; };
worker.loop_until_stopped().await
worker.loop_until_stopped(report_send_result).await
} }
/// loop fetch new activities from db and send them to the inboxes of the given instances /// loop fetch new activities from db and send them to the inboxes of the given instances
/// this worker only returns if (a) there is an internal error or (b) the cancellation token is /// this worker only returns if (a) there is an internal error or (b) the cancellation token is
/// cancelled (graceful exit) /// cancelled (graceful exit)
async fn loop_until_stopped(&mut self) -> Result<()> { async fn loop_until_stopped(
&mut self,
report_send_result: UnboundedSender<SendActivityResult>,
) -> Result<()> {
self.initial_fail_sleep().await?; self.initial_fail_sleep().await?;
let (mut last_sent_id, mut newest_id) = self.get_latest_ids().await?; let (mut last_sent_id, mut newest_id) = self.get_latest_ids().await?;
@ -99,9 +119,6 @@ impl InstanceWorker {
// number of activities that currently have a task spawned to send it // number of activities that currently have a task spawned to send it
let mut in_flight: i64 = 0; let mut in_flight: i64 = 0;
// each HTTP send will report back to this channel concurrently
let (report_send_result, mut receive_send_result) =
tokio::sync::mpsc::unbounded_channel::<SendActivityResult>();
while !self.stop.is_cancelled() { while !self.stop.is_cancelled() {
// check if we need to wait for a send to finish before sending the next one // check if we need to wait for a send to finish before sending the next one
// we wait if (a) the last request failed, only if a request is already in flight (not at the // we wait if (a) the last request failed, only if a request is already in flight (not at the
@ -110,12 +127,13 @@ impl InstanceWorker {
let need_wait_for_event = (in_flight != 0 && self.state.fail_count > 0) let need_wait_for_event = (in_flight != 0 && self.state.fail_count > 0)
|| successfuls.len() >= MAX_SUCCESSFULS || successfuls.len() >= MAX_SUCCESSFULS
|| in_flight >= self.federation_worker_config.concurrent_sends_per_instance; || in_flight >= self.federation_worker_config.concurrent_sends_per_instance;
if need_wait_for_event || receive_send_result.len() > MIN_ACTIVITY_SEND_RESULTS_TO_HANDLE { if need_wait_for_event || self.receive_send_result.len() > MIN_ACTIVITY_SEND_RESULTS_TO_HANDLE
{
// if len() > 0 then this does not block and allows us to write to db more often // if len() > 0 then this does not block and allows us to write to db more often
// if len is 0 then this means we wait for something to change our above conditions, // if len is 0 then this means we wait for something to change our above conditions,
// which can only happen by an event sent into the channel // which can only happen by an event sent into the channel
self self
.handle_send_results(&mut receive_send_result, &mut successfuls, &mut in_flight) .handle_send_results(&mut successfuls, &mut in_flight)
.await?; .await?;
// handle_send_results does not guarantee that we are now in a condition where we want to // handle_send_results does not guarantee that we are now in a condition where we want to
// send a new one, so repeat this check until the if no longer applies // send a new one, so repeat this check until the if no longer applies
@ -223,14 +241,13 @@ impl InstanceWorker {
async fn handle_send_results( async fn handle_send_results(
&mut self, &mut self,
receive_inbox_result: &mut mpsc::UnboundedReceiver<SendActivityResult>,
successfuls: &mut BinaryHeap<SendSuccessInfo>, successfuls: &mut BinaryHeap<SendSuccessInfo>,
in_flight: &mut i64, in_flight: &mut i64,
) -> Result<(), anyhow::Error> { ) -> Result<(), anyhow::Error> {
let mut force_write = false; let mut force_write = false;
let mut events = Vec::new(); let mut events = Vec::new();
// wait for at least one event but if there's multiple handle them all // wait for at least one event but if there's multiple handle them all
receive_inbox_result.recv_many(&mut events, 1000).await; self.receive_send_result.recv_many(&mut events, 1000).await;
for event in events { for event in events {
match event { match event {
SendActivityResult::Success(s) => { SendActivityResult::Success(s) => {