diff --git a/server/Cargo.lock b/server/Cargo.lock index 9781adaf1..c9f0a3b15 100644 --- a/server/Cargo.lock +++ b/server/Cargo.lock @@ -490,6 +490,56 @@ dependencies = [ "serde_urlencoded", ] +[[package]] +name = "background-jobs" +version = "0.8.0-alpha.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb38c4a5de33324650e9023829b0f4129eb5418b29f5dfe69a52100ff5bc50d7" +dependencies = [ + "background-jobs-actix", + "background-jobs-core", +] + +[[package]] +name = "background-jobs-actix" +version = "0.8.0-alpha.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d012b9293806c777f806b537e04b5eec34ecd6eaf876c52792017695ce53262f" +dependencies = [ + "actix-rt", + "anyhow", + "async-trait", + "background-jobs-core", + "chrono", + "log", + "num_cpus", + "rand 0.7.3", + "serde 1.0.114", + "serde_json", + "thiserror", + "tokio", + "uuid 0.8.1", +] + +[[package]] +name = "background-jobs-core" +version = "0.8.0-alpha.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd5efe91c019d7780d5a2fc2f92a15e1f95b84a761428e1d1972b7428634ebc7" +dependencies = [ + "actix-rt", + "anyhow", + "async-trait", + "chrono", + "futures", + "log", + "serde 1.0.114", + "serde_json", + "thiserror", + "tokio", + "uuid 0.8.1", +] + [[package]] name = "backtrace" version = "0.3.50" @@ -1723,6 +1773,7 @@ dependencies = [ "anyhow", "async-trait", "awc", + "background-jobs", "base64 0.12.3", "bcrypt", "captcha", diff --git a/server/Cargo.toml b/server/Cargo.toml index dba0fee6f..f8adb8203 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -53,3 +53,4 @@ async-trait = "0.1.36" captcha = "0.0.7" anyhow = "1.0.32" thiserror = "1.0.20" +background-jobs = " 0.8.0-alpha.2" \ No newline at end of file diff --git a/server/src/apub/activities.rs b/server/src/apub/activities.rs index 341ee2f36..b4d6c4d2b 100644 --- a/server/src/apub/activities.rs +++ b/server/src/apub/activities.rs @@ -1,5 +1,5 @@ use crate::{ - apub::{activity_sender::send_activity, community::do_announce, insert_activity}, + apub::{activity_queue::send_activity, community::do_announce, insert_activity}, LemmyContext, LemmyError, }; @@ -32,7 +32,7 @@ where if community.local { do_announce(activity.into_any_base()?, &community, creator, context).await?; } else { - send_activity(context.activity_sender(), activity, creator, to)?; + send_activity(context.activity_queue(), activity, creator, to)?; } Ok(()) diff --git a/server/src/apub/activity_queue.rs b/server/src/apub/activity_queue.rs new file mode 100644 index 000000000..bf3817a9f --- /dev/null +++ b/server/src/apub/activity_queue.rs @@ -0,0 +1,120 @@ +use crate::{ + apub::{check_is_apub_id_valid, extensions::signatures::sign, ActorType}, + LemmyError, +}; +use activitystreams::{ + base::{Extends, ExtendsExt}, + object::AsObject, +}; +use actix::prelude::*; +use anyhow::Context; +use awc::Client; +use lemmy_utils::{location_info, settings::Settings}; +use log::{debug, warn}; +use serde::Serialize; +use url::Url; +use background_jobs::{Backoff, MaxRetries, WorkerConfig, QueueHandle, Job, create_server}; +use background_jobs::memory_storage::Storage; +use serde::Deserialize; +use anyhow::Error; +use futures::future::{Ready, ok}; +use std::pin::Pin; +use std::future::Future; + +pub fn send_activity( + activity_sender: &QueueHandle, + activity: T, + actor: &dyn ActorType, + to: Vec, +) -> Result<(), LemmyError> +where + T: AsObject, + T: Extends, + Kind: Serialize, + >::Error: From + Send + Sync + 'static, +{ + if !Settings::get().federation.enabled { + return Ok(()); + } + + let activity = activity.into_any_base()?; + let serialised_activity = serde_json::to_string(&activity)?; + + for to_url in &to { + check_is_apub_id_valid(&to_url)?; + } + + // TODO: it would make sense to create a separate task for each destination server + let message = SendActivityTask { + activity: serialised_activity, + to, + actor_id: actor.actor_id()?, + private_key: actor.private_key().context(location_info!())?, + }; + activity_sender.queue::(message)?; + + Ok(()) +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +struct SendActivityTask { + activity: String, + to: Vec, + actor_id: Url, + private_key: String, +} + +impl Job for SendActivityTask { + type State = (); + type Future = Pin>>>; + const NAME: &'static str = "SendActivityTask"; + + const MAX_RETRIES: MaxRetries = MaxRetries::Count(10); + const BACKOFF: Backoff = Backoff::Exponential(2); + + fn run(self, _: Self::State) -> Self::Future { + Box::pin(async move { + for to_url in &self.to { + + // TODO: should pass this in somehow instead of creating a new client every time + // i suppose this can be done through a state + let client = Client::default(); + + let request = client + .post(to_url.as_str()) + .header("Content-Type", "application/json"); + + // TODO: i believe we have to do the signing in here because it is only valid for a few seconds + let signed = sign( + request, + self.activity.clone(), + &self.actor_id, + self.private_key.to_owned(), + ) + .await?; + signed.send().await?; + } + + Ok(()) + }) + } +} + +pub fn create_activity_queue() -> QueueHandle { + + // Start the application server. This guards access to to the jobs store + let queue_handle = create_server(Storage::new()); + + // Configure and start our workers + WorkerConfig::new(||{}) + .register::() + .start(queue_handle.clone()); + + // Queue our jobs + //queue_handle.queue::(MyJob::new(1, 2))?; + //queue_handle.queue::(MyJob::new(3, 4))?; + //queue_handle.queue::(MyJob::new(5, 6))?; + + // Block on Actix + queue_handle +} diff --git a/server/src/apub/activity_sender.rs b/server/src/apub/activity_sender.rs deleted file mode 100644 index a6a3c2b57..000000000 --- a/server/src/apub/activity_sender.rs +++ /dev/null @@ -1,122 +0,0 @@ -use crate::{ - apub::{check_is_apub_id_valid, extensions::signatures::sign, ActorType}, - LemmyError, -}; -use activitystreams::{ - base::{Extends, ExtendsExt}, - object::AsObject, -}; -use actix::prelude::*; -use anyhow::Context; -use awc::Client; -use lemmy_utils::{location_info, settings::Settings}; -use log::{debug, warn}; -use serde::Serialize; -use url::Url; - -pub fn send_activity( - activity_sender: &Addr, - activity: T, - actor: &dyn ActorType, - to: Vec, -) -> Result<(), LemmyError> -where - T: AsObject, - T: Extends, - Kind: Serialize, - >::Error: From + Send + Sync + 'static, -{ - if !Settings::get().federation.enabled { - return Ok(()); - } - - let activity = activity.into_any_base()?; - let serialised_activity = serde_json::to_string(&activity)?; - - for to_url in &to { - check_is_apub_id_valid(&to_url)?; - } - - let message = SendActivity { - activity: serialised_activity, - to, - actor_id: actor.actor_id()?, - private_key: actor.private_key().context(location_info!())?, - }; - activity_sender.do_send(message); - - Ok(()) -} - -#[derive(Message)] -#[rtype(result = "()")] -struct SendActivity { - activity: String, - to: Vec, - actor_id: Url, - private_key: String, -} - -pub struct ActivitySender { - client: Client, -} - -impl ActivitySender { - pub fn startup(client: Client) -> ActivitySender { - ActivitySender { client } - } -} - -impl Actor for ActivitySender { - type Context = actix::Context; -} - -impl Handler for ActivitySender { - type Result = (); - - fn handle(&mut self, msg: SendActivity, _ctx: &mut actix::Context) -> Self::Result { - debug!( - "Sending activitypub activity {} to {:?}", - &msg.activity, &msg.to - ); - - Box::pin(async move { - for to_url in &msg.to { - let request = self - .client - .post(to_url.as_str()) - .header("Content-Type", "application/json"); - - let signed = sign( - request, - msg.activity.clone(), - &msg.actor_id, - msg.private_key.to_owned(), - ) - .await; - - let signed = match signed { - Ok(s) => s, - Err(e) => { - warn!( - "Failed to sign activity {} from {}: {}", - &msg.activity, &msg.actor_id, e - ); - return; - } - }; - - // TODO: if the sending fails, it should retry with exponential backoff - match signed.send().await { - Ok(_) => {} - Err(e) => { - warn!( - "Failed to send activity {} to {}: {}", - &msg.activity, &to_url, e - ); - } - } - } - }); - } -} diff --git a/server/src/apub/community.rs b/server/src/apub/community.rs index 551d042c3..084a2b30c 100644 --- a/server/src/apub/community.rs +++ b/server/src/apub/community.rs @@ -2,7 +2,7 @@ use crate::{ api::{check_slurs, check_slurs_opt}, apub::{ activities::generate_activity_id, - activity_sender::send_activity, + activity_queue::send_activity, check_actor_domain, create_apub_response, create_apub_tombstone_response, @@ -156,7 +156,7 @@ impl ActorType for Community { insert_activity(self.creator_id, accept.clone(), true, context.pool()).await?; - send_activity(context.activity_sender(), accept, self, vec![to])?; + send_activity(context.activity_queue(), accept, self, vec![to])?; Ok(()) } @@ -177,7 +177,7 @@ impl ActorType for Community { // Note: For an accept, since it was automatic, no one pushed a button, // the community was the actor. // But for delete, the creator is the actor, and does the signing - send_activity(context.activity_sender(), delete, creator, inboxes)?; + send_activity(context.activity_queue(), delete, creator, inboxes)?; Ok(()) } @@ -209,7 +209,7 @@ impl ActorType for Community { // Note: For an accept, since it was automatic, no one pushed a button, // the community was the actor. // But for delete, the creator is the actor, and does the signing - send_activity(context.activity_sender(), undo, creator, inboxes)?; + send_activity(context.activity_queue(), undo, creator, inboxes)?; Ok(()) } @@ -230,7 +230,7 @@ impl ActorType for Community { // Note: For an accept, since it was automatic, no one pushed a button, // the community was the actor. // But for delete, the creator is the actor, and does the signing - send_activity(context.activity_sender(), remove, mod_, inboxes)?; + send_activity(context.activity_queue(), remove, mod_, inboxes)?; Ok(()) } @@ -259,7 +259,7 @@ impl ActorType for Community { // Note: For an accept, since it was automatic, no one pushed a button, // the community was the actor. // But for remove , the creator is the actor, and does the signing - send_activity(context.activity_sender(), undo, mod_, inboxes)?; + send_activity(context.activity_queue(), undo, mod_, inboxes)?; Ok(()) } @@ -512,7 +512,7 @@ pub async fn do_announce( let community_shared_inbox = community.get_shared_inbox_url()?; to.retain(|x| x != &community_shared_inbox); - send_activity(context.activity_sender(), announce, community, to)?; + send_activity(context.activity_queue(), announce, community, to)?; Ok(()) } diff --git a/server/src/apub/mod.rs b/server/src/apub/mod.rs index ab0ac4f25..420591664 100644 --- a/server/src/apub/mod.rs +++ b/server/src/apub/mod.rs @@ -1,5 +1,5 @@ pub mod activities; -pub mod activity_sender; +pub mod activity_queue; pub mod comment; pub mod community; pub mod extensions; diff --git a/server/src/apub/private_message.rs b/server/src/apub/private_message.rs index 54e89112e..c695e7dcb 100644 --- a/server/src/apub/private_message.rs +++ b/server/src/apub/private_message.rs @@ -1,7 +1,7 @@ use crate::{ apub::{ activities::generate_activity_id, - activity_sender::send_activity, + activity_queue::send_activity, check_actor_domain, check_is_apub_id_valid, create_tombstone, @@ -135,7 +135,7 @@ impl ApubObjectType for PrivateMessage { insert_activity(creator.id, create.clone(), true, context.pool()).await?; - send_activity(context.activity_sender(), create, creator, vec![to])?; + send_activity(context.activity_queue(), create, creator, vec![to])?; Ok(()) } @@ -155,7 +155,7 @@ impl ApubObjectType for PrivateMessage { insert_activity(creator.id, update.clone(), true, context.pool()).await?; - send_activity(context.activity_sender(), update, creator, vec![to])?; + send_activity(context.activity_queue(), update, creator, vec![to])?; Ok(()) } @@ -174,7 +174,7 @@ impl ApubObjectType for PrivateMessage { insert_activity(creator.id, delete.clone(), true, context.pool()).await?; - send_activity(context.activity_sender(), delete, creator, vec![to])?; + send_activity(context.activity_queue(), delete, creator, vec![to])?; Ok(()) } @@ -204,7 +204,7 @@ impl ApubObjectType for PrivateMessage { insert_activity(creator.id, undo.clone(), true, context.pool()).await?; - send_activity(context.activity_sender(), undo, creator, vec![to])?; + send_activity(context.activity_queue(), undo, creator, vec![to])?; Ok(()) } diff --git a/server/src/apub/user.rs b/server/src/apub/user.rs index 16771e963..492cbb615 100644 --- a/server/src/apub/user.rs +++ b/server/src/apub/user.rs @@ -2,7 +2,7 @@ use crate::{ api::{check_slurs, check_slurs_opt}, apub::{ activities::generate_activity_id, - activity_sender::send_activity, + activity_queue::send_activity, check_actor_domain, create_apub_response, fetcher::get_or_fetch_and_upsert_actor, @@ -128,7 +128,7 @@ impl ActorType for User_ { insert_activity(self.id, follow.clone(), true, context.pool()).await?; - send_activity(context.activity_sender(), follow, self, vec![to])?; + send_activity(context.activity_queue(), follow, self, vec![to])?; Ok(()) } @@ -153,7 +153,7 @@ impl ActorType for User_ { insert_activity(self.id, undo.clone(), true, context.pool()).await?; - send_activity(context.activity_sender(), undo, self, vec![to])?; + send_activity(context.activity_queue(), undo, self, vec![to])?; Ok(()) } diff --git a/server/src/lib.rs b/server/src/lib.rs index df0e2123d..e0e641c90 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -34,7 +34,6 @@ use crate::{ websocket::server::ChatServer, }; -use crate::apub::activity_sender::ActivitySender; use actix::Addr; use actix_web::{client::Client, dev::ConnectionInfo}; use anyhow::anyhow; @@ -43,6 +42,7 @@ use log::error; use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC}; use serde::Deserialize; use std::process::Command; +use background_jobs::QueueHandle; pub type DbPool = diesel::r2d2::Pool>; pub type ConnectionId = usize; @@ -77,7 +77,7 @@ pub struct LemmyContext { pub pool: DbPool, pub chat_server: Addr, pub client: Client, - pub activity_sender: Addr, + pub activity_queue: QueueHandle, } impl LemmyContext { @@ -85,13 +85,13 @@ impl LemmyContext { pool: DbPool, chat_server: Addr, client: Client, - activity_sender: Addr, + activity_queue: QueueHandle, ) -> LemmyContext { LemmyContext { pool, chat_server, client, - activity_sender, + activity_queue, } } pub fn pool(&self) -> &DbPool { @@ -103,8 +103,8 @@ impl LemmyContext { pub fn client(&self) -> &Client { &self.client } - pub fn activity_sender(&self) -> &Addr { - &self.activity_sender + pub fn activity_queue(&self) -> &QueueHandle { + &self.activity_queue } } @@ -114,7 +114,7 @@ impl Clone for LemmyContext { pool: self.pool.clone(), chat_server: self.chat_server.clone(), client: self.client.clone(), - activity_sender: self.activity_sender.clone(), + activity_queue: self.activity_queue.clone(), } } } diff --git a/server/src/main.rs b/server/src/main.rs index 1ecfa3124..3e77fb61f 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -20,7 +20,6 @@ use diesel::{ }; use lemmy_db::get_database_url_from_env; use lemmy_server::{ - apub::activity_sender::ActivitySender, blocking, code_migrations::run_advanced_migrations, rate_limit::{rate_limiter::RateLimiter, RateLimit}, @@ -32,6 +31,7 @@ use lemmy_server::{ use lemmy_utils::{settings::Settings, CACHE_CONTROL_REGEX}; use std::sync::Arc; use tokio::sync::Mutex; +use lemmy_server::apub::activity_queue::create_activity_queue; lazy_static! { // static ref CACHE_CONTROL_VALUE: String = format!("public, max-age={}", 365 * 24 * 60 * 60); @@ -77,19 +77,19 @@ async fn main() -> Result<(), LemmyError> { // Create Http server with websocket support HttpServer::new(move || { - let activity_sender = ActivitySender::startup(Client::default()).start(); + let activity_queue = create_activity_queue(); let chat_server = ChatServer::startup( pool.clone(), rate_limiter.clone(), Client::default(), - activity_sender.clone(), + activity_queue.clone(), ) .start(); let context = LemmyContext::create( pool.clone(), chat_server, Client::default(), - activity_sender, + activity_queue ); let settings = Settings::get(); let rate_limiter = rate_limiter.clone(); diff --git a/server/src/websocket/server.rs b/server/src/websocket/server.rs index b4293c9ca..a20681fdf 100644 --- a/server/src/websocket/server.rs +++ b/server/src/websocket/server.rs @@ -5,7 +5,6 @@ use super::*; use crate::{ api::{comment::*, community::*, post::*, site::*, user::*, *}, - apub::activity_sender::ActivitySender, rate_limit::RateLimit, websocket::UserOperation, CommunityId, @@ -20,6 +19,7 @@ use actix_web::{client::Client, web}; use anyhow::Context as acontext; use lemmy_db::naive_now; use lemmy_utils::location_info; +use background_jobs::QueueHandle; /// Chat server sends this messages to session #[derive(Message)] @@ -183,7 +183,7 @@ pub struct ChatServer { /// An HTTP Client client: Client, - activity_sender: Addr, + activity_queue: QueueHandle, } impl ChatServer { @@ -191,7 +191,7 @@ impl ChatServer { pool: Pool>, rate_limiter: RateLimit, client: Client, - activity_sender: Addr, + activity_queue: QueueHandle, ) -> ChatServer { ChatServer { sessions: HashMap::new(), @@ -203,7 +203,7 @@ impl ChatServer { rate_limiter, captchas: Vec::new(), client, - activity_sender, + activity_queue, } } @@ -460,7 +460,7 @@ impl ChatServer { }; let client = self.client.clone(); - let activity_sender = self.activity_sender.clone(); + let activity_queue = self.activity_queue.clone(); async move { let msg = msg; let json: Value = serde_json::from_str(&msg.msg)?; @@ -475,7 +475,7 @@ impl ChatServer { pool, chat_server: addr, client, - activity_sender, + activity_queue, }; let args = Args { context: &context,