From 0adf49d9ab73b2ff7868d49752b1500211f90961 Mon Sep 17 00:00:00 2001 From: Felix Ableitner Date: Wed, 29 May 2024 23:17:49 +0200 Subject: [PATCH] rename instance to target --- crates/federate/src/inboxes.rs | 11 ++++++----- crates/federate/src/worker.rs | 22 +++++++++++----------- 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/crates/federate/src/inboxes.rs b/crates/federate/src/inboxes.rs index 33fe9f601..6ae182917 100644 --- a/crates/federate/src/inboxes.rs +++ b/crates/federate/src/inboxes.rs @@ -34,7 +34,7 @@ static FOLLOW_REMOVALS_RECHECK_DELAY: Lazy = Lazy::new(|| chrono::TimeDelta::try_hours(1).expect("TimeDelta out of bounds")); pub(crate) struct CommunityInboxCollector { - instance: Instance, + target: Instance, // load site lazily because if an instance is first seen due to being on allowlist, // the corresponding row in `site` may not exist yet since that is only added once // `fetch_instance_actor_for_object` is called. @@ -48,9 +48,9 @@ pub(crate) struct CommunityInboxCollector { } impl CommunityInboxCollector { - pub fn new(instance: Instance) -> Self { + pub fn new(target: Instance) -> Self { Self { - instance, + target, site_loaded: false, site: None, followed_communities: HashMap::new(), @@ -72,7 +72,7 @@ impl CommunityInboxCollector { if activity.send_all_instances { if !self.site_loaded { - self.site = Site::read_from_instance_id(&mut context.pool(), self.instance.id).await?; + self.site = Site::read_from_instance_id(&mut context.pool(), self.target.id).await?; self.site_loaded = true; } if let Some(site) = &self.site { @@ -91,6 +91,7 @@ impl CommunityInboxCollector { .send_inboxes .iter() .filter_map(std::option::Option::as_ref) + .filter(|&u| (u.domain() == Some(&self.target.domain))) .map(|u| u.inner().clone()), ); @@ -135,7 +136,7 @@ impl CommunityInboxCollector { ) -> LemmyResult>> { let followed = CommunityFollowerView::get_instance_followed_community_inboxes( &mut context.pool(), - self.instance.id, + self.target.id, last_fetch, ) .await?; diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index 41dadc765..64a5f3a7e 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -46,7 +46,7 @@ static SAVE_STATE_EVERY_TIME: chrono::Duration = chrono::Duration::seconds(1); static SAVE_STATE_EVERY_TIME: chrono::Duration = chrono::Duration::seconds(60); pub(crate) struct InstanceWorker { - instance: Instance, + target: Instance, inboxes: CommunityInboxCollector, stop: CancellationToken, context: Data, @@ -66,7 +66,7 @@ impl InstanceWorker { let state = FederationQueueState::load(&mut pool, instance.id).await?; let inboxes = CommunityInboxCollector::new(instance.clone()); let mut worker = InstanceWorker { - instance, + target: instance, inboxes, stop, context, @@ -80,7 +80,7 @@ impl InstanceWorker { /// this worker only returns if (a) there is an internal error or (b) the cancellation token is /// cancelled (graceful exit) pub(crate) async fn loop_until_stopped(&mut self) -> LemmyResult<()> { - debug!("Starting federation worker for {}", self.instance.domain); + debug!("Starting federation worker for {}", self.target.domain); self.inboxes.update_communities(&self.context).await?; self.initial_fail_sleep().await?; while !self.stop.is_cancelled() { @@ -152,7 +152,7 @@ impl InstanceWorker { .await .context("failed reading activity from db")? else { - debug!("{}: {:?} does not exist", self.instance.domain, id); + debug!("{}: {:?} does not exist", self.target.domain, id); self.state.last_successful_id = Some(id); continue; }; @@ -183,7 +183,7 @@ impl InstanceWorker { println!("send retry loop {:?}", activity.id); let inbox_urls = self.inboxes.get_inbox_urls(activity, &self.context).await?; if inbox_urls.is_empty() { - trace!("{}: {:?} no inboxes", self.instance.domain, activity.id); + trace!("{}: {:?} no inboxes", self.target.domain, activity.id); self.state.last_successful_id = Some(activity.id); self.state.last_successful_published_time = Some(activity.published); return Ok(()); @@ -209,7 +209,7 @@ impl InstanceWorker { let retry_delay: Duration = federate_retry_sleep_duration(self.state.fail_count); info!( "{}: retrying {:?} attempt {} with delay {retry_delay:.2?}. ({e})", - self.instance.domain, activity.id, self.state.fail_count + self.target.domain, activity.id, self.state.fail_count ); self.save_and_send_state().await?; tokio::select! { @@ -222,15 +222,15 @@ impl InstanceWorker { } // Activity send successful, mark instance as alive if it hasn't been updated in a while. - let updated = self.instance.updated.unwrap_or(self.instance.published); + let updated = self.target.updated.unwrap_or(self.target.published); if updated.add(Days::new(1)) < Utc::now() { - self.instance.updated = Some(Utc::now()); + self.target.updated = Some(Utc::now()); let form = InstanceForm::builder() - .domain(self.instance.domain.clone()) + .domain(self.target.domain.clone()) .updated(Some(naive_now())) .build(); - Instance::update(&mut self.context.pool(), self.instance.id, form).await?; + Instance::update(&mut self.context.pool(), self.target.id, form).await?; } } Ok(()) @@ -241,7 +241,7 @@ impl InstanceWorker { FederationQueueState::upsert(&mut self.context.pool(), &self.state).await?; self .stats_sender - .send((self.instance.id, self.state.clone()))?; + .send((self.target.id, self.state.clone()))?; Ok(()) } }