allow out of order receives in test

This commit is contained in:
phiresky 2024-07-11 15:33:46 +02:00
parent dd9c89e4ef
commit bef7190b56

View file

@ -421,6 +421,7 @@ mod test {
use lemmy_api_common::utils::{generate_inbox_url, generate_shared_inbox_url}; use lemmy_api_common::utils::{generate_inbox_url, generate_shared_inbox_url};
use lemmy_db_schema::{ use lemmy_db_schema::{
newtypes::DbUrl, newtypes::DbUrl,
schema::received_activity,
source::{ source::{
activity::{ActorType, SentActivity, SentActivityForm}, activity::{ActorType, SentActivity, SentActivityForm},
person::{Person, PersonInsertForm}, person::{Person, PersonInsertForm},
@ -431,7 +432,6 @@ mod test {
use reqwest::StatusCode; use reqwest::StatusCode;
use serde_json::{json, Value}; use serde_json::{json, Value};
use serial_test::serial; use serial_test::serial;
use std::{fs::File, io::BufReader};
use test_context::{test_context, AsyncTestContext}; use test_context::{test_context, AsyncTestContext};
use tokio::{ use tokio::{
spawn, spawn,
@ -449,6 +449,7 @@ mod test {
cancel: CancellationToken, cancel: CancellationToken,
cleaned_up: bool, cleaned_up: bool,
wait_stop_server: ServerHandle, wait_stop_server: ServerHandle,
is_concurrent: bool,
} }
impl Data { impl Data {
@ -477,7 +478,7 @@ mod test {
let concurrent_sends_per_instance = std::env::var("LEMMY_TEST_FEDERATION_CONCURRENT_SENDS") let concurrent_sends_per_instance = std::env::var("LEMMY_TEST_FEDERATION_CONCURRENT_SENDS")
.ok() .ok()
.and_then(|s| s.parse().ok()) .and_then(|s| s.parse().ok())
.unwrap_or(1); .unwrap_or(10);
let fed_config = FederationWorkerConfig { let fed_config = FederationWorkerConfig {
concurrent_sends_per_instance, concurrent_sends_per_instance,
@ -501,6 +502,7 @@ mod test {
cancel, cancel,
wait_stop_server, wait_stop_server,
cleaned_up: false, cleaned_up: false,
is_concurrent: concurrent_sends_per_instance > 1,
}) })
} }
@ -590,13 +592,7 @@ mod test {
} }
sleep(2 * *WORK_FINISHED_RECHECK_DELAY).await; sleep(2 * *WORK_FINISHED_RECHECK_DELAY).await;
tracing::debug!("sent activity"); tracing::debug!("sent activity");
// receive for successfully sent activity compare_sent_with_receive(data, sent).await?;
for i in 0..100 {
let inbox_rcv = data.inbox_receiver.recv().await.unwrap();
let parsed_activity = serde_json::from_str::<WithContext<Value>>(&inbox_rcv)?;
assert_eq!(&sent[i].data, parsed_activity.inner());
tracing::debug!("received activity");
}
Ok(()) Ok(())
} }
@ -625,13 +621,7 @@ mod test {
} }
sleep(2 * *WORK_FINISHED_RECHECK_DELAY).await; sleep(2 * *WORK_FINISHED_RECHECK_DELAY).await;
tracing::debug!("sent activity"); tracing::debug!("sent activity");
// receive for successfully sent activity compare_sent_with_receive(data, sent).await?;
for i in 0..count {
let inbox_rcv = data.inbox_receiver.recv().await.unwrap();
let parsed_activity = serde_json::from_str::<WithContext<Value>>(&inbox_rcv)?;
assert_eq!(&sent[i].data, parsed_activity.inner());
tracing::debug!("received activity");
}
} }
Ok(()) Ok(())
@ -723,4 +713,36 @@ mod test {
Ok(sent) Ok(sent)
} }
async fn compare_sent_with_receive(data: &mut Data, mut sent: Vec<SentActivity>) -> Result<()> {
let check_order = !data.is_concurrent; // allow out-of order receiving when running parallel
let mut received = Vec::new();
for _ in 0..sent.len() {
let inbox_rcv = data.inbox_receiver.recv().await.unwrap();
let parsed_activity = serde_json::from_str::<WithContext<Value>>(&inbox_rcv)?;
received.push(parsed_activity);
}
if !check_order {
// sort by id
received.sort_by(|a, b| {
a.inner()["id"]
.as_str()
.unwrap()
.cmp(b.inner()["id"].as_str().unwrap())
});
sent.sort_by(|a, b| {
a.data["id"]
.as_str()
.unwrap()
.cmp(&b.data["id"].as_str().unwrap())
});
}
// receive for successfully sent activity
for i in 0..sent.len() {
let sent_activity = &sent[i];
let received_activity = received[i].inner();
assert_eq!(&sent_activity.data, received_activity);
tracing::debug!("received activity");
}
Ok(())
}
} }