From d4f26da00e0c0b4711c116e103cbff3c8a9d5b6e Mon Sep 17 00:00:00 2001 From: Felix Ableitner Date: Thu, 23 May 2024 13:00:34 +0200 Subject: [PATCH] add file --- crates/federate/src/stats.rs | 63 ++++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 crates/federate/src/stats.rs diff --git a/crates/federate/src/stats.rs b/crates/federate/src/stats.rs new file mode 100644 index 000000000..f6d21b666 --- /dev/null +++ b/crates/federate/src/stats.rs @@ -0,0 +1,63 @@ +use crate::util::get_latest_activity_id; +use chrono::Local; +use lemmy_api_common::federate_retry_sleep_duration; +use lemmy_db_schema::{ + source::federation_queue_state::FederationQueueState, + utils::{ActualDbPool, DbPool}, +}; +use std::{collections::HashMap, time::Duration}; +use tokio::{sync::mpsc::UnboundedReceiver, time::interval}; +use tracing::{debug, error, info}; + +/// every 60s, print the state for every instance. exits if the receiver is done (all senders dropped) +pub(crate) async fn receive_print_stats( + pool: ActualDbPool, + mut receiver: UnboundedReceiver<(String, FederationQueueState)>, +) { + let pool = &mut DbPool::Pool(&pool); + let mut printerval = interval(Duration::from_secs(60)); + let mut stats = HashMap::new(); + loop { + tokio::select! { + ele = receiver.recv() => { + let Some((domain, ele)) = ele else { + print_stats(pool, &stats).await; + return; + }; + stats.insert(domain, ele); + }, + _ = printerval.tick() => { + print_stats(pool, &stats).await; + } + } + } +} + +async fn print_stats(pool: &mut DbPool<'_>, stats: &HashMap) { + let last_id = get_latest_activity_id(pool).await; + let Ok(last_id) = last_id else { + error!("could not get last id"); + return; + }; + // it's expected that the values are a bit out of date, everything < SAVE_STATE_EVERY should be considered up to date + info!("Federation state as of {}:", Local::now().to_rfc3339()); + // todo: more stats (act/sec, avg http req duration) + let mut ok_count = 0; + let mut behind_count = 0; + for (domain, stat) in stats { + let behind = last_id.0 - stat.last_successful_id.map(|e| e.0).unwrap_or(0); + if stat.fail_count > 0 { + info!( + "{domain}: Warning. {behind} behind, {} consecutive fails, current retry delay {:.2?}", + stat.fail_count, + federate_retry_sleep_duration(stat.fail_count) + ); + } else if behind > 0 { + debug!("{}: Ok. {} activities behind", domain, behind); + behind_count += 1; + } else { + ok_count += 1; + } + } + info!("{ok_count} others up to date. {behind_count} instances behind."); +}