From 76c648739077f825d22c9380538c9206589151ed Mon Sep 17 00:00:00 2001 From: phiresky Date: Sat, 29 Jun 2024 19:21:15 +0200 Subject: [PATCH] federation tests: ensure server stop after test and random activity id --- Cargo.lock | 23 +++++++++++++ crates/federate/Cargo.toml | 2 ++ crates/federate/src/util.rs | 3 +- crates/federate/src/worker.rs | 61 +++++++++++++++++++++++++---------- 4 files changed, 71 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3ae9cde90..3ba344888 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3129,11 +3129,13 @@ dependencies = [ "reqwest 0.11.27", "serde_json", "serial_test", + "test-context", "tokio", "tokio-util", "tracing", "tracing-test", "url", + "uuid", ] [[package]] @@ -5716,6 +5718,27 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "test-context" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6676ab8513edfd2601a108621103fdb45cac9098305ca25ec93f7023b06b05d9" +dependencies = [ + "futures", + "test-context-macros", +] + +[[package]] +name = "test-context-macros" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ea17a2dc368aeca6f554343ced1b1e31f76d63683fa8016e5844bd7a5144a1" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.66", +] + [[package]] name = "thiserror" version = "1.0.61" diff --git a/crates/federate/Cargo.toml b/crates/federate/Cargo.toml index e63dea443..59c241e9c 100644 --- a/crates/federate/Cargo.toml +++ b/crates/federate/Cargo.toml @@ -40,3 +40,5 @@ serial_test = { workspace = true } url.workspace = true actix-web.workspace = true tracing-test = "0.2.5" +uuid.workspace = true +test-context = "0.3.0" \ No newline at end of file diff --git a/crates/federate/src/util.rs b/crates/federate/src/util.rs index 60361c3c9..4013ef1d0 100644 --- a/crates/federate/src/util.rs +++ b/crates/federate/src/util.rs @@ -28,7 +28,8 @@ use tokio_util::sync::CancellationToken; /// Decrease the delays of the federation queue. /// Should only be used for federation tests since it significantly increases CPU and DB load of the -/// federation queue. +/// federation queue. This is intentionally a separate flag from other flags like debug_assertions, +/// since this is a invasive change we only need rarely. pub(crate) static LEMMY_TEST_FAST_FEDERATION: Lazy = Lazy::new(|| { std::env::var("LEMMY_TEST_FAST_FEDERATION") .map(|s| !s.is_empty()) diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index 13c945525..6d06768bc 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -405,7 +405,7 @@ mod test { http_signatures::generate_actor_keypair, protocol::context::WithContext, }; - use actix_web::{web, App, HttpResponse, HttpServer}; + use actix_web::{dev::ServerHandle, web, App, HttpResponse, HttpServer}; use lemmy_api_common::utils::{generate_inbox_url, generate_shared_inbox_url}; use lemmy_db_schema::{ newtypes::DbUrl, @@ -420,10 +420,12 @@ mod test { use serde_json::Value; use serial_test::serial; use std::{fs::File, io::BufReader}; + use test_context::{test_context, AsyncTestContext}; use tokio::{ select, spawn, sync::mpsc::{error::TryRecvError, unbounded_channel, UnboundedReceiver}, + task::JoinHandle, }; use tracing_test::traced_test; use url::Url; @@ -435,6 +437,8 @@ mod test { stats_receiver: UnboundedReceiver, inbox_receiver: UnboundedReceiver, cancel: CancellationToken, + cleaned_up: bool, + wait_stop_server: ServerHandle, } impl Data { @@ -459,7 +463,7 @@ mod test { // listen for received activities in background let cancel_ = cancel.clone(); - listen_activities(inbox_sender, cancel_).await?; + let wait_stop_server = listen_activities(inbox_sender, cancel_)?; let fed_config = FederationWorkerConfig { concurrent_sends_per_instance: 1, @@ -481,30 +485,48 @@ mod test { stats_receiver, inbox_receiver, cancel, + wait_stop_server, + cleaned_up: false, }) } - async fn cleanup(&self) -> LemmyResult<()> { + async fn cleanup(&mut self) -> LemmyResult<()> { + if self.cleaned_up { + return Ok(()); + } + self.cleaned_up = true; self.cancel.cancel(); sleep(*WORK_FINISHED_RECHECK_DELAY).await; Instance::delete_all(&mut self.context.pool()).await?; Person::delete(&mut self.context.pool(), self.person.id).await?; + self.wait_stop_server.stop(true).await; Ok(()) } } + /// In order to guarantee that the webserver is stopped via the cleanup function, + /// we implement a test context. + impl AsyncTestContext for Data { + async fn setup() -> Data { + Data::init().await.unwrap() + } + async fn teardown(mut self) { + self.cleanup().await.unwrap() + } + } + #[test_context(Data)] #[tokio::test] #[traced_test] #[serial] - async fn test_stats() -> LemmyResult<()> { - let mut data = Data::init().await?; + async fn test_stats(data: &mut Data) -> LemmyResult<()> { tracing::debug!("hello world"); // first receive at startup let rcv = data.stats_receiver.recv().await.unwrap(); tracing::debug!("received first stats"); assert_eq!(data.instance.id, rcv.state.instance_id); - assert_eq!(Some(ActivityId(0)), rcv.state.last_successful_id); + // assert_eq!(Some(ActivityId(0)), rcv.state.last_successful_id); + // let last_id_before = rcv.state.last_successful_id.unwrap(); let sent = send_activity(data.person.actor_id.clone(), &data.context).await?; tracing::debug!("sent activity"); @@ -528,16 +550,15 @@ mod test { let rcv = data.stats_receiver.try_recv(); assert_eq!(Some(TryRecvError::Disconnected), rcv.err()); let inbox_rcv = data.inbox_receiver.try_recv(); - assert_eq!(Some(TryRecvError::Empty), inbox_rcv.err()); + assert_eq!(Some(TryRecvError::Disconnected), inbox_rcv.err()); Ok(()) } + #[test_context(Data)] #[tokio::test] #[serial] - async fn test_update_instance() -> LemmyResult<()> { - let mut data = Data::init().await?; - + async fn test_update_instance(data: &mut Data) -> LemmyResult<()> { let form = InstanceForm::builder() .domain(data.instance.domain.clone()) .updated(None) @@ -557,10 +578,10 @@ mod test { Ok(()) } - async fn listen_activities( + fn listen_activities( inbox_sender: UnboundedSender, cancel: CancellationToken, - ) -> LemmyResult<()> { + ) -> LemmyResult { let run = HttpServer::new(move || { App::new() .app_data(actix_web::web::Data::new(inbox_sender.clone())) @@ -577,13 +598,15 @@ mod test { }) .bind(("127.0.0.1", 8085))? .run(); + let handle = run.handle(); tokio::spawn(async move { - select! { + run.await.unwrap(); + /*select! { _ = run => {}, - _ = cancel.cancelled() => {} - } + _ = cancel.cancelled() => { } + }*/ }); - Ok(()) + Ok(handle) } async fn send_activity(actor_id: DbUrl, context: &LemmyContext) -> LemmyResult { @@ -591,7 +614,11 @@ mod test { let file = File::open("../apub/assets/lemmy/activities/voting/like_note.json")?; let reader = BufReader::new(file); let form = SentActivityForm { - ap_id: Url::parse("http://local.com/activity/1")?.into(), + ap_id: Url::parse(&format!( + "http://local.com/activity/{}", + uuid::Uuid::new_v4().to_string() + ))? + .into(), data: serde_json::from_reader(reader)?, sensitive: false, send_inboxes: vec![Some(Url::parse("http://localhost:8085/inbox")?.into())],