remove else below continue

This commit is contained in:
phiresky 2024-07-15 16:22:33 +02:00
parent 10758ab8ea
commit 373e61969d

View file

@ -116,9 +116,7 @@ impl InstanceWorker {
/// loop fetch new activities from db and send them to the inboxes of the given instances /// loop fetch new activities from db and send them to the inboxes of the given instances
/// this worker only returns if (a) there is an internal error or (b) the cancellation token is /// this worker only returns if (a) there is an internal error or (b) the cancellation token is
/// cancelled (graceful exit) /// cancelled (graceful exit)
async fn loop_until_stopped( async fn loop_until_stopped(&mut self) -> Result<()> {
&mut self,
) -> Result<()> {
self.initial_fail_sleep().await?; self.initial_fail_sleep().await?;
let (mut last_sent_id, mut newest_id) = self.get_latest_ids().await?; let (mut last_sent_id, mut newest_id) = self.get_latest_ids().await?;
@ -139,56 +137,55 @@ impl InstanceWorker {
// handle_send_results does not guarantee that we are now in a condition where we want to // handle_send_results does not guarantee that we are now in a condition where we want to
// send a new one, so repeat this check until the if no longer applies // send a new one, so repeat this check until the if no longer applies
continue; continue;
} else { }
// send a new activity if there is one
self.inbox_collector.update_communities().await?; // send a new activity if there is one
let next_id_to_send = ActivityId(last_sent_id.0 + 1); self.inbox_collector.update_communities().await?;
{ let next_id_to_send = ActivityId(last_sent_id.0 + 1);
// sanity check: calculate next id to send based on the last id and the in flight requests {
let last_successful_id = self.state.last_successful_id.map(|e| e.0).context( // sanity check: calculate next id to send based on the last id and the in flight requests
let last_successful_id =
self.state.last_successful_id.map(|e| e.0).context(
"impossible: id is initialized in get_latest_ids and never returned to None", "impossible: id is initialized in get_latest_ids and never returned to None",
)?; )?;
let expected_next_id = let expected_next_id =
last_successful_id + (self.successfuls.len() as i64) + self.in_flight + 1; last_successful_id + (self.successfuls.len() as i64) + self.in_flight + 1;
// compare to next id based on incrementing // compare to next id based on incrementing
if expected_next_id != next_id_to_send.0 { if expected_next_id != next_id_to_send.0 {
anyhow::bail!( anyhow::bail!(
"{}: next id to send is not as expected: {:?} != {:?}", "{}: next id to send is not as expected: {:?} != {:?}",
self.instance.domain, self.instance.domain,
expected_next_id, expected_next_id,
next_id_to_send next_id_to_send
) )
}
} }
}
if next_id_to_send > newest_id {
// lazily fetch latest id only if we have cought up
newest_id = self.get_latest_ids().await?.1;
if next_id_to_send > newest_id { if next_id_to_send > newest_id {
// lazily fetch latest id only if we have cought up if next_id_to_send > ActivityId(newest_id.0 + 1) {
newest_id = self.get_latest_ids().await?.1; tracing::error!(
if next_id_to_send > newest_id {
if next_id_to_send > ActivityId(newest_id.0 + 1) {
tracing::error!(
"{}: next send id {} is higher than latest id {}+1 in database (did the db get cleared?)", "{}: next send id {} is higher than latest id {}+1 in database (did the db get cleared?)",
self.instance.domain, self.instance.domain,
next_id_to_send.0, next_id_to_send.0,
newest_id.0 newest_id.0
); );
}
// no more work to be done, wait before rechecking
tokio::select! {
() = sleep(*WORK_FINISHED_RECHECK_DELAY) => {},
() = self.stop.cancelled() => {
tracing::debug!("cancelled worker loop while waiting for new work")
}
}
continue;
} }
// no more work to be done, wait before rechecking
tokio::select! {
() = sleep(*WORK_FINISHED_RECHECK_DELAY) => {},
() = self.stop.cancelled() => {
tracing::debug!("cancelled worker loop while waiting for new work")
}
}
continue;
} }
self.in_flight += 1;
last_sent_id = next_id_to_send;
self
.spawn_send_if_needed(next_id_to_send)
.await?;
} }
self.in_flight += 1;
last_sent_id = next_id_to_send;
self.spawn_send_if_needed(next_id_to_send).await?;
} }
tracing::debug!("cancelled worker loop after send"); tracing::debug!("cancelled worker loop after send");
@ -327,20 +324,19 @@ impl InstanceWorker {
/// we collect the relevant inboxes in the main instance worker task, and only spawn the send task /// we collect the relevant inboxes in the main instance worker task, and only spawn the send task
/// if we have inboxes to send to this limits CPU usage and reduces overhead for the (many) /// if we have inboxes to send to this limits CPU usage and reduces overhead for the (many)
/// cases where we don't have any inboxes /// cases where we don't have any inboxes
async fn spawn_send_if_needed( async fn spawn_send_if_needed(&mut self, activity_id: ActivityId) -> Result<()> {
&mut self,
activity_id: ActivityId,
) -> Result<()> {
let Some(ele) = get_activity_cached(&mut self.pool(), activity_id) let Some(ele) = get_activity_cached(&mut self.pool(), activity_id)
.await .await
.context("failed reading activity from db")? .context("failed reading activity from db")?
else { else {
tracing::debug!("{}: {:?} does not exist", self.instance.domain, activity_id); tracing::debug!("{}: {:?} does not exist", self.instance.domain, activity_id);
self.report_send_result.send(SendActivityResult::Success(SendSuccessInfo { self
activity_id, .report_send_result
published: None, .send(SendActivityResult::Success(SendSuccessInfo {
was_skipped: true, activity_id,
}))?; published: None,
was_skipped: true,
}))?;
return Ok(()); return Ok(());
}; };
let activity = &ele.0; let activity = &ele.0;
@ -353,16 +349,19 @@ impl InstanceWorker {
// this is the case when the activity is not relevant to this receiving instance (e.g. no user // this is the case when the activity is not relevant to this receiving instance (e.g. no user
// subscribed to the relevant community) // subscribed to the relevant community)
tracing::debug!("{}: {:?} no inboxes", self.instance.domain, activity.id); tracing::debug!("{}: {:?} no inboxes", self.instance.domain, activity.id);
self.report_send_result.send(SendActivityResult::Success(SendSuccessInfo { self
activity_id, .report_send_result
// it would be valid here to either return None or Some(activity.published). The published .send(SendActivityResult::Success(SendSuccessInfo {
// time is only used for stats pages that track federation delay. None can be a bit activity_id,
// misleading because if you look at / chart the published time for federation from a large // it would be valid here to either return None or Some(activity.published). The published
// to a small instance that's only subscribed to a few small communities, then it will show // time is only used for stats pages that track federation delay. None can be a bit
// the last published time as a days ago even though federation is up to date. // misleading because if you look at / chart the published time for federation from a
published: Some(activity.published), // large to a small instance that's only subscribed to a few small communities,
was_skipped: true, // then it will show the last published time as a days ago even though
}))?; // federation is up to date.
published: Some(activity.published),
was_skipped: true,
}))?;
return Ok(()); return Ok(());
} }
let initial_fail_count = self.state.fail_count; let initial_fail_count = self.state.fail_count;