Make activity queue worker count configurable, log stats (#2113)

This commit is contained in:
Nutomic 2022-03-03 18:54:33 +00:00 committed by GitHub
parent fa29ffade1
commit ef1e164cc5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 32 additions and 41 deletions

View file

@ -59,6 +59,8 @@
# use allowlist only for remote communities, and posts/comments in local communities # use allowlist only for remote communities, and posts/comments in local communities
# (meaning remote communities will show content from arbitrary instances). # (meaning remote communities will show content from arbitrary instances).
strict_allowlist: true strict_allowlist: true
# Number of workers for sending outgoing activities.
worker_count: 16
} }
captcha: { captcha: {
# Whether captcha is required for signup # Whether captcha is required for signup

View file

@ -146,7 +146,6 @@ mod tests {
}, },
protocol::tests::file_to_json_object, protocol::tests::file_to_json_object,
}; };
use lemmy_apub_lib::activity_queue::create_activity_queue;
use lemmy_db_schema::{ use lemmy_db_schema::{
source::{ source::{
community::Community, community::Community,
@ -160,9 +159,7 @@ mod tests {
#[actix_rt::test] #[actix_rt::test]
#[serial] #[serial]
async fn test_parse_lemmy_community_moderators() { async fn test_parse_lemmy_community_moderators() {
let client = reqwest::Client::new().into(); let context = init_context();
let manager = create_activity_queue(client);
let context = init_context(manager.queue_handle().clone());
let (new_mod, site) = parse_lemmy_person(&context).await; let (new_mod, site) = parse_lemmy_person(&context).await;
let community = parse_lemmy_community(&context).await; let community = parse_lemmy_community(&context).await;
let community_id = community.id; let community_id = community.id;

View file

@ -218,7 +218,6 @@ pub(crate) mod tests {
protocol::tests::file_to_json_object, protocol::tests::file_to_json_object,
}; };
use assert_json_diff::assert_json_include; use assert_json_diff::assert_json_include;
use lemmy_apub_lib::activity_queue::create_activity_queue;
use lemmy_db_schema::source::site::Site; use lemmy_db_schema::source::site::Site;
use serial_test::serial; use serial_test::serial;
@ -248,9 +247,7 @@ pub(crate) mod tests {
#[actix_rt::test] #[actix_rt::test]
#[serial] #[serial]
pub(crate) async fn test_parse_lemmy_comment() { pub(crate) async fn test_parse_lemmy_comment() {
let client = reqwest::Client::new().into(); let context = init_context();
let manager = create_activity_queue(client);
let context = init_context(manager.queue_handle().clone());
let url = Url::parse("https://enterprise.lemmy.ml/comment/38741").unwrap(); let url = Url::parse("https://enterprise.lemmy.ml/comment/38741").unwrap();
let data = prepare_comment_test(&url, &context).await; let data = prepare_comment_test(&url, &context).await;
@ -279,9 +276,7 @@ pub(crate) mod tests {
#[actix_rt::test] #[actix_rt::test]
#[serial] #[serial]
async fn test_parse_pleroma_comment() { async fn test_parse_pleroma_comment() {
let client = reqwest::Client::new().into(); let context = init_context();
let manager = create_activity_queue(client);
let context = init_context(manager.queue_handle().clone());
let url = Url::parse("https://enterprise.lemmy.ml/comment/38741").unwrap(); let url = Url::parse("https://enterprise.lemmy.ml/comment/38741").unwrap();
let data = prepare_comment_test(&url, &context).await; let data = prepare_comment_test(&url, &context).await;

View file

@ -217,7 +217,6 @@ pub(crate) mod tests {
objects::{instance::tests::parse_lemmy_instance, tests::init_context}, objects::{instance::tests::parse_lemmy_instance, tests::init_context},
protocol::tests::file_to_json_object, protocol::tests::file_to_json_object,
}; };
use lemmy_apub_lib::activity_queue::create_activity_queue;
use lemmy_db_schema::{source::site::Site, traits::Crud}; use lemmy_db_schema::{source::site::Site, traits::Crud};
use serial_test::serial; use serial_test::serial;
@ -244,9 +243,7 @@ pub(crate) mod tests {
#[actix_rt::test] #[actix_rt::test]
#[serial] #[serial]
async fn test_parse_lemmy_community() { async fn test_parse_lemmy_community() {
let client = reqwest::Client::new().into(); let context = init_context();
let manager = create_activity_queue(client);
let context = init_context(manager.queue_handle().clone());
let site = parse_lemmy_instance(&context).await; let site = parse_lemmy_instance(&context).await;
let community = parse_lemmy_community(&context).await; let community = parse_lemmy_community(&context).await;

View file

@ -186,7 +186,6 @@ pub(in crate::objects) async fn fetch_instance_actor_for_object(
pub(crate) mod tests { pub(crate) mod tests {
use super::*; use super::*;
use crate::{objects::tests::init_context, protocol::tests::file_to_json_object}; use crate::{objects::tests::init_context, protocol::tests::file_to_json_object};
use lemmy_apub_lib::activity_queue::create_activity_queue;
use lemmy_db_schema::traits::Crud; use lemmy_db_schema::traits::Crud;
use serial_test::serial; use serial_test::serial;
@ -207,9 +206,7 @@ pub(crate) mod tests {
#[actix_rt::test] #[actix_rt::test]
#[serial] #[serial]
async fn test_parse_lemmy_instance() { async fn test_parse_lemmy_instance() {
let client = reqwest::Client::new().into(); let context = init_context();
let manager = create_activity_queue(client);
let context = init_context(manager.queue_handle().clone());
let site = parse_lemmy_instance(&context).await; let site = parse_lemmy_instance(&context).await;
assert_eq!(site.name, "Enterprise"); assert_eq!(site.name, "Enterprise");

View file

@ -22,11 +22,11 @@ pub(crate) fn get_summary_from_string_or_source(
#[cfg(test)] #[cfg(test)]
pub(crate) mod tests { pub(crate) mod tests {
use actix::Actor; use actix::Actor;
use background_jobs::QueueHandle;
use diesel::{ use diesel::{
r2d2::{ConnectionManager, Pool}, r2d2::{ConnectionManager, Pool},
PgConnection, PgConnection,
}; };
use lemmy_apub_lib::activity_queue::create_activity_queue;
use lemmy_db_schema::{ use lemmy_db_schema::{
establish_unpooled_connection, establish_unpooled_connection,
get_database_url_from_env, get_database_url_from_env,
@ -46,7 +46,11 @@ pub(crate) mod tests {
// TODO: would be nice if we didnt have to use a full context for tests. // TODO: would be nice if we didnt have to use a full context for tests.
// or at least write a helper function so this code is shared with main.rs // or at least write a helper function so this code is shared with main.rs
pub(crate) fn init_context(activity_queue: QueueHandle) -> LemmyContext { pub(crate) fn init_context() -> LemmyContext {
let client = reqwest::Client::new().into();
// activity queue isnt used in tests, so worker count makes no difference
let queue_manager = create_activity_queue(client, 4);
let activity_queue = queue_manager.queue_handle().clone();
// call this to run migrations // call this to run migrations
establish_unpooled_connection(); establish_unpooled_connection();
let settings = Settings::init().unwrap(); let settings = Settings::init().unwrap();

View file

@ -207,7 +207,6 @@ pub(crate) mod tests {
}, },
protocol::{objects::instance::Instance, tests::file_to_json_object}, protocol::{objects::instance::Instance, tests::file_to_json_object},
}; };
use lemmy_apub_lib::activity_queue::create_activity_queue;
use lemmy_db_schema::{source::site::Site, traits::Crud}; use lemmy_db_schema::{source::site::Site, traits::Crud};
use serial_test::serial; use serial_test::serial;
@ -229,9 +228,7 @@ pub(crate) mod tests {
#[actix_rt::test] #[actix_rt::test]
#[serial] #[serial]
async fn test_parse_lemmy_person() { async fn test_parse_lemmy_person() {
let client = reqwest::Client::new().into(); let context = init_context();
let manager = create_activity_queue(client);
let context = init_context(manager.queue_handle().clone());
let (person, site) = parse_lemmy_person(&context).await; let (person, site) = parse_lemmy_person(&context).await;
assert_eq!(person.display_name, Some("Jean-Luc Picard".to_string())); assert_eq!(person.display_name, Some("Jean-Luc Picard".to_string()));
@ -245,9 +242,7 @@ pub(crate) mod tests {
#[actix_rt::test] #[actix_rt::test]
#[serial] #[serial]
async fn test_parse_pleroma_person() { async fn test_parse_pleroma_person() {
let client = reqwest::Client::new().into(); let context = init_context();
let manager = create_activity_queue(client);
let context = init_context(manager.queue_handle().clone());
// create and parse a fake pleroma instance actor, to avoid network request during test // create and parse a fake pleroma instance actor, to avoid network request during test
let mut json: Instance = file_to_json_object("assets/lemmy/objects/instance.json").unwrap(); let mut json: Instance = file_to_json_object("assets/lemmy/objects/instance.json").unwrap();

View file

@ -215,16 +215,13 @@ mod tests {
}, },
protocol::tests::file_to_json_object, protocol::tests::file_to_json_object,
}; };
use lemmy_apub_lib::activity_queue::create_activity_queue;
use lemmy_db_schema::source::site::Site; use lemmy_db_schema::source::site::Site;
use serial_test::serial; use serial_test::serial;
#[actix_rt::test] #[actix_rt::test]
#[serial] #[serial]
async fn test_parse_lemmy_post() { async fn test_parse_lemmy_post() {
let client = reqwest::Client::new().into(); let context = init_context();
let manager = create_activity_queue(client);
let context = init_context(manager.queue_handle().clone());
let (person, site) = parse_lemmy_person(&context).await; let (person, site) = parse_lemmy_person(&context).await;
let community = parse_lemmy_community(&context).await; let community = parse_lemmy_community(&context).await;

View file

@ -167,7 +167,6 @@ mod tests {
protocol::tests::file_to_json_object, protocol::tests::file_to_json_object,
}; };
use assert_json_diff::assert_json_include; use assert_json_diff::assert_json_include;
use lemmy_apub_lib::activity_queue::create_activity_queue;
use lemmy_db_schema::source::site::Site; use lemmy_db_schema::source::site::Site;
use serial_test::serial; use serial_test::serial;
@ -203,9 +202,7 @@ mod tests {
#[actix_rt::test] #[actix_rt::test]
#[serial] #[serial]
async fn test_parse_lemmy_pm() { async fn test_parse_lemmy_pm() {
let client = reqwest::Client::new().into(); let context = init_context();
let manager = create_activity_queue(client);
let context = init_context(manager.queue_handle().clone());
let url = Url::parse("https://enterprise.lemmy.ml/private_message/1621").unwrap(); let url = Url::parse("https://enterprise.lemmy.ml/private_message/1621").unwrap();
let data = prepare_comment_test(&url, &context).await; let data = prepare_comment_test(&url, &context).await;
let json: ChatMessage = file_to_json_object("assets/lemmy/objects/chat_message.json").unwrap(); let json: ChatMessage = file_to_json_object("assets/lemmy/objects/chat_message.json").unwrap();
@ -232,9 +229,7 @@ mod tests {
#[actix_rt::test] #[actix_rt::test]
#[serial] #[serial]
async fn test_parse_pleroma_pm() { async fn test_parse_pleroma_pm() {
let client = reqwest::Client::new().into(); let context = init_context();
let manager = create_activity_queue(client);
let context = init_context(manager.queue_handle().clone());
let url = Url::parse("https://enterprise.lemmy.ml/private_message/1621").unwrap(); let url = Url::parse("https://enterprise.lemmy.ml/private_message/1621").unwrap();
let data = prepare_comment_test(&url, &context).await; let data = prepare_comment_test(&url, &context).await;
let pleroma_url = Url::parse("https://queer.hacktivis.me/objects/2").unwrap(); let pleroma_url = Url::parse("https://queer.hacktivis.me/objects/2").unwrap();

View file

@ -42,6 +42,14 @@ pub async fn send_activity(
} }
} else { } else {
activity_queue.queue::<SendActivityTask>(message).await?; activity_queue.queue::<SendActivityTask>(message).await?;
let stats = activity_queue.get_stats().await?;
info!(
"Activity queue stats: pending: {}, running: {}, dead (this hour): {}, complete (this hour): {}",
stats.pending,
stats.running,
stats.dead.this_hour(),
stats.complete.this_hour()
);
} }
} }
@ -110,12 +118,13 @@ async fn do_send(task: SendActivityTask, client: &ClientWithMiddleware) -> Resul
r r
} }
pub fn create_activity_queue(client: ClientWithMiddleware) -> Manager { pub fn create_activity_queue(client: ClientWithMiddleware, worker_count: u64) -> Manager {
// Configure and start our workers // Configure and start our workers
WorkerConfig::new_managed(Storage::new(), move |_| MyState { WorkerConfig::new_managed(Storage::new(), move |_| MyState {
client: client.clone(), client: client.clone(),
}) })
.register::<SendActivityTask>() .register::<SendActivityTask>()
.set_worker_count("default", worker_count)
.start() .start()
} }

View file

@ -130,6 +130,9 @@ pub struct FederationConfig {
/// (meaning remote communities will show content from arbitrary instances). /// (meaning remote communities will show content from arbitrary instances).
#[default(true)] #[default(true)]
pub strict_allowlist: bool, pub strict_allowlist: bool,
/// Number of workers for sending outgoing activities.
#[default(16)]
pub worker_count: u64,
} }
#[derive(Debug, Deserialize, Serialize, Clone, SmartDefault, Document)] #[derive(Debug, Deserialize, Serialize, Clone, SmartDefault, Document)]

View file

@ -101,7 +101,7 @@ async fn main() -> Result<(), LemmyError> {
let client = ClientBuilder::new(client).with(TracingMiddleware).build(); let client = ClientBuilder::new(client).with(TracingMiddleware).build();
let queue_manager = create_activity_queue(client.clone()); let queue_manager = create_activity_queue(client.clone(), settings.federation.worker_count);
let activity_queue = queue_manager.queue_handle().clone(); let activity_queue = queue_manager.queue_handle().clone();