diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index 79e19dd40..dd307fc7d 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -421,6 +421,7 @@ mod test { use lemmy_api_common::utils::{generate_inbox_url, generate_shared_inbox_url}; use lemmy_db_schema::{ newtypes::DbUrl, + schema::received_activity, source::{ activity::{ActorType, SentActivity, SentActivityForm}, person::{Person, PersonInsertForm}, @@ -431,7 +432,6 @@ mod test { use reqwest::StatusCode; use serde_json::{json, Value}; use serial_test::serial; - use std::{fs::File, io::BufReader}; use test_context::{test_context, AsyncTestContext}; use tokio::{ spawn, @@ -449,6 +449,7 @@ mod test { cancel: CancellationToken, cleaned_up: bool, wait_stop_server: ServerHandle, + is_concurrent: bool, } impl Data { @@ -477,7 +478,7 @@ mod test { let concurrent_sends_per_instance = std::env::var("LEMMY_TEST_FEDERATION_CONCURRENT_SENDS") .ok() .and_then(|s| s.parse().ok()) - .unwrap_or(1); + .unwrap_or(10); let fed_config = FederationWorkerConfig { concurrent_sends_per_instance, @@ -501,6 +502,7 @@ mod test { cancel, wait_stop_server, cleaned_up: false, + is_concurrent: concurrent_sends_per_instance > 1, }) } @@ -590,13 +592,7 @@ mod test { } sleep(2 * *WORK_FINISHED_RECHECK_DELAY).await; tracing::debug!("sent activity"); - // receive for successfully sent activity - for i in 0..100 { - let inbox_rcv = data.inbox_receiver.recv().await.unwrap(); - let parsed_activity = serde_json::from_str::>(&inbox_rcv)?; - assert_eq!(&sent[i].data, parsed_activity.inner()); - tracing::debug!("received activity"); - } + compare_sent_with_receive(data, sent).await?; Ok(()) } @@ -625,13 +621,7 @@ mod test { } sleep(2 * *WORK_FINISHED_RECHECK_DELAY).await; tracing::debug!("sent activity"); - // receive for successfully sent activity - for i in 0..count { - let inbox_rcv = data.inbox_receiver.recv().await.unwrap(); - let parsed_activity = serde_json::from_str::>(&inbox_rcv)?; - assert_eq!(&sent[i].data, parsed_activity.inner()); - tracing::debug!("received activity"); - } + compare_sent_with_receive(data, sent).await?; } Ok(()) @@ -723,4 +713,36 @@ mod test { Ok(sent) } + async fn compare_sent_with_receive(data: &mut Data, mut sent: Vec) -> Result<()> { + let check_order = !data.is_concurrent; // allow out-of order receiving when running parallel + let mut received = Vec::new(); + for _ in 0..sent.len() { + let inbox_rcv = data.inbox_receiver.recv().await.unwrap(); + let parsed_activity = serde_json::from_str::>(&inbox_rcv)?; + received.push(parsed_activity); + } + if !check_order { + // sort by id + received.sort_by(|a, b| { + a.inner()["id"] + .as_str() + .unwrap() + .cmp(b.inner()["id"].as_str().unwrap()) + }); + sent.sort_by(|a, b| { + a.data["id"] + .as_str() + .unwrap() + .cmp(&b.data["id"].as_str().unwrap()) + }); + } + // receive for successfully sent activity + for i in 0..sent.len() { + let sent_activity = &sent[i]; + let received_activity = received[i].inner(); + assert_eq!(&sent_activity.data, received_activity); + tracing::debug!("received activity"); + } + Ok(()) + } }