diff --git a/crates/federate/src/lib.rs b/crates/federate/src/lib.rs index d3876226f..0e87b12de 100644 --- a/crates/federate/src/lib.rs +++ b/crates/federate/src/lib.rs @@ -1,10 +1,7 @@ use crate::{util::CancellableTask, worker::InstanceWorker}; use activitypub_federation::config::FederationConfig; use lemmy_api_common::context::LemmyContext; -use lemmy_db_schema::{ - newtypes::InstanceId, - source::{federation_queue_state::FederationQueueState, instance::Instance}, -}; +use lemmy_db_schema::{newtypes::InstanceId, source::instance::Instance}; use lemmy_utils::error::LemmyResult; use stats::receive_print_stats; use std::{collections::HashMap, time::Duration}; @@ -15,6 +12,7 @@ use tokio::{ }; use tokio_util::sync::CancellationToken; use tracing::info; +use util::FederationQueueStateWithDomain; mod stats; mod util; @@ -38,7 +36,7 @@ pub struct SendManager { opts: Opts, workers: HashMap, context: FederationConfig, - stats_sender: UnboundedSender<(InstanceId, FederationQueueState)>, + stats_sender: UnboundedSender, exit_print: JoinHandle<()>, } @@ -171,7 +169,7 @@ mod test { collections::HashSet, sync::{Arc, Mutex}, }; - use tokio::{spawn, time::sleep}; + use tokio::spawn; struct TestData { send_manager: SendManager, diff --git a/crates/federate/src/stats.rs b/crates/federate/src/stats.rs index bb6510263..f927f6ddf 100644 --- a/crates/federate/src/stats.rs +++ b/crates/federate/src/stats.rs @@ -1,15 +1,11 @@ -use crate::util::get_latest_activity_id; +use crate::util::{get_latest_activity_id, FederationQueueStateWithDomain}; use chrono::Local; -use diesel::result::Error::NotFound; use lemmy_api_common::federate_retry_sleep_duration; use lemmy_db_schema::{ newtypes::InstanceId, - source::{federation_queue_state::FederationQueueState, instance::Instance}, utils::{ActualDbPool, DbPool}, }; -use lemmy_utils::{error::LemmyResult, CACHE_DURATION_FEDERATION}; -use moka::future::Cache; -use once_cell::sync::Lazy; +use lemmy_utils::error::LemmyResult; use std::{collections::HashMap, time::Duration}; use tokio::{sync::mpsc::UnboundedReceiver, time::interval}; use tracing::{debug, info, warn}; @@ -18,7 +14,7 @@ use tracing::{debug, info, warn}; /// dropped) pub(crate) async fn receive_print_stats( pool: ActualDbPool, - mut receiver: UnboundedReceiver<(InstanceId, FederationQueueState)>, + mut receiver: UnboundedReceiver, ) { let pool = &mut DbPool::Pool(&pool); let mut printerval = interval(Duration::from_secs(60)); @@ -28,7 +24,7 @@ pub(crate) async fn receive_print_stats( ele = receiver.recv() => { match ele { // update stats for instance - Some((instance_id, ele)) => {stats.insert(instance_id, ele);}, + Some(ele) => {stats.insert(ele.state.instance_id, ele);}, // receiver closed, print stats and exit None => { print_stats(pool, &stats).await; @@ -43,7 +39,10 @@ pub(crate) async fn receive_print_stats( } } -async fn print_stats(pool: &mut DbPool<'_>, stats: &HashMap) { +async fn print_stats( + pool: &mut DbPool<'_>, + stats: &HashMap, +) { let res = print_stats_with_error(pool, stats).await; if let Err(e) = res { warn!("Failed to print stats: {e}"); @@ -52,18 +51,8 @@ async fn print_stats(pool: &mut DbPool<'_>, stats: &HashMap, - stats: &HashMap, + stats: &HashMap, ) -> LemmyResult<()> { - static INSTANCE_CACHE: Lazy>> = Lazy::new(|| { - Cache::builder() - .max_capacity(1) - .time_to_live(CACHE_DURATION_FEDERATION) - .build() - }); - let instances = INSTANCE_CACHE - .try_get_with((), async { Instance::read_all(pool).await }) - .await?; - let last_id = get_latest_activity_id(pool).await?; // it's expected that the values are a bit out of date, everything < SAVE_STATE_EVERY should be @@ -72,12 +61,9 @@ async fn print_stats_with_error( // todo: more stats (act/sec, avg http req duration) let mut ok_count = 0; let mut behind_count = 0; - for (instance_id, stat) in stats { - let domain = &instances - .iter() - .find(|i| &i.id == instance_id) - .ok_or(NotFound)? - .domain; + for ele in stats.values() { + let stat = &ele.state; + let domain = &ele.domain; let behind = last_id.0 - stat.last_successful_id.map(|e| e.0).unwrap_or(0); if stat.fail_count > 0 { info!( diff --git a/crates/federate/src/util.rs b/crates/federate/src/util.rs index 818e5b072..b3d080385 100644 --- a/crates/federate/src/util.rs +++ b/crates/federate/src/util.rs @@ -11,6 +11,7 @@ use lemmy_db_schema::{ source::{ activity::{ActorType, SentActivity}, community::Community, + federation_queue_state::FederationQueueState, person::Person, site::Site, }, @@ -183,3 +184,10 @@ pub(crate) async fn get_latest_activity_id(pool: &mut DbPool<'_>) -> Result>, stop: CancellationToken, context: Data, - stats_sender: UnboundedSender<(InstanceId, FederationQueueState)>, + stats_sender: UnboundedSender, last_full_communities_fetch: DateTime, last_incremental_communities_fetch: DateTime, state: FederationQueueState, @@ -87,7 +88,7 @@ impl InstanceWorker { instance: Instance, context: Data, stop: CancellationToken, - stats_sender: UnboundedSender<(InstanceId, FederationQueueState)>, + stats_sender: UnboundedSender, ) -> Result<(), anyhow::Error> { let mut pool = context.pool(); let state = FederationQueueState::load(&mut pool, instance.id).await?; @@ -350,9 +351,10 @@ impl InstanceWorker { async fn save_and_send_state(&mut self) -> Result<()> { self.last_state_insert = Utc::now(); FederationQueueState::upsert(&mut self.context.pool(), &self.state).await?; - self - .stats_sender - .send((self.instance.id, self.state.clone()))?; + self.stats_sender.send(FederationQueueStateWithDomain { + state: self.state.clone(), + domain: self.instance.domain.clone(), + })?; Ok(()) } }