diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index d52ca2ce3..0f4892da4 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -499,6 +499,14 @@ mod test { self.wait_stop_server.stop(true).await; Ok(()) } + + async fn recv_last_stats(&mut self) -> LemmyResult { + let mut last = self.stats_receiver.try_recv(); + while let Ok(next) = self.stats_receiver.try_recv() { + last = Ok(next); + } + Ok(last?) + } } /// In order to guarantee that the webserver is stopped via the cleanup function, /// we implement a test context. @@ -525,7 +533,7 @@ mod test { // assert_eq!(Some(ActivityId(0)), rcv.state.last_successful_id); // let last_id_before = rcv.state.last_successful_id.unwrap(); - let sent = send_activity(data.person.actor_id.clone(), &data.context).await?; + let sent = send_activity(data.person.actor_id.clone(), &data.context, true).await?; tracing::debug!("sent activity"); // receive for successfully sent activity let inbox_rcv = data.inbox_receiver.recv().await.unwrap(); @@ -552,6 +560,52 @@ mod test { Ok(()) } + #[test_context(Data)] + #[tokio::test] + #[traced_test] + #[serial] + async fn test_send_100(data: &mut Data) -> LemmyResult<()> { + tracing::debug!("hello world"); + + // first receive at startup + let rcv = data.stats_receiver.recv().await.unwrap(); + tracing::debug!("received first stats"); + assert_eq!(data.instance.id, rcv.state.instance_id); + // assert_eq!(Some(ActivityId(0)), rcv.state.last_successful_id); + // let last_id_before = rcv.state.last_successful_id.unwrap(); + let mut sent = Vec::new(); + for _ in 0..100 { + sent.push(send_activity(data.person.actor_id.clone(), &data.context, false).await?); + sleep(2 * *WORK_FINISHED_RECHECK_DELAY).await; + } + tracing::debug!("sent activity"); + // receive for successfully sent activity + for i in 0..100 { + let inbox_rcv = data.inbox_receiver.recv().await.unwrap(); + let parsed_activity = serde_json::from_str::>(&inbox_rcv)?; + assert_eq!(&sent[i].data, parsed_activity.inner()); + tracing::debug!("received activity"); + } + + let rcv = data.recv_last_stats().await.unwrap(); + assert_eq!(data.instance.id, rcv.state.instance_id); + assert_eq!(Some(sent.last().unwrap().id), rcv.state.last_successful_id); + + data.cleanup().await?; + + // it also sends state on shutdown + let rcv = data.stats_receiver.try_recv(); + assert!(rcv.is_ok()); + + // nothing further received + let rcv = data.stats_receiver.try_recv(); + assert_eq!(Some(TryRecvError::Disconnected), rcv.err()); + let inbox_rcv = data.inbox_receiver.try_recv(); + assert_eq!(Some(TryRecvError::Disconnected), inbox_rcv.err()); + + Ok(()) + } + #[test_context(Data)] #[tokio::test] #[serial] @@ -562,7 +616,7 @@ mod test { .build(); Instance::update(&mut data.context.pool(), data.instance.id, form).await?; - send_activity(data.person.actor_id.clone(), &data.context).await?; + send_activity(data.person.actor_id.clone(), &data.context, true).await?; data.inbox_receiver.recv().await.unwrap(); let instance = @@ -603,7 +657,11 @@ mod test { Ok(handle) } - async fn send_activity(actor_id: DbUrl, context: &LemmyContext) -> LemmyResult { + async fn send_activity( + actor_id: DbUrl, + context: &LemmyContext, + wait: bool, + ) -> LemmyResult { // create outgoing activity let file = File::open("../apub/assets/lemmy/activities/voting/like_note.json")?; let reader = BufReader::new(file); @@ -623,7 +681,9 @@ mod test { }; let sent = SentActivity::create(&mut context.pool(), form).await?; - sleep(*WORK_FINISHED_RECHECK_DELAY * 2).await; + if wait { + sleep(*WORK_FINISHED_RECHECK_DELAY * 2).await; + } Ok(sent) }