From 35649032c0ed0fd69b7c1913cc83d24a558daa31 Mon Sep 17 00:00:00 2001 From: Felix Ableitner Date: Fri, 14 Aug 2020 15:30:12 +0200 Subject: [PATCH] WIP: implement ActivitySender actor --- server/src/apub/activities.rs | 53 +++-------------- server/src/apub/activity_sender.rs | 95 ++++++++++++++++++++++++++++++ server/src/apub/community.rs | 45 +++++++++++--- server/src/apub/mod.rs | 1 + server/src/apub/private_message.rs | 49 +++++++-------- server/src/apub/user.rs | 17 +++++- server/src/lib.rs | 15 ++++- server/src/main.rs | 18 +++++- server/src/request.rs | 2 +- server/src/websocket/server.rs | 7 +++ 10 files changed, 218 insertions(+), 84 deletions(-) create mode 100644 server/src/apub/activity_sender.rs diff --git a/server/src/apub/activities.rs b/server/src/apub/activities.rs index 4700bb089..c8f040a95 100644 --- a/server/src/apub/activities.rs +++ b/server/src/apub/activities.rs @@ -1,20 +1,11 @@ use crate::{ - apub::{ - check_is_apub_id_valid, - community::do_announce, - extensions::signatures::sign, - insert_activity, - ActorType, - }, - request::retry_custom, + apub::{activity_sender::SendUserActivity, community::do_announce, insert_activity}, LemmyContext, LemmyError, }; use activitystreams::base::AnyBase; -use actix_web::client::Client; use lemmy_db::{community::Community, user::User_}; use lemmy_utils::{get_apub_protocol_string, settings::Settings}; -use log::debug; use url::{ParseError, Url}; use uuid::Uuid; @@ -31,42 +22,12 @@ pub async fn send_activity_to_community( if community.local { do_announce(activity, &community, creator, context).await?; } else { - send_activity(context.client(), &activity, creator, to).await?; - } - - Ok(()) -} - -/// Send an activity to a list of recipients, using the correct headers etc. -pub async fn send_activity( - client: &Client, - activity: &AnyBase, - actor: &dyn ActorType, - to: Vec, -) -> Result<(), LemmyError> { - if !Settings::get().federation.enabled { - return Ok(()); - } - - let activity = serde_json::to_string(&activity)?; - debug!("Sending activitypub activity {} to {:?}", activity, to); - - for to_url in to { - check_is_apub_id_valid(&to_url)?; - - let res = retry_custom(|| async { - let request = client - .post(to_url.as_str()) - .header("Content-Type", "application/json"); - - match sign(request, actor, activity.clone()).await { - Ok(signed) => Ok(signed.send().await), - Err(e) => Err(e), - } - }) - .await?; - - debug!("Result for activity send: {:?}", res); + let message = SendUserActivity { + activity, + actor: creator.to_owned(), + to, + }; + context.activity_sender().send(message).await??; } Ok(()) diff --git a/server/src/apub/activity_sender.rs b/server/src/apub/activity_sender.rs new file mode 100644 index 000000000..df7ed12cf --- /dev/null +++ b/server/src/apub/activity_sender.rs @@ -0,0 +1,95 @@ +use crate::{ + apub::{check_is_apub_id_valid, extensions::signatures::sign, ActorType}, + LemmyError, +}; +use activitystreams::base::AnyBase; +use actix::prelude::*; +use awc::Client; +use lemmy_db::{community::Community, user::User_}; +use lemmy_utils::settings::Settings; +use log::debug; +use url::Url; + +// We cant use ActorType here, because it doesnt implement Sized +#[derive(Message)] +#[rtype(result = "Result<(), LemmyError>")] +pub struct SendUserActivity { + pub activity: AnyBase, + pub actor: User_, + pub to: Vec, +} + +#[derive(Message)] +#[rtype(result = "Result<(), LemmyError>")] +pub struct SendCommunityActivity { + pub activity: AnyBase, + pub actor: Community, + pub to: Vec, +} + +pub struct ActivitySender { + client: Client, +} + +impl ActivitySender { + pub fn startup(client: Client) -> ActivitySender { + ActivitySender { client } + } +} + +impl Actor for ActivitySender { + type Context = Context; +} + +impl Handler for ActivitySender { + type Result = Result<(), LemmyError>; + + fn handle(&mut self, msg: SendUserActivity, _ctx: &mut Context) -> Self::Result { + send_activity(msg.activity, &msg.actor, msg.to, &self.client) + } +} + +impl Handler for ActivitySender { + type Result = Result<(), LemmyError>; + + fn handle(&mut self, msg: SendCommunityActivity, _ctx: &mut Context) -> Self::Result { + send_activity(msg.activity, &msg.actor, msg.to, &self.client) + } +} + +fn send_activity( + activity: AnyBase, + actor: &dyn ActorType, + to: Vec, + client: &Client, +) -> Result<(), LemmyError> { + if !Settings::get().federation.enabled { + return Ok(()); + } + + let serialised_activity = serde_json::to_string(&activity)?; + debug!( + "Sending activitypub activity {} to {:?}", + &serialised_activity, &to + ); + + for to_url in &to { + check_is_apub_id_valid(&to_url)?; + + let request = client + .post(to_url.as_str()) + .header("Content-Type", "application/json"); + + let serialised_activity = serialised_activity.clone(); + Box::pin(async move { + // TODO: need to remove the unwrap, but ? causes compile errors + // TODO: if the sending fails, it should retry with exponential backoff + let signed = sign(request, actor, serialised_activity).await.unwrap(); + let res = signed.send().await; + debug!("Result for activity send: {:?}", res); + Ok::<(), LemmyError>(()) + }); + } + + Ok(()) +} diff --git a/server/src/apub/community.rs b/server/src/apub/community.rs index 016f342dc..30a3b2e98 100644 --- a/server/src/apub/community.rs +++ b/server/src/apub/community.rs @@ -1,7 +1,8 @@ use crate::{ api::{check_slurs, check_slurs_opt}, apub::{ - activities::{generate_activity_id, send_activity}, + activities::generate_activity_id, + activity_sender::{SendCommunityActivity, SendUserActivity}, check_actor_domain, create_apub_response, create_apub_tombstone_response, @@ -155,7 +156,12 @@ impl ActorType for Community { insert_activity(self.creator_id, accept.clone(), true, context.pool()).await?; - send_activity(context.client(), &accept.into_any_base()?, self, vec![to]).await?; + let message = SendCommunityActivity { + activity: accept.into_any_base()?, + actor: self.to_owned(), + to: vec![to], + }; + context.activity_sender().send(message).await??; Ok(()) } @@ -176,7 +182,12 @@ impl ActorType for Community { // Note: For an accept, since it was automatic, no one pushed a button, // the community was the actor. // But for delete, the creator is the actor, and does the signing - send_activity(context.client(), &delete.into_any_base()?, creator, inboxes).await?; + let message = SendUserActivity { + activity: delete.into_any_base()?, + actor: creator.to_owned(), + to: inboxes, + }; + context.activity_sender().send(message).await??; Ok(()) } @@ -208,7 +219,12 @@ impl ActorType for Community { // Note: For an accept, since it was automatic, no one pushed a button, // the community was the actor. // But for delete, the creator is the actor, and does the signing - send_activity(context.client(), &undo.into_any_base()?, creator, inboxes).await?; + let message = SendUserActivity { + activity: undo.into_any_base()?, + actor: creator.to_owned(), + to: inboxes, + }; + context.activity_sender().send(message).await??; Ok(()) } @@ -229,7 +245,12 @@ impl ActorType for Community { // Note: For an accept, since it was automatic, no one pushed a button, // the community was the actor. // But for delete, the creator is the actor, and does the signing - send_activity(context.client(), &remove.into_any_base()?, mod_, inboxes).await?; + let message = SendUserActivity { + activity: remove.into_any_base()?, + actor: mod_.to_owned(), + to: inboxes, + }; + context.activity_sender().send(message).await??; Ok(()) } @@ -258,7 +279,12 @@ impl ActorType for Community { // Note: For an accept, since it was automatic, no one pushed a button, // the community was the actor. // But for remove , the creator is the actor, and does the signing - send_activity(context.client(), &undo.into_any_base()?, mod_, inboxes).await?; + let message = SendUserActivity { + activity: undo.into_any_base()?, + actor: mod_.to_owned(), + to: inboxes, + }; + context.activity_sender().send(message).await??; Ok(()) } @@ -511,7 +537,12 @@ pub async fn do_announce( let community_shared_inbox = community.get_shared_inbox_url()?; to.retain(|x| x != &community_shared_inbox); - send_activity(context.client(), &announce.into_any_base()?, community, to).await?; + let message = SendCommunityActivity { + activity: announce.into_any_base()?, + actor: community.to_owned(), + to, + }; + context.activity_sender().send(message).await??; Ok(()) } diff --git a/server/src/apub/mod.rs b/server/src/apub/mod.rs index dddbd7e04..1675ad1d6 100644 --- a/server/src/apub/mod.rs +++ b/server/src/apub/mod.rs @@ -1,3 +1,4 @@ +pub mod activity_sender; pub mod activities; pub mod comment; pub mod community; diff --git a/server/src/apub/private_message.rs b/server/src/apub/private_message.rs index 8e5836885..259204071 100644 --- a/server/src/apub/private_message.rs +++ b/server/src/apub/private_message.rs @@ -1,6 +1,7 @@ use crate::{ apub::{ - activities::{generate_activity_id, send_activity}, + activities::generate_activity_id, + activity_sender::SendUserActivity, check_actor_domain, check_is_apub_id_valid, create_tombstone, @@ -134,13 +135,12 @@ impl ApubObjectType for PrivateMessage { insert_activity(creator.id, create.clone(), true, context.pool()).await?; - send_activity( - context.client(), - &create.into_any_base()?, - creator, - vec![to], - ) - .await?; + let message = SendUserActivity { + activity: create.into_any_base()?, + actor: creator.to_owned(), + to: vec![to], + }; + context.activity_sender().send(message).await??; Ok(()) } @@ -160,13 +160,12 @@ impl ApubObjectType for PrivateMessage { insert_activity(creator.id, update.clone(), true, context.pool()).await?; - send_activity( - context.client(), - &update.into_any_base()?, - creator, - vec![to], - ) - .await?; + let message = SendUserActivity { + activity: update.into_any_base()?, + actor: creator.to_owned(), + to: vec![to], + }; + context.activity_sender().send(message).await??; Ok(()) } @@ -185,13 +184,12 @@ impl ApubObjectType for PrivateMessage { insert_activity(creator.id, delete.clone(), true, context.pool()).await?; - send_activity( - context.client(), - &delete.into_any_base()?, - creator, - vec![to], - ) - .await?; + let message = SendUserActivity { + activity: delete.into_any_base()?, + actor: creator.to_owned(), + to: vec![to], + }; + context.activity_sender().send(message).await??; Ok(()) } @@ -221,7 +219,12 @@ impl ApubObjectType for PrivateMessage { insert_activity(creator.id, undo.clone(), true, context.pool()).await?; - send_activity(context.client(), &undo.into_any_base()?, creator, vec![to]).await?; + let message = SendUserActivity { + activity: undo.into_any_base()?, + actor: creator.to_owned(), + to: vec![to], + }; + context.activity_sender().send(message).await??; Ok(()) } diff --git a/server/src/apub/user.rs b/server/src/apub/user.rs index f6225dea2..f93f9408e 100644 --- a/server/src/apub/user.rs +++ b/server/src/apub/user.rs @@ -1,7 +1,8 @@ use crate::{ api::{check_slurs, check_slurs_opt}, apub::{ - activities::{generate_activity_id, send_activity}, + activities::generate_activity_id, + activity_sender::SendUserActivity, check_actor_domain, create_apub_response, fetcher::get_or_fetch_and_upsert_actor, @@ -127,7 +128,12 @@ impl ActorType for User_ { insert_activity(self.id, follow.clone(), true, context.pool()).await?; - send_activity(context.client(), &follow.into_any_base()?, self, vec![to]).await?; + let message = SendUserActivity { + activity: follow.into_any_base()?, + actor: self.to_owned(), + to: vec![to], + }; + context.activity_sender().send(message).await??; Ok(()) } @@ -152,7 +158,12 @@ impl ActorType for User_ { insert_activity(self.id, undo.clone(), true, context.pool()).await?; - send_activity(context.client(), &undo.into_any_base()?, self, vec![to]).await?; + let message = SendUserActivity { + activity: undo.into_any_base()?, + actor: self.to_owned(), + to: vec![to], + }; + context.activity_sender().send(message).await??; Ok(()) } diff --git a/server/src/lib.rs b/server/src/lib.rs index 07ee15d40..df0e2123d 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -33,6 +33,8 @@ use crate::{ request::{retry, RecvError}, websocket::server::ChatServer, }; + +use crate::apub::activity_sender::ActivitySender; use actix::Addr; use actix_web::{client::Client, dev::ConnectionInfo}; use anyhow::anyhow; @@ -75,14 +77,21 @@ pub struct LemmyContext { pub pool: DbPool, pub chat_server: Addr, pub client: Client, + pub activity_sender: Addr, } impl LemmyContext { - pub fn create(pool: DbPool, chat_server: Addr, client: Client) -> LemmyContext { + pub fn create( + pool: DbPool, + chat_server: Addr, + client: Client, + activity_sender: Addr, + ) -> LemmyContext { LemmyContext { pool, chat_server, client, + activity_sender, } } pub fn pool(&self) -> &DbPool { @@ -94,6 +103,9 @@ impl LemmyContext { pub fn client(&self) -> &Client { &self.client } + pub fn activity_sender(&self) -> &Addr { + &self.activity_sender + } } impl Clone for LemmyContext { @@ -102,6 +114,7 @@ impl Clone for LemmyContext { pool: self.pool.clone(), chat_server: self.chat_server.clone(), client: self.client.clone(), + activity_sender: self.activity_sender.clone(), } } } diff --git a/server/src/main.rs b/server/src/main.rs index f6ce4f096..1ecfa3124 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -20,6 +20,7 @@ use diesel::{ }; use lemmy_db::get_database_url_from_env; use lemmy_server::{ + apub::activity_sender::ActivitySender, blocking, code_migrations::run_advanced_migrations, rate_limit::{rate_limiter::RateLimiter, RateLimit}, @@ -76,9 +77,20 @@ async fn main() -> Result<(), LemmyError> { // Create Http server with websocket support HttpServer::new(move || { - let chat_server = - ChatServer::startup(pool.clone(), rate_limiter.clone(), Client::default()).start(); - let context = LemmyContext::create(pool.clone(), chat_server, Client::default()); + let activity_sender = ActivitySender::startup(Client::default()).start(); + let chat_server = ChatServer::startup( + pool.clone(), + rate_limiter.clone(), + Client::default(), + activity_sender.clone(), + ) + .start(); + let context = LemmyContext::create( + pool.clone(), + chat_server, + Client::default(), + activity_sender, + ); let settings = Settings::get(); let rate_limiter = rate_limiter.clone(); App::new() diff --git a/server/src/request.rs b/server/src/request.rs index 70a2b6933..96845a484 100644 --- a/server/src/request.rs +++ b/server/src/request.rs @@ -19,7 +19,7 @@ where retry_custom(|| async { Ok((f)().await) }).await } -pub async fn retry_custom(f: F) -> Result +async fn retry_custom(f: F) -> Result where F: Fn() -> Fut, Fut: Future, LemmyError>>, diff --git a/server/src/websocket/server.rs b/server/src/websocket/server.rs index bfc55f637..b4293c9ca 100644 --- a/server/src/websocket/server.rs +++ b/server/src/websocket/server.rs @@ -5,6 +5,7 @@ use super::*; use crate::{ api::{comment::*, community::*, post::*, site::*, user::*, *}, + apub::activity_sender::ActivitySender, rate_limit::RateLimit, websocket::UserOperation, CommunityId, @@ -181,6 +182,8 @@ pub struct ChatServer { /// An HTTP Client client: Client, + + activity_sender: Addr, } impl ChatServer { @@ -188,6 +191,7 @@ impl ChatServer { pool: Pool>, rate_limiter: RateLimit, client: Client, + activity_sender: Addr, ) -> ChatServer { ChatServer { sessions: HashMap::new(), @@ -199,6 +203,7 @@ impl ChatServer { rate_limiter, captchas: Vec::new(), client, + activity_sender, } } @@ -455,6 +460,7 @@ impl ChatServer { }; let client = self.client.clone(); + let activity_sender = self.activity_sender.clone(); async move { let msg = msg; let json: Value = serde_json::from_str(&msg.msg)?; @@ -469,6 +475,7 @@ impl ChatServer { pool, chat_server: addr, client, + activity_sender, }; let args = Args { context: &context,