From 6b46a7053535e3841ef5dcb5e8d2e80cbff976f8 Mon Sep 17 00:00:00 2001 From: Nutomic Date: Tue, 21 May 2024 20:47:06 +0200 Subject: [PATCH] Extra logging to debug duplicate activities (ref #4609) (#4726) * Extra logging to debug duplicate activities (ref #4609) * Fix logging for api tests * fmt --- api_tests/prepare-drone-federation-test.sh | 6 +++--- crates/federate/src/lib.rs | 5 +++++ crates/federate/src/worker.rs | 19 +++++++++---------- 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/api_tests/prepare-drone-federation-test.sh b/api_tests/prepare-drone-federation-test.sh index 147a553ab2..136f8ddfc6 100755 --- a/api_tests/prepare-drone-federation-test.sh +++ b/api_tests/prepare-drone-federation-test.sh @@ -3,13 +3,13 @@ # it is expected that this script is called by run-federation-test.sh script. set -e -if [ -n "$LEMMY_LOG_LEVEL" ]; +if [ -z "$LEMMY_LOG_LEVEL" ]; then - LEMMY_LOG_LEVEL=warn + LEMMY_LOG_LEVEL=info fi export RUST_BACKTRACE=1 -#export RUST_LOG="warn,lemmy_server=$LEMMY_LOG_LEVEL,lemmy_federate=$LEMMY_LOG_LEVEL,lemmy_api=$LEMMY_LOG_LEVEL,lemmy_api_common=$LEMMY_LOG_LEVEL,lemmy_api_crud=$LEMMY_LOG_LEVEL,lemmy_apub=$LEMMY_LOG_LEVEL,lemmy_db_schema=$LEMMY_LOG_LEVEL,lemmy_db_views=$LEMMY_LOG_LEVEL,lemmy_db_views_actor=$LEMMY_LOG_LEVEL,lemmy_db_views_moderator=$LEMMY_LOG_LEVEL,lemmy_routes=$LEMMY_LOG_LEVEL,lemmy_utils=$LEMMY_LOG_LEVEL,lemmy_websocket=$LEMMY_LOG_LEVEL" +export RUST_LOG="warn,lemmy_server=$LEMMY_LOG_LEVEL,lemmy_federate=$LEMMY_LOG_LEVEL,lemmy_api=$LEMMY_LOG_LEVEL,lemmy_api_common=$LEMMY_LOG_LEVEL,lemmy_api_crud=$LEMMY_LOG_LEVEL,lemmy_apub=$LEMMY_LOG_LEVEL,lemmy_db_schema=$LEMMY_LOG_LEVEL,lemmy_db_views=$LEMMY_LOG_LEVEL,lemmy_db_views_actor=$LEMMY_LOG_LEVEL,lemmy_db_views_moderator=$LEMMY_LOG_LEVEL,lemmy_routes=$LEMMY_LOG_LEVEL,lemmy_utils=$LEMMY_LOG_LEVEL,lemmy_websocket=$LEMMY_LOG_LEVEL" export LEMMY_TEST_FAST_FEDERATION=1 # by default, the persistent federation queue has delays in the scale of 30s-5min diff --git a/crates/federate/src/lib.rs b/crates/federate/src/lib.rs index a4dc495360..fc5bd2387b 100644 --- a/crates/federate/src/lib.rs +++ b/crates/federate/src/lib.rs @@ -13,6 +13,7 @@ use tokio::{ time::sleep, }; use tokio_util::sync::CancellationToken; +use tracing::info; mod util; mod worker; @@ -44,6 +45,10 @@ async fn start_stop_federation_workers( let pool2 = &mut DbPool::Pool(&pool); let process_index = opts.process_index - 1; let local_domain = federation_config.settings().get_hostname_without_port()?; + info!( + "Starting federation workers for process count {} and index {}", + opts.process_count, process_index + ); loop { let mut total_count = 0; let mut dead_count = 0; diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index ff2a68e3c4..f6701a8d10 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -34,6 +34,7 @@ use std::{ }; use tokio::{sync::mpsc::UnboundedSender, time::sleep}; use tokio_util::sync::CancellationToken; +use tracing::{debug, info, trace, warn}; /// Check whether to save state to db every n sends if there's no failures (during failures state is saved after every attempt) /// This determines the batch size for loop_batch. After a batch ends and SAVE_STATE_EVERY_TIME has passed, the federation_queue_state is updated in the DB. @@ -105,6 +106,7 @@ impl InstanceWorker { &mut self, pool: &mut DbPool<'_>, ) -> Result<(), anyhow::Error> { + debug!("Starting federation worker for {}", self.instance.domain); let save_state_every = chrono::Duration::from_std(SAVE_STATE_EVERY_TIME).expect("not negative"); self.update_communities(pool).await?; @@ -176,15 +178,14 @@ impl InstanceWorker { .await .context("failed reading activity from db")? else { - tracing::debug!("{}: {:?} does not exist", self.instance.domain, id); + debug!("{}: {:?} does not exist", self.instance.domain, id); self.state.last_successful_id = Some(id); continue; }; if let Err(e) = self.send_retry_loop(pool, &ele.0, &ele.1).await { - tracing::warn!( + warn!( "sending {} errored internally, skipping activity: {:?}", - ele.0.ap_id, - e + ele.0.ap_id, e ); } if self.stop.is_cancelled() { @@ -211,7 +212,7 @@ impl InstanceWorker { .await .context("failed figuring out inbox urls")?; if inbox_urls.is_empty() { - tracing::debug!("{}: {:?} no inboxes", self.instance.domain, activity.id); + trace!("{}: {:?} no inboxes", self.instance.domain, activity.id); self.state.last_successful_id = Some(activity.id); self.state.last_successful_published_time = Some(activity.published); return Ok(()); @@ -229,16 +230,14 @@ impl InstanceWorker { SendActivityTask::prepare(&object, actor.as_ref(), inbox_urls, &self.context).await?; for task in requests { // usually only one due to shared inbox - tracing::debug!("sending out {}", task); + trace!("sending out {}", task); while let Err(e) = task.sign_and_send(&self.context).await { self.state.fail_count += 1; self.state.last_retry = Some(Utc::now()); let retry_delay: Duration = federate_retry_sleep_duration(self.state.fail_count); - tracing::info!( + info!( "{}: retrying {:?} attempt {} with delay {retry_delay:.2?}. ({e})", - self.instance.domain, - activity.id, - self.state.fail_count + self.instance.domain, activity.id, self.state.fail_count ); self.save_and_send_state(pool).await?; tokio::select! {