diff --git a/crates/federate/src/util.rs b/crates/federate/src/util.rs index e9a7ab4a4..59b450b5c 100644 --- a/crates/federate/src/util.rs +++ b/crates/federate/src/util.rs @@ -180,6 +180,7 @@ pub(crate) async fn get_latest_activity_id(pool: &mut DbPool<'_>) -> Result, + instance: Instance, + person: Person, + stats_receiver: UnboundedReceiver, + inbox_receiver: UnboundedReceiver, + cancel: CancellationToken, + } + + impl Data { + async fn init() -> LemmyResult { + let context = LemmyContext::init_test_context().await; + let instance = Instance::read_or_create(&mut context.pool(), "localhost".to_string()).await?; + + let actor_keypair = generate_actor_keypair()?; + let actor_id: DbUrl = Url::parse("http://local.com/u/alice")?.into(); + let person_form = PersonInsertForm::builder() + .name("alice".to_string()) + .actor_id(Some(actor_id.clone())) + .private_key(Some(actor_keypair.private_key)) + .public_key(actor_keypair.public_key) + .inbox_url(Some(generate_inbox_url(&actor_id)?)) + .shared_inbox_url(Some(generate_shared_inbox_url(context.settings())?)) + .instance_id(instance.id) + .build(); + let person = Person::create(&mut context.pool(), &person_form).await?; + + let cancel = CancellationToken::new(); + let (stats_sender, stats_receiver) = unbounded_channel(); + let (inbox_sender, inbox_receiver) = unbounded_channel(); + + // listen for received activities in background + let cancel_ = cancel.clone(); + std::thread::spawn(move || System::new().block_on(listen_activities(inbox_sender, cancel_))); + + spawn(InstanceWorker::init_and_loop( + instance.clone(), + context.reset_request_count(), + cancel.clone(), + stats_sender, + )); + // wait for startup + sleep(WORK_FINISHED_RECHECK_DELAY).await; + + Ok(Self { + context, + instance, + person, + stats_receiver, + inbox_receiver, + cancel, + }) + } + + async fn cleanup(&self) -> LemmyResult<()> { + self.cancel.cancel(); + sleep(WORK_FINISHED_RECHECK_DELAY).await; + Instance::delete_all(&mut self.context.pool()).await?; + Person::delete(&mut self.context.pool(), self.person.id).await?; + Ok(()) + } + } + #[tokio::test] #[serial] - async fn test_worker() -> LemmyResult<()> { - let context = LemmyContext::init_test_context().await; - let instance = Instance::read_or_create(&mut context.pool(), "localhost".to_string()).await?; + async fn test_stats() -> LemmyResult<()> { + let mut data = Data::init().await?; - let actor_keypair = generate_actor_keypair()?; - let actor_id: DbUrl = Url::parse("http://local.com/u/alice")?.into(); - let person_form = PersonInsertForm::builder() - .name("alice".to_string()) - .actor_id(Some(actor_id.clone())) - .private_key(Some(actor_keypair.private_key)) - .public_key(actor_keypair.public_key) - .inbox_url(Some(generate_inbox_url(&actor_id)?)) - .shared_inbox_url(Some(generate_shared_inbox_url(context.settings())?)) - .instance_id(instance.id) - .build(); - let person = Person::create(&mut context.pool(), &person_form).await?; + // first receive at startup + let rcv = data.stats_receiver.recv().await.unwrap(); + assert_eq!(data.instance.id, rcv.state.instance_id); + assert_eq!(Some(ActivityId(0)), rcv.state.last_successful_id); - let cancel = CancellationToken::new(); - let (stats_sender, mut stats_receiver) = unbounded_channel(); - let (inbox_sender, mut inbox_receiver) = unbounded_channel(); - - // listen for received activities in background - let cancel_ = cancel.clone(); - std::thread::spawn(move || System::new().block_on(listen_activities(inbox_sender, cancel_))); - - spawn(InstanceWorker::init_and_loop( - instance.clone(), - context.reset_request_count(), - cancel.clone(), - stats_sender, - )); - // wait for startup before creating sent activity - sleep(WORK_FINISHED_RECHECK_DELAY).await; - - let sent = send_activity(person.actor_id, &context).await?; + let sent = send_activity(data.person.actor_id.clone(), &data.context).await?; sleep(WORK_FINISHED_RECHECK_DELAY * 2).await; - // first receive at startup - let rcv = stats_receiver.recv().await.unwrap(); - assert_eq!(instance.id, rcv.state.instance_id); - assert_eq!(Some(ActivityId(0)), rcv.state.last_successful_id); - // receive for successfully sent activity - let inbox_rcv = inbox_receiver.recv().await.unwrap(); + let inbox_rcv = data.inbox_receiver.recv().await.unwrap(); let parsed_activity = serde_json::from_str::>(&inbox_rcv)?; assert_eq!(&sent.data, parsed_activity.inner()); - let rcv = stats_receiver.recv().await.unwrap(); - assert_eq!(instance.id, rcv.state.instance_id); + let rcv = data.stats_receiver.recv().await.unwrap(); + assert_eq!(data.instance.id, rcv.state.instance_id); assert_eq!(Some(sent.id), rcv.state.last_successful_id); - // cleanup - cancel.cancel(); - Instance::delete_all(&mut context.pool()).await?; - Person::delete(&mut context.pool(), person.id).await?; + data.cleanup().await?; // it also sends state on shutdown - let rcv = stats_receiver.try_recv(); + let rcv = data.stats_receiver.try_recv(); assert!(rcv.is_ok()); // nothing further received - let rcv = stats_receiver.try_recv(); + let rcv = data.stats_receiver.try_recv(); assert_eq!(Some(TryRecvError::Disconnected), rcv.err()); - let inbox_rcv = inbox_receiver.try_recv(); + let inbox_rcv = data.inbox_receiver.try_recv(); assert_eq!(Some(TryRecvError::Empty), inbox_rcv.err()); Ok(())