From 987174a6c1c6e9b7f407737ded4d00ab96367660 Mon Sep 17 00:00:00 2001 From: phiresky Date: Mon, 15 Apr 2024 17:59:26 +0200 Subject: [PATCH] lint and set force_write true when a request fails --- crates/federate/src/lib.rs | 10 +--------- crates/federate/src/worker.rs | 17 ++++++++++------- 2 files changed, 11 insertions(+), 16 deletions(-) diff --git a/crates/federate/src/lib.rs b/crates/federate/src/lib.rs index a48f31ed9..6bcd453f1 100644 --- a/crates/federate/src/lib.rs +++ b/crates/federate/src/lib.rs @@ -79,15 +79,7 @@ async fn start_stop_federation_workers( let instance = instance.clone(); let config = config.clone(); let stats_sender = stats_sender.clone(); - async move { - InstanceWorker::init_and_loop( - instance, - config, - stop, - stats_sender, - ) - .await - } + async move { InstanceWorker::init_and_loop(instance, config, stop, stats_sender).await } }), ); } else if !should_federate { diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index 58896750a..950458ad9 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -231,7 +231,7 @@ impl InstanceWorker { /// get newest activity id and set it as last_successful_id if it's the first time this instance is seen async fn get_latest_id(&mut self) -> Result { let latest_id = get_latest_activity_id(&mut self.pool()).await?; - if let None = self.state.last_successful_id { + if self.state.last_successful_id.is_none() { // this is the initial creation (instance first seen) of the federation queue for this instance // skip all past activities: self.state.last_successful_id = Some(latest_id); @@ -247,7 +247,7 @@ impl InstanceWorker { successfuls: &mut BinaryHeap, in_flight: &mut i64, ) -> Result<(), anyhow::Error> { - let force_write = false; + let mut force_write = false; let mut events = Vec::new(); // wait for at least one event but if there's multiple handle them all receive_inbox_result.recv_many(&mut events, 1000).await; @@ -266,6 +266,7 @@ impl InstanceWorker { // override fail count - if multiple activities are currently sending this value may get conflicting info but that's fine self.state.fail_count = fail_count; self.state.last_retry = Some(Utc::now()); + force_write = true; } } } @@ -378,11 +379,13 @@ impl InstanceWorker { ele.0.ap_id, e ); - report.send(SendActivityResult::Success(SendSuccessInfo { - activity_id, - published: None, - was_skipped: true, - })).ok(); + report + .send(SendActivityResult::Success(SendSuccessInfo { + activity_id, + published: None, + was_skipped: true, + })) + .ok(); } }); Ok(())