diff --git a/Cargo.lock b/Cargo.lock index 9590e8f139..9d575f5782 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2543,7 +2543,6 @@ dependencies = [ name = "lemmy_api_common" version = "0.18.0" dependencies = [ - "actix-rt", "actix-web", "anyhow", "chrono", @@ -2561,6 +2560,7 @@ dependencies = [ "rosetta-i18n", "serde", "serde_with", + "tokio", "tracing", "ts-rs", "url", @@ -2592,7 +2592,6 @@ name = "lemmy_apub" version = "0.18.0" dependencies = [ "activitypub_federation", - "actix-rt", "actix-web", "anyhow", "assert-json-diff", @@ -2620,6 +2619,7 @@ dependencies = [ "sha2", "strum_macros", "task-local-extensions", + "tokio", "tracing", "url", "uuid", diff --git a/Cargo.toml b/Cargo.toml index 430deb082c..07e41ab3b8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -89,7 +89,7 @@ anyhow = "1.0.71" diesel_ltree = "0.3.0" typed-builder = "0.10.0" serial_test = "0.9.0" -tokio = "1.28.2" +tokio = { version = "1.28.2", features = ["full"] } sha2 = "0.10.6" regex = "1.8.4" once_cell = "1.18.0" diff --git a/config/defaults.hjson b/config/defaults.hjson index 4c38ddd455..6032f8fc9a 100644 --- a/config/defaults.hjson +++ b/config/defaults.hjson @@ -76,4 +76,8 @@ port: 8536 # Whether the site is available over TLS. Needs to be true for federation to work. tls_enabled: true + # The number of activitypub federation workers that can be in-flight concurrently + worker_count: 0 + # The number of activitypub federation retry workers that can be in-flight concurrently + retry_count: 0 } diff --git a/crates/api_common/Cargo.toml b/crates/api_common/Cargo.toml index 46045d8057..339d233a16 100644 --- a/crates/api_common/Cargo.toml +++ b/crates/api_common/Cargo.toml @@ -38,7 +38,7 @@ encoding = { version = "0.2.33", optional = true } anyhow = { workspace = true } futures = { workspace = true } uuid = { workspace = true } -actix-rt = { workspace = true } +tokio = { workspace = true } reqwest = { workspace = true } ts-rs = { workspace = true, optional = true } actix-web = { workspace = true } diff --git a/crates/api_common/src/request.rs b/crates/api_common/src/request.rs index c6f71b868a..3139193a6a 100644 --- a/crates/api_common/src/request.rs +++ b/crates/api_common/src/request.rs @@ -271,7 +271,7 @@ mod tests { use url::Url; // These helped with testing - #[actix_rt::test] + #[tokio::test] async fn test_site_metadata() { let settings = &SETTINGS.clone(); let client = reqwest::Client::builder() diff --git a/crates/api_common/src/site.rs b/crates/api_common/src/site.rs index 4d488ec1b8..865acc0dc2 100644 --- a/crates/api_common/src/site.rs +++ b/crates/api_common/src/site.rs @@ -177,7 +177,6 @@ pub struct CreateSite { pub rate_limit_search_per_second: Option, pub federation_enabled: Option, pub federation_debug: Option, - pub federation_worker_count: Option, pub captcha_enabled: Option, pub captcha_difficulty: Option, pub allowed_instances: Option>, @@ -250,8 +249,6 @@ pub struct EditSite { pub federation_enabled: Option, /// Enables federation debugging. pub federation_debug: Option, - /// The number of federation workers. - pub federation_worker_count: Option, /// Whether to enable captchas for signups. pub captcha_enabled: Option, /// The captcha difficulty. Can be easy, medium, or hard diff --git a/crates/api_crud/src/site/create.rs b/crates/api_crud/src/site/create.rs index 2a51309a42..a1669baef0 100644 --- a/crates/api_crud/src/site/create.rs +++ b/crates/api_crud/src/site/create.rs @@ -122,7 +122,6 @@ impl PerformCrud for CreateSite { .slur_filter_regex(diesel_option_overwrite(&data.slur_filter_regex)) .actor_name_max_length(data.actor_name_max_length) .federation_enabled(data.federation_enabled) - .federation_worker_count(data.federation_worker_count) .captcha_enabled(data.captcha_enabled) .captcha_difficulty(data.captcha_difficulty.clone()) .build(); diff --git a/crates/api_crud/src/site/update.rs b/crates/api_crud/src/site/update.rs index fadde0a0bb..6664d549a4 100644 --- a/crates/api_crud/src/site/update.rs +++ b/crates/api_crud/src/site/update.rs @@ -123,7 +123,6 @@ impl PerformCrud for EditSite { .slur_filter_regex(diesel_option_overwrite(&data.slur_filter_regex)) .actor_name_max_length(data.actor_name_max_length) .federation_enabled(data.federation_enabled) - .federation_worker_count(data.federation_worker_count) .captcha_enabled(data.captcha_enabled) .captcha_difficulty(data.captcha_difficulty.clone()) .reports_email_admins(data.reports_email_admins) diff --git a/crates/apub/Cargo.toml b/crates/apub/Cargo.toml index 2007b541a4..8570541f74 100644 --- a/crates/apub/Cargo.toml +++ b/crates/apub/Cargo.toml @@ -25,7 +25,7 @@ chrono = { workspace = true } serde_json = { workspace = true } serde = { workspace = true } actix-web = { workspace = true } -actix-rt = { workspace = true } +tokio = {workspace = true} tracing = { workspace = true } strum_macros = { workspace = true } url = { workspace = true } diff --git a/crates/apub/src/collections/community_moderators.rs b/crates/apub/src/collections/community_moderators.rs index c439da710d..d53f86280d 100644 --- a/crates/apub/src/collections/community_moderators.rs +++ b/crates/apub/src/collections/community_moderators.rs @@ -120,7 +120,7 @@ mod tests { }; use serial_test::serial; - #[actix_rt::test] + #[tokio::test] #[serial] async fn test_parse_lemmy_community_moderators() { let context = init_context().await; diff --git a/crates/apub/src/objects/comment.rs b/crates/apub/src/objects/comment.rs index e2a03b8b3f..16cb1542b9 100644 --- a/crates/apub/src/objects/comment.rs +++ b/crates/apub/src/objects/comment.rs @@ -223,7 +223,7 @@ pub(crate) mod tests { LocalSite::delete(context.pool()).await.unwrap(); } - #[actix_rt::test] + #[tokio::test] #[serial] pub(crate) async fn test_parse_lemmy_comment() { let context = init_context().await; @@ -249,7 +249,7 @@ pub(crate) mod tests { cleanup(data, &context).await; } - #[actix_rt::test] + #[tokio::test] #[serial] async fn test_parse_pleroma_comment() { let context = init_context().await; @@ -279,7 +279,7 @@ pub(crate) mod tests { cleanup(data, &context).await; } - #[actix_rt::test] + #[tokio::test] #[serial] async fn test_html_to_markdown_sanitize() { let parsed = parse_html("hello"); diff --git a/crates/apub/src/objects/community.rs b/crates/apub/src/objects/community.rs index 6526d2d26e..888a7f4581 100644 --- a/crates/apub/src/objects/community.rs +++ b/crates/apub/src/objects/community.rs @@ -242,7 +242,7 @@ pub(crate) mod tests { community } - #[actix_rt::test] + #[tokio::test] #[serial] async fn test_parse_lemmy_community() { let context = init_context().await; diff --git a/crates/apub/src/objects/instance.rs b/crates/apub/src/objects/instance.rs index 72d133441b..6cd27fbbda 100644 --- a/crates/apub/src/objects/instance.rs +++ b/crates/apub/src/objects/instance.rs @@ -221,7 +221,7 @@ pub(crate) mod tests { site } - #[actix_rt::test] + #[tokio::test] #[serial] async fn test_parse_lemmy_instance() { let context = init_context().await; diff --git a/crates/apub/src/objects/person.rs b/crates/apub/src/objects/person.rs index c71d46ccff..3eeb733fd7 100644 --- a/crates/apub/src/objects/person.rs +++ b/crates/apub/src/objects/person.rs @@ -223,7 +223,7 @@ pub(crate) mod tests { (person, site) } - #[actix_rt::test] + #[tokio::test] #[serial] async fn test_parse_lemmy_person() { let context = init_context().await; @@ -236,7 +236,7 @@ pub(crate) mod tests { cleanup((person, site), &context).await; } - #[actix_rt::test] + #[tokio::test] #[serial] async fn test_parse_pleroma_person() { let context = init_context().await; diff --git a/crates/apub/src/objects/post.rs b/crates/apub/src/objects/post.rs index b255ffb9b6..4ef9351ab4 100644 --- a/crates/apub/src/objects/post.rs +++ b/crates/apub/src/objects/post.rs @@ -281,7 +281,7 @@ mod tests { use lemmy_db_schema::source::site::Site; use serial_test::serial; - #[actix_rt::test] + #[tokio::test] #[serial] async fn test_parse_lemmy_post() { let context = init_context().await; diff --git a/crates/apub/src/objects/private_message.rs b/crates/apub/src/objects/private_message.rs index 01f576ff8f..ae2637c58e 100644 --- a/crates/apub/src/objects/private_message.rs +++ b/crates/apub/src/objects/private_message.rs @@ -187,7 +187,7 @@ mod tests { Site::delete(context.pool(), data.2.id).await.unwrap(); } - #[actix_rt::test] + #[tokio::test] #[serial] async fn test_parse_lemmy_pm() { let context = init_context().await; @@ -213,7 +213,7 @@ mod tests { cleanup(data, &context).await; } - #[actix_rt::test] + #[tokio::test] #[serial] async fn test_parse_pleroma_pm() { let context = init_context().await; diff --git a/crates/db_schema/src/schema.rs b/crates/db_schema/src/schema.rs index ac4ddc47a4..6714913f46 100644 --- a/crates/db_schema/src/schema.rs +++ b/crates/db_schema/src/schema.rs @@ -339,7 +339,6 @@ diesel::table! { slur_filter_regex -> Nullable, actor_name_max_length -> Int4, federation_enabled -> Bool, - federation_worker_count -> Int4, captcha_enabled -> Bool, #[max_length = 255] captcha_difficulty -> Varchar, diff --git a/crates/db_schema/src/source/local_site.rs b/crates/db_schema/src/source/local_site.rs index e65a615358..be93717a98 100644 --- a/crates/db_schema/src/source/local_site.rs +++ b/crates/db_schema/src/source/local_site.rs @@ -50,8 +50,6 @@ pub struct LocalSite { pub actor_name_max_length: i32, /// Whether federation is enabled. pub federation_enabled: bool, - /// The number of concurrent federation http workers. - pub federation_worker_count: i32, /// Whether captcha is enabled. pub captcha_enabled: bool, /// The captcha difficulty. @@ -85,7 +83,6 @@ pub struct LocalSiteInsertForm { pub slur_filter_regex: Option, pub actor_name_max_length: Option, pub federation_enabled: Option, - pub federation_worker_count: Option, pub captcha_enabled: Option, pub captcha_difficulty: Option, pub registration_mode: Option, @@ -112,7 +109,6 @@ pub struct LocalSiteUpdateForm { pub slur_filter_regex: Option>, pub actor_name_max_length: Option, pub federation_enabled: Option, - pub federation_worker_count: Option, pub captcha_enabled: Option, pub captcha_difficulty: Option, pub registration_mode: Option, diff --git a/crates/utils/src/settings/structs.rs b/crates/utils/src/settings/structs.rs index 6e200b224d..5d0e642f6a 100644 --- a/crates/utils/src/settings/structs.rs +++ b/crates/utils/src/settings/structs.rs @@ -39,6 +39,12 @@ pub struct Settings { #[default(None)] #[doku(skip)] pub opentelemetry_url: Option, + /// The number of activitypub federation workers that can be in-flight concurrently + #[default(0)] + pub worker_count: usize, + /// The number of activitypub federation retry workers that can be in-flight concurrently + #[default(0)] + pub retry_count: usize, } #[derive(Debug, Deserialize, Serialize, Clone, SmartDefault, Document)] diff --git a/migrations/2023-06-19-055530_add_retry_worker_setting/down.sql b/migrations/2023-06-19-055530_add_retry_worker_setting/down.sql new file mode 100644 index 0000000000..e3c200a15b --- /dev/null +++ b/migrations/2023-06-19-055530_add_retry_worker_setting/down.sql @@ -0,0 +1 @@ +alter table local_site add column federation_worker_count int default 64 not null; \ No newline at end of file diff --git a/migrations/2023-06-19-055530_add_retry_worker_setting/up.sql b/migrations/2023-06-19-055530_add_retry_worker_setting/up.sql new file mode 100644 index 0000000000..2aac86f855 --- /dev/null +++ b/migrations/2023-06-19-055530_add_retry_worker_setting/up.sql @@ -0,0 +1 @@ +alter table local_site drop column federation_worker_count; \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 86cf400b64..d919acc057 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -139,24 +139,23 @@ pub async fn start_lemmy_server() -> Result<(), LemmyError> { }); } + let settings_bind = settings.clone(); + let federation_config = FederationConfig::builder() .domain(settings.hostname.clone()) .app_data(context.clone()) .client(client.clone()) .http_fetch_limit(FEDERATION_HTTP_FETCH_LIMIT) - .worker_count(local_site.federation_worker_count as usize) + .worker_count(settings.worker_count) + .retry_count(settings.retry_count) .debug(cfg!(debug_assertions)) .http_signature_compat(true) .url_verifier(Box::new(VerifyUrlData(context.pool().clone()))) .build() - .await - .expect("configure federation"); + .await?; // Create Http server with websocket support - let settings_bind = settings.clone(); HttpServer::new(move || { - let context = context.clone(); - let cors_config = if cfg!(debug_assertions) { Cors::permissive() } else { @@ -173,7 +172,7 @@ pub async fn start_lemmy_server() -> Result<(), LemmyError> { )) .wrap(cors_config) .wrap(TracingLogger::::new()) - .app_data(Data::new(context)) + .app_data(Data::new(context.clone())) .app_data(Data::new(rate_limit_cell.clone())) .wrap(FederationMiddleware::new(federation_config.clone())) // The routes diff --git a/src/main.rs b/src/main.rs index 315fe84be7..5fc03ed025 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,7 @@ use lemmy_server::{init_logging, start_lemmy_server}; use lemmy_utils::{error::LemmyError, settings::SETTINGS}; -#[actix_web::main] +#[tokio::main] pub async fn main() -> Result<(), LemmyError> { init_logging(&SETTINGS.opentelemetry_url)?; #[cfg(not(feature = "embed-pictrs"))]