diff --git a/crates/federate/src/lib.rs b/crates/federate/src/lib.rs index 6355d40f2..d3876226f 100644 --- a/crates/federate/src/lib.rs +++ b/crates/federate/src/lib.rs @@ -38,7 +38,7 @@ pub struct SendManager { opts: Opts, workers: HashMap, context: FederationConfig, - stats_sender: UnboundedSender<(String, FederationQueueState)>, + stats_sender: UnboundedSender<(InstanceId, FederationQueueState)>, exit_print: JoinHandle<()>, } diff --git a/crates/federate/src/stats.rs b/crates/federate/src/stats.rs index a813684b3..bb6510263 100644 --- a/crates/federate/src/stats.rs +++ b/crates/federate/src/stats.rs @@ -1,19 +1,24 @@ use crate::util::get_latest_activity_id; use chrono::Local; +use diesel::result::Error::NotFound; use lemmy_api_common::federate_retry_sleep_duration; use lemmy_db_schema::{ - source::federation_queue_state::FederationQueueState, + newtypes::InstanceId, + source::{federation_queue_state::FederationQueueState, instance::Instance}, utils::{ActualDbPool, DbPool}, }; +use lemmy_utils::{error::LemmyResult, CACHE_DURATION_FEDERATION}; +use moka::future::Cache; +use once_cell::sync::Lazy; use std::{collections::HashMap, time::Duration}; use tokio::{sync::mpsc::UnboundedReceiver, time::interval}; -use tracing::{debug, error, info}; +use tracing::{debug, info, warn}; /// every 60s, print the state for every instance. exits if the receiver is done (all senders /// dropped) pub(crate) async fn receive_print_stats( pool: ActualDbPool, - mut receiver: UnboundedReceiver<(String, FederationQueueState)>, + mut receiver: UnboundedReceiver<(InstanceId, FederationQueueState)>, ) { let pool = &mut DbPool::Pool(&pool); let mut printerval = interval(Duration::from_secs(60)); @@ -21,11 +26,15 @@ pub(crate) async fn receive_print_stats( loop { tokio::select! { ele = receiver.recv() => { - let Some((domain, ele)) = ele else { - print_stats(pool, &stats).await; - return; - }; - stats.insert(domain, ele); + match ele { + // update stats for instance + Some((instance_id, ele)) => {stats.insert(instance_id, ele);}, + // receiver closed, print stats and exit + None => { + print_stats(pool, &stats).await; + return; + } + } }, _ = printerval.tick() => { print_stats(pool, &stats).await; @@ -34,19 +43,41 @@ pub(crate) async fn receive_print_stats( } } -async fn print_stats(pool: &mut DbPool<'_>, stats: &HashMap) { - let last_id = get_latest_activity_id(pool).await; - let Ok(last_id) = last_id else { - error!("could not get last id"); - return; - }; +async fn print_stats(pool: &mut DbPool<'_>, stats: &HashMap) { + let res = print_stats_with_error(pool, stats).await; + if let Err(e) = res { + warn!("Failed to print stats: {e}"); + } +} + +async fn print_stats_with_error( + pool: &mut DbPool<'_>, + stats: &HashMap, +) -> LemmyResult<()> { + static INSTANCE_CACHE: Lazy>> = Lazy::new(|| { + Cache::builder() + .max_capacity(1) + .time_to_live(CACHE_DURATION_FEDERATION) + .build() + }); + let instances = INSTANCE_CACHE + .try_get_with((), async { Instance::read_all(pool).await }) + .await?; + + let last_id = get_latest_activity_id(pool).await?; + // it's expected that the values are a bit out of date, everything < SAVE_STATE_EVERY should be // considered up to date info!("Federation state as of {}:", Local::now().to_rfc3339()); // todo: more stats (act/sec, avg http req duration) let mut ok_count = 0; let mut behind_count = 0; - for (domain, stat) in stats { + for (instance_id, stat) in stats { + let domain = &instances + .iter() + .find(|i| &i.id == instance_id) + .ok_or(NotFound)? + .domain; let behind = last_id.0 - stat.last_successful_id.map(|e| e.0).unwrap_or(0); if stat.fail_count > 0 { info!( @@ -62,4 +93,5 @@ async fn print_stats(pool: &mut DbPool<'_>, stats: &HashMap>, stop: CancellationToken, context: Data, - stats_sender: UnboundedSender<(String, FederationQueueState)>, + stats_sender: UnboundedSender<(InstanceId, FederationQueueState)>, last_full_communities_fetch: DateTime, last_incremental_communities_fetch: DateTime, state: FederationQueueState, @@ -87,7 +87,7 @@ impl InstanceWorker { instance: Instance, context: Data, stop: CancellationToken, - stats_sender: UnboundedSender<(String, FederationQueueState)>, + stats_sender: UnboundedSender<(InstanceId, FederationQueueState)>, ) -> Result<(), anyhow::Error> { let mut pool = context.pool(); let state = FederationQueueState::load(&mut pool, instance.id).await?; @@ -350,7 +350,7 @@ impl InstanceWorker { FederationQueueState::upsert(&mut self.context.pool(), &self.state).await?; self .stats_sender - .send((self.instance.domain.clone(), self.state.clone()))?; + .send((self.instance.id, self.state.clone()))?; Ok(()) } }