mirror of
https://github.com/LemmyNet/lemmy.git
synced 2025-01-13 13:35:54 +00:00
basic test for federation worker
This commit is contained in:
parent
43bc4c4536
commit
53f79a9174
9 changed files with 168 additions and 57 deletions
2
Cargo.lock
generated
2
Cargo.lock
generated
|
@ -2977,6 +2977,7 @@ name = "lemmy_federate"
|
||||||
version = "0.19.4-rc.3"
|
version = "0.19.4-rc.3"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"activitypub_federation",
|
"activitypub_federation",
|
||||||
|
"actix-web",
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"chrono",
|
"chrono",
|
||||||
"diesel",
|
"diesel",
|
||||||
|
@ -2995,6 +2996,7 @@ dependencies = [
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-util",
|
"tokio-util",
|
||||||
"tracing",
|
"tracing",
|
||||||
|
"url",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
|
|
@ -11,8 +11,6 @@ fi
|
||||||
export RUST_BACKTRACE=1
|
export RUST_BACKTRACE=1
|
||||||
export RUST_LOG="warn,lemmy_server=$LEMMY_LOG_LEVEL,lemmy_federate=$LEMMY_LOG_LEVEL,lemmy_api=$LEMMY_LOG_LEVEL,lemmy_api_common=$LEMMY_LOG_LEVEL,lemmy_api_crud=$LEMMY_LOG_LEVEL,lemmy_apub=$LEMMY_LOG_LEVEL,lemmy_db_schema=$LEMMY_LOG_LEVEL,lemmy_db_views=$LEMMY_LOG_LEVEL,lemmy_db_views_actor=$LEMMY_LOG_LEVEL,lemmy_db_views_moderator=$LEMMY_LOG_LEVEL,lemmy_routes=$LEMMY_LOG_LEVEL,lemmy_utils=$LEMMY_LOG_LEVEL,lemmy_websocket=$LEMMY_LOG_LEVEL"
|
export RUST_LOG="warn,lemmy_server=$LEMMY_LOG_LEVEL,lemmy_federate=$LEMMY_LOG_LEVEL,lemmy_api=$LEMMY_LOG_LEVEL,lemmy_api_common=$LEMMY_LOG_LEVEL,lemmy_api_crud=$LEMMY_LOG_LEVEL,lemmy_apub=$LEMMY_LOG_LEVEL,lemmy_db_schema=$LEMMY_LOG_LEVEL,lemmy_db_views=$LEMMY_LOG_LEVEL,lemmy_db_views_actor=$LEMMY_LOG_LEVEL,lemmy_db_views_moderator=$LEMMY_LOG_LEVEL,lemmy_routes=$LEMMY_LOG_LEVEL,lemmy_utils=$LEMMY_LOG_LEVEL,lemmy_websocket=$LEMMY_LOG_LEVEL"
|
||||||
|
|
||||||
export LEMMY_TEST_FAST_FEDERATION=1 # by default, the persistent federation queue has delays in the scale of 30s-5min
|
|
||||||
|
|
||||||
# pictrs setup
|
# pictrs setup
|
||||||
if [ ! -f "api_tests/pict-rs" ]; then
|
if [ ! -f "api_tests/pict-rs" ]; then
|
||||||
curl "https://git.asonix.dog/asonix/pict-rs/releases/download/v0.5.13/pict-rs-linux-amd64" -o api_tests/pict-rs
|
curl "https://git.asonix.dog/asonix/pict-rs/releases/download/v0.5.13/pict-rs-linux-amd64" -o api_tests/pict-rs
|
||||||
|
|
|
@ -75,6 +75,7 @@ impl LemmyContext {
|
||||||
.app_data(context)
|
.app_data(context)
|
||||||
// Dont allow any network fetches
|
// Dont allow any network fetches
|
||||||
.http_fetch_limit(0)
|
.http_fetch_limit(0)
|
||||||
|
.debug(true)
|
||||||
.build()
|
.build()
|
||||||
.await
|
.await
|
||||||
.expect("build federation config");
|
.expect("build federation config");
|
||||||
|
|
|
@ -37,3 +37,5 @@ tokio-util = "0.7.11"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
serial_test = { workspace = true }
|
serial_test = { workspace = true }
|
||||||
|
url.workspace = true
|
||||||
|
actix-web.workspace = true
|
||||||
|
|
|
@ -1,5 +1,4 @@
|
||||||
use crate::util::LEMMY_TEST_FAST_FEDERATION;
|
use chrono::{DateTime, TimeDelta, TimeZone, Utc};
|
||||||
use chrono::{DateTime, TimeZone, Utc};
|
|
||||||
use lemmy_api_common::context::LemmyContext;
|
use lemmy_api_common::context::LemmyContext;
|
||||||
use lemmy_db_schema::{
|
use lemmy_db_schema::{
|
||||||
newtypes::CommunityId,
|
newtypes::CommunityId,
|
||||||
|
@ -20,11 +19,11 @@ use std::collections::{HashMap, HashSet};
|
||||||
/// currently fairly high because of the current structure of storing inboxes for every person, not
|
/// currently fairly high because of the current structure of storing inboxes for every person, not
|
||||||
/// having a separate list of shared_inboxes, and the architecture of having every instance queue be
|
/// having a separate list of shared_inboxes, and the architecture of having every instance queue be
|
||||||
/// fully separate. (see https://github.com/LemmyNet/lemmy/issues/3958)
|
/// fully separate. (see https://github.com/LemmyNet/lemmy/issues/3958)
|
||||||
static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy<chrono::TimeDelta> = Lazy::new(|| {
|
static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy<TimeDelta> = Lazy::new(|| {
|
||||||
if *LEMMY_TEST_FAST_FEDERATION {
|
if cfg!(debug_assertions) {
|
||||||
chrono::TimeDelta::try_seconds(1).expect("TimeDelta out of bounds")
|
TimeDelta::try_seconds(1).expect("TimeDelta out of bounds")
|
||||||
} else {
|
} else {
|
||||||
chrono::TimeDelta::try_minutes(2).expect("TimeDelta out of bounds")
|
TimeDelta::try_minutes(2).expect("TimeDelta out of bounds")
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -92,9 +91,11 @@ impl CommunityInboxCollector {
|
||||||
.send_inboxes
|
.send_inboxes
|
||||||
.iter()
|
.iter()
|
||||||
.filter_map(std::option::Option::as_ref)
|
.filter_map(std::option::Option::as_ref)
|
||||||
.filter(|&u| (u.domain() == Some(&self.instance.domain)))
|
|
||||||
.map(|u| u.inner().clone()),
|
.map(|u| u.inner().clone()),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// TODO: also needs to send to user followers
|
||||||
|
|
||||||
Ok(inbox_urls)
|
Ok(inbox_urls)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -27,28 +27,16 @@ use tokio::{task::JoinHandle, time::sleep};
|
||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
use tracing::error;
|
use tracing::error;
|
||||||
|
|
||||||
/// Decrease the delays of the federation queue.
|
|
||||||
/// Should only be used for federation tests since it significantly increases CPU and DB load of the
|
|
||||||
/// federation queue.
|
|
||||||
pub(crate) static LEMMY_TEST_FAST_FEDERATION: Lazy<bool> = Lazy::new(|| {
|
|
||||||
std::env::var("LEMMY_TEST_FAST_FEDERATION")
|
|
||||||
.map(|s| !s.is_empty())
|
|
||||||
.unwrap_or(false)
|
|
||||||
});
|
|
||||||
|
|
||||||
/// Recheck for new federation work every n seconds.
|
/// Recheck for new federation work every n seconds.
|
||||||
///
|
///
|
||||||
/// When the queue is processed faster than new activities are added and it reaches the current time
|
/// When the queue is processed faster than new activities are added and it reaches the current time
|
||||||
/// with an empty batch, this is the delay the queue waits before it checks if new activities have
|
/// with an empty batch, this is the delay the queue waits before it checks if new activities have
|
||||||
/// been added to the sent_activities table. This delay is only applied if no federated activity
|
/// been added to the sent_activities table. This delay is only applied if no federated activity
|
||||||
/// happens during sending activities of the last batch.
|
/// happens during sending activities of the last batch.
|
||||||
pub(crate) static WORK_FINISHED_RECHECK_DELAY: Lazy<Duration> = Lazy::new(|| {
|
#[cfg(debug_assertions)]
|
||||||
if *LEMMY_TEST_FAST_FEDERATION {
|
pub(crate) static WORK_FINISHED_RECHECK_DELAY: Duration = Duration::from_millis(100);
|
||||||
Duration::from_millis(100)
|
#[cfg(not(debug_assertions))]
|
||||||
} else {
|
pub(crate) static WORK_FINISHED_RECHECK_DELAY: Duration = Duration::from_secs(30);
|
||||||
Duration::from_secs(30)
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
/// A task that will be run in an infinite loop, unless it is cancelled.
|
/// A task that will be run in an infinite loop, unless it is cancelled.
|
||||||
/// If the task exits without being cancelled, an error will be logged and the task will be
|
/// If the task exits without being cancelled, an error will be logged and the task will be
|
||||||
|
|
|
@ -40,7 +40,10 @@ use tracing::{debug, info, trace, warn};
|
||||||
static CHECK_SAVE_STATE_EVERY_IT: i64 = 100;
|
static CHECK_SAVE_STATE_EVERY_IT: i64 = 100;
|
||||||
/// Save state to db after this time has passed since the last state (so if the server crashes or is
|
/// Save state to db after this time has passed since the last state (so if the server crashes or is
|
||||||
/// SIGKILLed, less than X seconds of activities are resent)
|
/// SIGKILLed, less than X seconds of activities are resent)
|
||||||
static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(60);
|
#[cfg(debug_assertions)]
|
||||||
|
static SAVE_STATE_EVERY_TIME: chrono::Duration = chrono::Duration::seconds(1);
|
||||||
|
#[cfg(not(debug_assertions))]
|
||||||
|
static SAVE_STATE_EVERY_TIME: chrono::Duration = chrono::Duration::seconds(60);
|
||||||
|
|
||||||
pub(crate) struct InstanceWorker {
|
pub(crate) struct InstanceWorker {
|
||||||
instance: Instance,
|
instance: Instance,
|
||||||
|
@ -78,8 +81,6 @@ impl InstanceWorker {
|
||||||
/// cancelled (graceful exit)
|
/// cancelled (graceful exit)
|
||||||
pub(crate) async fn loop_until_stopped(&mut self) -> LemmyResult<()> {
|
pub(crate) async fn loop_until_stopped(&mut self) -> LemmyResult<()> {
|
||||||
debug!("Starting federation worker for {}", self.instance.domain);
|
debug!("Starting federation worker for {}", self.instance.domain);
|
||||||
let save_state_every = chrono::Duration::from_std(SAVE_STATE_EVERY_TIME).expect("not negative");
|
|
||||||
|
|
||||||
self.inboxes.update_communities(&self.context).await?;
|
self.inboxes.update_communities(&self.context).await?;
|
||||||
self.initial_fail_sleep().await?;
|
self.initial_fail_sleep().await?;
|
||||||
while !self.stop.is_cancelled() {
|
while !self.stop.is_cancelled() {
|
||||||
|
@ -87,7 +88,7 @@ impl InstanceWorker {
|
||||||
if self.stop.is_cancelled() {
|
if self.stop.is_cancelled() {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (Utc::now() - self.last_state_insert) > save_state_every {
|
if (Utc::now() - self.last_state_insert) > SAVE_STATE_EVERY_TIME {
|
||||||
self.save_and_send_state().await?;
|
self.save_and_send_state().await?;
|
||||||
}
|
}
|
||||||
self.inboxes.update_communities(&self.context).await?;
|
self.inboxes.update_communities(&self.context).await?;
|
||||||
|
@ -135,7 +136,7 @@ impl InstanceWorker {
|
||||||
if id >= latest_id {
|
if id >= latest_id {
|
||||||
// no more work to be done, wait before rechecking
|
// no more work to be done, wait before rechecking
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
() = sleep(*WORK_FINISHED_RECHECK_DELAY) => {},
|
() = sleep(WORK_FINISHED_RECHECK_DELAY) => {},
|
||||||
() = self.stop.cancelled() => {}
|
() = self.stop.cancelled() => {}
|
||||||
}
|
}
|
||||||
return Ok(());
|
return Ok(());
|
||||||
|
@ -179,6 +180,7 @@ impl InstanceWorker {
|
||||||
activity: &SentActivity,
|
activity: &SentActivity,
|
||||||
object: &SharedInboxActivities,
|
object: &SharedInboxActivities,
|
||||||
) -> LemmyResult<()> {
|
) -> LemmyResult<()> {
|
||||||
|
println!("send retry loop {:?}", activity.id);
|
||||||
let inbox_urls = self.inboxes.get_inbox_urls(activity, &self.context).await?;
|
let inbox_urls = self.inboxes.get_inbox_urls(activity, &self.context).await?;
|
||||||
if inbox_urls.is_empty() {
|
if inbox_urls.is_empty() {
|
||||||
trace!("{}: {:?} no inboxes", self.instance.domain, activity.id);
|
trace!("{}: {:?} no inboxes", self.instance.domain, activity.id);
|
||||||
|
@ -186,6 +188,7 @@ impl InstanceWorker {
|
||||||
self.state.last_successful_published_time = Some(activity.published);
|
self.state.last_successful_published_time = Some(activity.published);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
// TODO: make db column not null
|
||||||
let Some(actor_apub_id) = &activity.actor_apub_id else {
|
let Some(actor_apub_id) = &activity.actor_apub_id else {
|
||||||
return Ok(()); // activity was inserted before persistent queue was activated
|
return Ok(()); // activity was inserted before persistent queue was activated
|
||||||
};
|
};
|
||||||
|
@ -242,3 +245,146 @@ impl InstanceWorker {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
#[allow(clippy::unwrap_used)]
|
||||||
|
#[allow(clippy::indexing_slicing)]
|
||||||
|
mod test {
|
||||||
|
|
||||||
|
use super::*;
|
||||||
|
use activitypub_federation::http_signatures::generate_actor_keypair;
|
||||||
|
use actix_web::{rt::System, web, App, HttpResponse, HttpServer};
|
||||||
|
use lemmy_api_common::utils::{generate_inbox_url, generate_shared_inbox_url};
|
||||||
|
use lemmy_db_schema::{
|
||||||
|
newtypes::DbUrl,
|
||||||
|
source::{
|
||||||
|
activity::{ActorType, SentActivityForm},
|
||||||
|
person::{Person, PersonInsertForm},
|
||||||
|
},
|
||||||
|
traits::Crud,
|
||||||
|
};
|
||||||
|
use reqwest::StatusCode;
|
||||||
|
use serde_json::Value;
|
||||||
|
use serial_test::serial;
|
||||||
|
use std::{fs::File, io::BufReader};
|
||||||
|
use tokio::{
|
||||||
|
select,
|
||||||
|
spawn,
|
||||||
|
sync::mpsc::{error::TryRecvError, unbounded_channel},
|
||||||
|
};
|
||||||
|
use url::Url;
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
#[serial]
|
||||||
|
async fn test_worker() -> LemmyResult<()> {
|
||||||
|
let context = LemmyContext::init_test_context().await;
|
||||||
|
let instance = Instance::read_or_create(&mut context.pool(), "alpha.com".to_string()).await?;
|
||||||
|
|
||||||
|
let actor_keypair = generate_actor_keypair()?;
|
||||||
|
let actor_id: DbUrl = Url::parse("http://local.com/u/alice")?.into();
|
||||||
|
let person_form = PersonInsertForm::builder()
|
||||||
|
.name("alice".to_string())
|
||||||
|
.actor_id(Some(actor_id.clone()))
|
||||||
|
.private_key(Some(actor_keypair.private_key))
|
||||||
|
.public_key(actor_keypair.public_key)
|
||||||
|
.inbox_url(Some(generate_inbox_url(&actor_id)?))
|
||||||
|
.shared_inbox_url(Some(generate_shared_inbox_url(context.settings())?))
|
||||||
|
.instance_id(instance.id)
|
||||||
|
.build();
|
||||||
|
let person = Person::create(&mut context.pool(), &person_form).await?;
|
||||||
|
|
||||||
|
let cancel = CancellationToken::new();
|
||||||
|
let (stats_sender, mut stats_receiver) = unbounded_channel();
|
||||||
|
let (inbox_sender, mut inbox_receiver) = unbounded_channel();
|
||||||
|
|
||||||
|
// listen for received activities in background
|
||||||
|
let cancel_ = cancel.clone();
|
||||||
|
std::thread::spawn(move || System::new().block_on(listen_activities(inbox_sender, cancel_)));
|
||||||
|
|
||||||
|
spawn(InstanceWorker::init_and_loop(
|
||||||
|
instance.clone(),
|
||||||
|
context.reset_request_count(),
|
||||||
|
cancel.clone(),
|
||||||
|
stats_sender,
|
||||||
|
));
|
||||||
|
// wait for startup before creating sent activity
|
||||||
|
sleep(WORK_FINISHED_RECHECK_DELAY).await;
|
||||||
|
|
||||||
|
// create outgoing activity
|
||||||
|
let file = File::open("../apub/assets/lemmy/activities/voting/like_note.json")?;
|
||||||
|
let reader = BufReader::new(file);
|
||||||
|
let form = SentActivityForm {
|
||||||
|
ap_id: Url::parse("http://local.com/activity/1")?.into(),
|
||||||
|
data: serde_json::from_reader(reader)?,
|
||||||
|
sensitive: false,
|
||||||
|
send_inboxes: vec![Some(Url::parse("http://localhost:8085/inbox")?.into())],
|
||||||
|
send_all_instances: false,
|
||||||
|
send_community_followers_of: None,
|
||||||
|
actor_type: ActorType::Person,
|
||||||
|
actor_apub_id: person.actor_id,
|
||||||
|
};
|
||||||
|
let sent = SentActivity::create(&mut context.pool(), form).await?;
|
||||||
|
|
||||||
|
sleep(WORK_FINISHED_RECHECK_DELAY).await;
|
||||||
|
|
||||||
|
// first receive at startup
|
||||||
|
let rcv = stats_receiver.recv().await.unwrap();
|
||||||
|
assert_eq!(instance.id, rcv.0);
|
||||||
|
assert_eq!(instance.id, rcv.1.instance_id);
|
||||||
|
assert_eq!(Some(ActivityId(0)), rcv.1.last_successful_id);
|
||||||
|
|
||||||
|
// receive for successfully sent activity
|
||||||
|
let inbox_rcv = inbox_receiver.recv().await.unwrap();
|
||||||
|
let parsed_activity = serde_json::from_str::<WithContext<Value>>(&inbox_rcv)?;
|
||||||
|
assert_eq!(&sent.data, parsed_activity.inner());
|
||||||
|
|
||||||
|
let rcv = stats_receiver.recv().await.unwrap();
|
||||||
|
assert_eq!(instance.id, rcv.0);
|
||||||
|
assert_eq!(instance.id, rcv.1.instance_id);
|
||||||
|
assert_eq!(Some(sent.id), rcv.1.last_successful_id);
|
||||||
|
|
||||||
|
// cleanup
|
||||||
|
cancel.cancel();
|
||||||
|
Instance::delete_all(&mut context.pool()).await?;
|
||||||
|
Person::delete(&mut context.pool(), person.id).await?;
|
||||||
|
|
||||||
|
// also receive state on shutdown
|
||||||
|
let rcv = stats_receiver.try_recv();
|
||||||
|
assert!(rcv.is_ok());
|
||||||
|
|
||||||
|
// nothing further received
|
||||||
|
let rcv = stats_receiver.try_recv();
|
||||||
|
assert_eq!(Some(TryRecvError::Disconnected), rcv.err());
|
||||||
|
let inbox_rcv = inbox_receiver.try_recv();
|
||||||
|
assert_eq!(Some(TryRecvError::Empty), inbox_rcv.err());
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn listen_activities(
|
||||||
|
inbox_sender: UnboundedSender<String>,
|
||||||
|
cancel: CancellationToken,
|
||||||
|
) -> LemmyResult<()> {
|
||||||
|
let run = HttpServer::new(move || {
|
||||||
|
App::new()
|
||||||
|
.app_data(actix_web::web::Data::new(inbox_sender.clone()))
|
||||||
|
.route(
|
||||||
|
"/inbox",
|
||||||
|
web::post().to(
|
||||||
|
|inbox_sender: actix_web::web::Data<UnboundedSender<String>>, body: String| async move {
|
||||||
|
inbox_sender.send(body.clone()).unwrap();
|
||||||
|
HttpResponse::new(StatusCode::OK)
|
||||||
|
},
|
||||||
|
),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.bind(("127.0.0.1", 8085))?
|
||||||
|
.run();
|
||||||
|
select! {
|
||||||
|
_ = run => {},
|
||||||
|
_ = cancel.cancelled() => {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -1,26 +0,0 @@
|
||||||
use openssl::{pkey::PKey, rsa::Rsa};
|
|
||||||
use std::io::{Error, ErrorKind};
|
|
||||||
|
|
||||||
pub struct Keypair {
|
|
||||||
pub private_key: String,
|
|
||||||
pub public_key: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Generate the asymmetric keypair for ActivityPub HTTP signatures.
|
|
||||||
pub fn generate_actor_keypair() -> Result<Keypair, Error> {
|
|
||||||
let rsa = Rsa::generate(2048)?;
|
|
||||||
let pkey = PKey::from_rsa(rsa)?;
|
|
||||||
let public_key = pkey.public_key_to_pem()?;
|
|
||||||
let private_key = pkey.private_key_to_pem_pkcs8()?;
|
|
||||||
let key_to_string = |key| match String::from_utf8(key) {
|
|
||||||
Ok(s) => Ok(s),
|
|
||||||
Err(e) => Err(Error::new(
|
|
||||||
ErrorKind::Other,
|
|
||||||
format!("Failed converting key to string: {e}"),
|
|
||||||
)),
|
|
||||||
};
|
|
||||||
Ok(Keypair {
|
|
||||||
private_key: key_to_string(private_key)?,
|
|
||||||
public_key: key_to_string(public_key)?,
|
|
||||||
})
|
|
||||||
}
|
|
|
@ -2,7 +2,6 @@ use cfg_if::cfg_if;
|
||||||
|
|
||||||
cfg_if! {
|
cfg_if! {
|
||||||
if #[cfg(feature = "full")] {
|
if #[cfg(feature = "full")] {
|
||||||
pub mod apub;
|
|
||||||
pub mod cache_header;
|
pub mod cache_header;
|
||||||
pub mod email;
|
pub mod email;
|
||||||
pub mod rate_limit;
|
pub mod rate_limit;
|
||||||
|
|
Loading…
Reference in a new issue