rename instance to target

This commit is contained in:
Felix Ableitner 2024-05-29 23:17:49 +02:00
parent 53f79a9174
commit 0adf49d9ab
2 changed files with 17 additions and 16 deletions

View file

@ -34,7 +34,7 @@ static FOLLOW_REMOVALS_RECHECK_DELAY: Lazy<chrono::TimeDelta> =
Lazy::new(|| chrono::TimeDelta::try_hours(1).expect("TimeDelta out of bounds")); Lazy::new(|| chrono::TimeDelta::try_hours(1).expect("TimeDelta out of bounds"));
pub(crate) struct CommunityInboxCollector { pub(crate) struct CommunityInboxCollector {
instance: Instance, target: Instance,
// load site lazily because if an instance is first seen due to being on allowlist, // load site lazily because if an instance is first seen due to being on allowlist,
// the corresponding row in `site` may not exist yet since that is only added once // the corresponding row in `site` may not exist yet since that is only added once
// `fetch_instance_actor_for_object` is called. // `fetch_instance_actor_for_object` is called.
@ -48,9 +48,9 @@ pub(crate) struct CommunityInboxCollector {
} }
impl CommunityInboxCollector { impl CommunityInboxCollector {
pub fn new(instance: Instance) -> Self { pub fn new(target: Instance) -> Self {
Self { Self {
instance, target,
site_loaded: false, site_loaded: false,
site: None, site: None,
followed_communities: HashMap::new(), followed_communities: HashMap::new(),
@ -72,7 +72,7 @@ impl CommunityInboxCollector {
if activity.send_all_instances { if activity.send_all_instances {
if !self.site_loaded { if !self.site_loaded {
self.site = Site::read_from_instance_id(&mut context.pool(), self.instance.id).await?; self.site = Site::read_from_instance_id(&mut context.pool(), self.target.id).await?;
self.site_loaded = true; self.site_loaded = true;
} }
if let Some(site) = &self.site { if let Some(site) = &self.site {
@ -91,6 +91,7 @@ impl CommunityInboxCollector {
.send_inboxes .send_inboxes
.iter() .iter()
.filter_map(std::option::Option::as_ref) .filter_map(std::option::Option::as_ref)
.filter(|&u| (u.domain() == Some(&self.target.domain)))
.map(|u| u.inner().clone()), .map(|u| u.inner().clone()),
); );
@ -135,7 +136,7 @@ impl CommunityInboxCollector {
) -> LemmyResult<HashMap<CommunityId, HashSet<Url>>> { ) -> LemmyResult<HashMap<CommunityId, HashSet<Url>>> {
let followed = CommunityFollowerView::get_instance_followed_community_inboxes( let followed = CommunityFollowerView::get_instance_followed_community_inboxes(
&mut context.pool(), &mut context.pool(),
self.instance.id, self.target.id,
last_fetch, last_fetch,
) )
.await?; .await?;

View file

@ -46,7 +46,7 @@ static SAVE_STATE_EVERY_TIME: chrono::Duration = chrono::Duration::seconds(1);
static SAVE_STATE_EVERY_TIME: chrono::Duration = chrono::Duration::seconds(60); static SAVE_STATE_EVERY_TIME: chrono::Duration = chrono::Duration::seconds(60);
pub(crate) struct InstanceWorker { pub(crate) struct InstanceWorker {
instance: Instance, target: Instance,
inboxes: CommunityInboxCollector, inboxes: CommunityInboxCollector,
stop: CancellationToken, stop: CancellationToken,
context: Data<LemmyContext>, context: Data<LemmyContext>,
@ -66,7 +66,7 @@ impl InstanceWorker {
let state = FederationQueueState::load(&mut pool, instance.id).await?; let state = FederationQueueState::load(&mut pool, instance.id).await?;
let inboxes = CommunityInboxCollector::new(instance.clone()); let inboxes = CommunityInboxCollector::new(instance.clone());
let mut worker = InstanceWorker { let mut worker = InstanceWorker {
instance, target: instance,
inboxes, inboxes,
stop, stop,
context, context,
@ -80,7 +80,7 @@ impl InstanceWorker {
/// this worker only returns if (a) there is an internal error or (b) the cancellation token is /// this worker only returns if (a) there is an internal error or (b) the cancellation token is
/// cancelled (graceful exit) /// cancelled (graceful exit)
pub(crate) async fn loop_until_stopped(&mut self) -> LemmyResult<()> { pub(crate) async fn loop_until_stopped(&mut self) -> LemmyResult<()> {
debug!("Starting federation worker for {}", self.instance.domain); debug!("Starting federation worker for {}", self.target.domain);
self.inboxes.update_communities(&self.context).await?; self.inboxes.update_communities(&self.context).await?;
self.initial_fail_sleep().await?; self.initial_fail_sleep().await?;
while !self.stop.is_cancelled() { while !self.stop.is_cancelled() {
@ -152,7 +152,7 @@ impl InstanceWorker {
.await .await
.context("failed reading activity from db")? .context("failed reading activity from db")?
else { else {
debug!("{}: {:?} does not exist", self.instance.domain, id); debug!("{}: {:?} does not exist", self.target.domain, id);
self.state.last_successful_id = Some(id); self.state.last_successful_id = Some(id);
continue; continue;
}; };
@ -183,7 +183,7 @@ impl InstanceWorker {
println!("send retry loop {:?}", activity.id); println!("send retry loop {:?}", activity.id);
let inbox_urls = self.inboxes.get_inbox_urls(activity, &self.context).await?; let inbox_urls = self.inboxes.get_inbox_urls(activity, &self.context).await?;
if inbox_urls.is_empty() { if inbox_urls.is_empty() {
trace!("{}: {:?} no inboxes", self.instance.domain, activity.id); trace!("{}: {:?} no inboxes", self.target.domain, activity.id);
self.state.last_successful_id = Some(activity.id); self.state.last_successful_id = Some(activity.id);
self.state.last_successful_published_time = Some(activity.published); self.state.last_successful_published_time = Some(activity.published);
return Ok(()); return Ok(());
@ -209,7 +209,7 @@ impl InstanceWorker {
let retry_delay: Duration = federate_retry_sleep_duration(self.state.fail_count); let retry_delay: Duration = federate_retry_sleep_duration(self.state.fail_count);
info!( info!(
"{}: retrying {:?} attempt {} with delay {retry_delay:.2?}. ({e})", "{}: retrying {:?} attempt {} with delay {retry_delay:.2?}. ({e})",
self.instance.domain, activity.id, self.state.fail_count self.target.domain, activity.id, self.state.fail_count
); );
self.save_and_send_state().await?; self.save_and_send_state().await?;
tokio::select! { tokio::select! {
@ -222,15 +222,15 @@ impl InstanceWorker {
} }
// Activity send successful, mark instance as alive if it hasn't been updated in a while. // Activity send successful, mark instance as alive if it hasn't been updated in a while.
let updated = self.instance.updated.unwrap_or(self.instance.published); let updated = self.target.updated.unwrap_or(self.target.published);
if updated.add(Days::new(1)) < Utc::now() { if updated.add(Days::new(1)) < Utc::now() {
self.instance.updated = Some(Utc::now()); self.target.updated = Some(Utc::now());
let form = InstanceForm::builder() let form = InstanceForm::builder()
.domain(self.instance.domain.clone()) .domain(self.target.domain.clone())
.updated(Some(naive_now())) .updated(Some(naive_now()))
.build(); .build();
Instance::update(&mut self.context.pool(), self.instance.id, form).await?; Instance::update(&mut self.context.pool(), self.target.id, form).await?;
} }
} }
Ok(()) Ok(())
@ -241,7 +241,7 @@ impl InstanceWorker {
FederationQueueState::upsert(&mut self.context.pool(), &self.state).await?; FederationQueueState::upsert(&mut self.context.pool(), &self.state).await?;
self self
.stats_sender .stats_sender
.send((self.instance.id, self.state.clone()))?; .send((self.target.id, self.state.clone()))?;
Ok(()) Ok(())
} }
} }