diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index 0f4892da4..dc15be54d 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -150,7 +150,9 @@ impl InstanceWorker { // no more work to be done, wait before rechecking tokio::select! { () = sleep(*WORK_FINISHED_RECHECK_DELAY) => {}, - () = self.stop.cancelled() => {} + () = self.stop.cancelled() => { + tracing::debug!("cancelled worker loop while waiting for new work") + } } continue; } @@ -162,6 +164,8 @@ impl InstanceWorker { .await?; } } + tracing::debug!("cancelled worker loop after send"); + // final update of state in db on shutdown self.save_and_send_state().await?; Ok(()) @@ -187,7 +191,9 @@ impl InstanceWorker { ); tokio::select! { () = sleep(remaining) => {}, - () = self.stop.cancelled() => {} + () = self.stop.cancelled() => { + tracing::debug!("cancelled worker loop during initial fail sleep") + } } } Ok(()) @@ -462,8 +468,13 @@ mod test { // listen for received activities in background let wait_stop_server = listen_activities(inbox_sender)?; + let concurrent_sends_per_instance = std::env::var("LEMMY_TEST_FEDERATION_CONCURRENT_SENDS") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(1); + let fed_config = FederationWorkerConfig { - concurrent_sends_per_instance: 1, + concurrent_sends_per_instance }; spawn(InstanceWorker::init_and_loop( instance.clone(), @@ -499,15 +510,8 @@ mod test { self.wait_stop_server.stop(true).await; Ok(()) } - - async fn recv_last_stats(&mut self) -> LemmyResult { - let mut last = self.stats_receiver.try_recv(); - while let Ok(next) = self.stats_receiver.try_recv() { - last = Ok(next); - } - Ok(last?) - } } + /// In order to guarantee that the webserver is stopped via the cleanup function, /// we implement a test context. impl AsyncTestContext for Data { @@ -576,8 +580,8 @@ mod test { let mut sent = Vec::new(); for _ in 0..100 { sent.push(send_activity(data.person.actor_id.clone(), &data.context, false).await?); - sleep(2 * *WORK_FINISHED_RECHECK_DELAY).await; } + sleep(2 * *WORK_FINISHED_RECHECK_DELAY).await; tracing::debug!("sent activity"); // receive for successfully sent activity for i in 0..100 { @@ -587,22 +591,6 @@ mod test { tracing::debug!("received activity"); } - let rcv = data.recv_last_stats().await.unwrap(); - assert_eq!(data.instance.id, rcv.state.instance_id); - assert_eq!(Some(sent.last().unwrap().id), rcv.state.last_successful_id); - - data.cleanup().await?; - - // it also sends state on shutdown - let rcv = data.stats_receiver.try_recv(); - assert!(rcv.is_ok()); - - // nothing further received - let rcv = data.stats_receiver.try_recv(); - assert_eq!(Some(TryRecvError::Disconnected), rcv.err()); - let inbox_rcv = data.inbox_receiver.try_recv(); - assert_eq!(Some(TryRecvError::Disconnected), inbox_rcv.err()); - Ok(()) }