add test to federate 100 events

This commit is contained in:
phiresky 2024-07-09 23:58:08 +02:00
parent c2dab71377
commit 40b3c1a24d

View file

@ -499,6 +499,14 @@ mod test {
self.wait_stop_server.stop(true).await; self.wait_stop_server.stop(true).await;
Ok(()) Ok(())
} }
async fn recv_last_stats(&mut self) -> LemmyResult<FederationQueueStateWithDomain> {
let mut last = self.stats_receiver.try_recv();
while let Ok(next) = self.stats_receiver.try_recv() {
last = Ok(next);
}
Ok(last?)
}
} }
/// In order to guarantee that the webserver is stopped via the cleanup function, /// In order to guarantee that the webserver is stopped via the cleanup function,
/// we implement a test context. /// we implement a test context.
@ -525,7 +533,7 @@ mod test {
// assert_eq!(Some(ActivityId(0)), rcv.state.last_successful_id); // assert_eq!(Some(ActivityId(0)), rcv.state.last_successful_id);
// let last_id_before = rcv.state.last_successful_id.unwrap(); // let last_id_before = rcv.state.last_successful_id.unwrap();
let sent = send_activity(data.person.actor_id.clone(), &data.context).await?; let sent = send_activity(data.person.actor_id.clone(), &data.context, true).await?;
tracing::debug!("sent activity"); tracing::debug!("sent activity");
// receive for successfully sent activity // receive for successfully sent activity
let inbox_rcv = data.inbox_receiver.recv().await.unwrap(); let inbox_rcv = data.inbox_receiver.recv().await.unwrap();
@ -552,6 +560,52 @@ mod test {
Ok(()) Ok(())
} }
#[test_context(Data)]
#[tokio::test]
#[traced_test]
#[serial]
async fn test_send_100(data: &mut Data) -> LemmyResult<()> {
tracing::debug!("hello world");
// first receive at startup
let rcv = data.stats_receiver.recv().await.unwrap();
tracing::debug!("received first stats");
assert_eq!(data.instance.id, rcv.state.instance_id);
// assert_eq!(Some(ActivityId(0)), rcv.state.last_successful_id);
// let last_id_before = rcv.state.last_successful_id.unwrap();
let mut sent = Vec::new();
for _ in 0..100 {
sent.push(send_activity(data.person.actor_id.clone(), &data.context, false).await?);
sleep(2 * *WORK_FINISHED_RECHECK_DELAY).await;
}
tracing::debug!("sent activity");
// receive for successfully sent activity
for i in 0..100 {
let inbox_rcv = data.inbox_receiver.recv().await.unwrap();
let parsed_activity = serde_json::from_str::<WithContext<Value>>(&inbox_rcv)?;
assert_eq!(&sent[i].data, parsed_activity.inner());
tracing::debug!("received activity");
}
let rcv = data.recv_last_stats().await.unwrap();
assert_eq!(data.instance.id, rcv.state.instance_id);
assert_eq!(Some(sent.last().unwrap().id), rcv.state.last_successful_id);
data.cleanup().await?;
// it also sends state on shutdown
let rcv = data.stats_receiver.try_recv();
assert!(rcv.is_ok());
// nothing further received
let rcv = data.stats_receiver.try_recv();
assert_eq!(Some(TryRecvError::Disconnected), rcv.err());
let inbox_rcv = data.inbox_receiver.try_recv();
assert_eq!(Some(TryRecvError::Disconnected), inbox_rcv.err());
Ok(())
}
#[test_context(Data)] #[test_context(Data)]
#[tokio::test] #[tokio::test]
#[serial] #[serial]
@ -562,7 +616,7 @@ mod test {
.build(); .build();
Instance::update(&mut data.context.pool(), data.instance.id, form).await?; Instance::update(&mut data.context.pool(), data.instance.id, form).await?;
send_activity(data.person.actor_id.clone(), &data.context).await?; send_activity(data.person.actor_id.clone(), &data.context, true).await?;
data.inbox_receiver.recv().await.unwrap(); data.inbox_receiver.recv().await.unwrap();
let instance = let instance =
@ -603,7 +657,11 @@ mod test {
Ok(handle) Ok(handle)
} }
async fn send_activity(actor_id: DbUrl, context: &LemmyContext) -> LemmyResult<SentActivity> { async fn send_activity(
actor_id: DbUrl,
context: &LemmyContext,
wait: bool,
) -> LemmyResult<SentActivity> {
// create outgoing activity // create outgoing activity
let file = File::open("../apub/assets/lemmy/activities/voting/like_note.json")?; let file = File::open("../apub/assets/lemmy/activities/voting/like_note.json")?;
let reader = BufReader::new(file); let reader = BufReader::new(file);
@ -623,7 +681,9 @@ mod test {
}; };
let sent = SentActivity::create(&mut context.pool(), form).await?; let sent = SentActivity::create(&mut context.pool(), form).await?;
sleep(*WORK_FINISHED_RECHECK_DELAY * 2).await; if wait {
sleep(*WORK_FINISHED_RECHECK_DELAY * 2).await;
}
Ok(sent) Ok(sent)
} }