refactor tests

This commit is contained in:
Felix Ableitner 2024-05-30 12:36:39 +02:00
parent 17336b9797
commit ca4b735517
2 changed files with 81 additions and 49 deletions

View file

@ -180,6 +180,7 @@ pub(crate) async fn get_latest_activity_id(pool: &mut DbPool<'_>) -> Result<Acti
/// the domain name is needed for logging, pass it to the stats printer so it doesn't need to look /// the domain name is needed for logging, pass it to the stats printer so it doesn't need to look
/// up the domain itself /// up the domain itself
#[derive(Debug)]
pub(crate) struct FederationQueueStateWithDomain { pub(crate) struct FederationQueueStateWithDomain {
pub domain: String, pub domain: String,
pub state: FederationQueueState, pub state: FederationQueueState,

View file

@ -95,6 +95,7 @@ impl InstanceWorker {
self.inboxes.update_communities(&self.context).await?; self.inboxes.update_communities(&self.context).await?;
} }
// final update of state in db // final update of state in db
dbg!("done");
self.save_and_send_state().await?; self.save_and_send_state().await?;
Ok(()) Ok(())
} }
@ -164,6 +165,7 @@ impl InstanceWorker {
); );
} }
if self.stop.is_cancelled() { if self.stop.is_cancelled() {
dbg!("done");
return Ok(()); return Ok(());
} }
// send success! // send success!
@ -272,77 +274,106 @@ mod test {
use tokio::{ use tokio::{
select, select,
spawn, spawn,
sync::mpsc::{error::TryRecvError, unbounded_channel}, sync::mpsc::{error::TryRecvError, unbounded_channel, UnboundedReceiver},
}; };
use url::Url; use url::Url;
struct Data {
context: activitypub_federation::config::Data<LemmyContext>,
instance: Instance,
person: Person,
stats_receiver: UnboundedReceiver<FederationQueueStateWithDomain>,
inbox_receiver: UnboundedReceiver<String>,
cancel: CancellationToken,
}
impl Data {
async fn init() -> LemmyResult<Self> {
let context = LemmyContext::init_test_context().await;
let instance = Instance::read_or_create(&mut context.pool(), "localhost".to_string()).await?;
let actor_keypair = generate_actor_keypair()?;
let actor_id: DbUrl = Url::parse("http://local.com/u/alice")?.into();
let person_form = PersonInsertForm::builder()
.name("alice".to_string())
.actor_id(Some(actor_id.clone()))
.private_key(Some(actor_keypair.private_key))
.public_key(actor_keypair.public_key)
.inbox_url(Some(generate_inbox_url(&actor_id)?))
.shared_inbox_url(Some(generate_shared_inbox_url(context.settings())?))
.instance_id(instance.id)
.build();
let person = Person::create(&mut context.pool(), &person_form).await?;
let cancel = CancellationToken::new();
let (stats_sender, stats_receiver) = unbounded_channel();
let (inbox_sender, inbox_receiver) = unbounded_channel();
// listen for received activities in background
let cancel_ = cancel.clone();
std::thread::spawn(move || System::new().block_on(listen_activities(inbox_sender, cancel_)));
spawn(InstanceWorker::init_and_loop(
instance.clone(),
context.reset_request_count(),
cancel.clone(),
stats_sender,
));
// wait for startup
sleep(WORK_FINISHED_RECHECK_DELAY).await;
Ok(Self {
context,
instance,
person,
stats_receiver,
inbox_receiver,
cancel,
})
}
async fn cleanup(&self) -> LemmyResult<()> {
self.cancel.cancel();
sleep(WORK_FINISHED_RECHECK_DELAY).await;
Instance::delete_all(&mut self.context.pool()).await?;
Person::delete(&mut self.context.pool(), self.person.id).await?;
Ok(())
}
}
#[tokio::test] #[tokio::test]
#[serial] #[serial]
async fn test_worker() -> LemmyResult<()> { async fn test_stats() -> LemmyResult<()> {
let context = LemmyContext::init_test_context().await; let mut data = Data::init().await?;
let instance = Instance::read_or_create(&mut context.pool(), "localhost".to_string()).await?;
let actor_keypair = generate_actor_keypair()?; // first receive at startup
let actor_id: DbUrl = Url::parse("http://local.com/u/alice")?.into(); let rcv = data.stats_receiver.recv().await.unwrap();
let person_form = PersonInsertForm::builder() assert_eq!(data.instance.id, rcv.state.instance_id);
.name("alice".to_string()) assert_eq!(Some(ActivityId(0)), rcv.state.last_successful_id);
.actor_id(Some(actor_id.clone()))
.private_key(Some(actor_keypair.private_key))
.public_key(actor_keypair.public_key)
.inbox_url(Some(generate_inbox_url(&actor_id)?))
.shared_inbox_url(Some(generate_shared_inbox_url(context.settings())?))
.instance_id(instance.id)
.build();
let person = Person::create(&mut context.pool(), &person_form).await?;
let cancel = CancellationToken::new(); let sent = send_activity(data.person.actor_id.clone(), &data.context).await?;
let (stats_sender, mut stats_receiver) = unbounded_channel();
let (inbox_sender, mut inbox_receiver) = unbounded_channel();
// listen for received activities in background
let cancel_ = cancel.clone();
std::thread::spawn(move || System::new().block_on(listen_activities(inbox_sender, cancel_)));
spawn(InstanceWorker::init_and_loop(
instance.clone(),
context.reset_request_count(),
cancel.clone(),
stats_sender,
));
// wait for startup before creating sent activity
sleep(WORK_FINISHED_RECHECK_DELAY).await;
let sent = send_activity(person.actor_id, &context).await?;
sleep(WORK_FINISHED_RECHECK_DELAY * 2).await; sleep(WORK_FINISHED_RECHECK_DELAY * 2).await;
// first receive at startup
let rcv = stats_receiver.recv().await.unwrap();
assert_eq!(instance.id, rcv.state.instance_id);
assert_eq!(Some(ActivityId(0)), rcv.state.last_successful_id);
// receive for successfully sent activity // receive for successfully sent activity
let inbox_rcv = inbox_receiver.recv().await.unwrap(); let inbox_rcv = data.inbox_receiver.recv().await.unwrap();
let parsed_activity = serde_json::from_str::<WithContext<Value>>(&inbox_rcv)?; let parsed_activity = serde_json::from_str::<WithContext<Value>>(&inbox_rcv)?;
assert_eq!(&sent.data, parsed_activity.inner()); assert_eq!(&sent.data, parsed_activity.inner());
let rcv = stats_receiver.recv().await.unwrap(); let rcv = data.stats_receiver.recv().await.unwrap();
assert_eq!(instance.id, rcv.state.instance_id); assert_eq!(data.instance.id, rcv.state.instance_id);
assert_eq!(Some(sent.id), rcv.state.last_successful_id); assert_eq!(Some(sent.id), rcv.state.last_successful_id);
// cleanup data.cleanup().await?;
cancel.cancel();
Instance::delete_all(&mut context.pool()).await?;
Person::delete(&mut context.pool(), person.id).await?;
// it also sends state on shutdown // it also sends state on shutdown
let rcv = stats_receiver.try_recv(); let rcv = data.stats_receiver.try_recv();
assert!(rcv.is_ok()); assert!(rcv.is_ok());
// nothing further received // nothing further received
let rcv = stats_receiver.try_recv(); let rcv = data.stats_receiver.try_recv();
assert_eq!(Some(TryRecvError::Disconnected), rcv.err()); assert_eq!(Some(TryRecvError::Disconnected), rcv.err());
let inbox_rcv = inbox_receiver.try_recv(); let inbox_rcv = data.inbox_receiver.try_recv();
assert_eq!(Some(TryRecvError::Empty), inbox_rcv.err()); assert_eq!(Some(TryRecvError::Empty), inbox_rcv.err());
Ok(()) Ok(())