From f7f64363ea6f2ce82605a6dcbd61107c0d094459 Mon Sep 17 00:00:00 2001 From: Felix Ableitner Date: Wed, 19 Aug 2020 16:26:20 +0200 Subject: [PATCH] rewrite with proper error handling and less boilerplate --- server/src/apub/activities.rs | 29 +++-- server/src/apub/activity_sender.rs | 142 ++++++++++++++--------- server/src/apub/comment.rs | 32 ++--- server/src/apub/community.rs | 44 ++----- server/src/apub/extensions/signatures.rs | 7 +- server/src/apub/mod.rs | 2 +- server/src/apub/post.rs | 18 +-- server/src/apub/private_message.rs | 30 +---- server/src/apub/user.rs | 16 +-- 9 files changed, 143 insertions(+), 177 deletions(-) diff --git a/server/src/apub/activities.rs b/server/src/apub/activities.rs index a8fc9cc27..341ee2f36 100644 --- a/server/src/apub/activities.rs +++ b/server/src/apub/activities.rs @@ -1,33 +1,38 @@ use crate::{ - apub::{activity_sender::SendUserActivity, community::do_announce, insert_activity}, + apub::{activity_sender::send_activity, community::do_announce, insert_activity}, LemmyContext, LemmyError, }; -use activitystreams::base::AnyBase; +use activitystreams::{ + base::{Extends, ExtendsExt}, + object::AsObject, +}; use lemmy_db::{community::Community, user::User_}; use lemmy_utils::{get_apub_protocol_string, settings::Settings}; +use serde::{export::fmt::Debug, Serialize}; use url::{ParseError, Url}; use uuid::Uuid; -pub async fn send_activity_to_community( +pub async fn send_activity_to_community( creator: &User_, community: &Community, to: Vec, - activity: AnyBase, + activity: T, context: &LemmyContext, -) -> Result<(), LemmyError> { +) -> Result<(), LemmyError> +where + T: AsObject + Extends + Serialize + Debug + Send + Clone + 'static, + Kind: Serialize, + >::Error: From + Send + Sync + 'static, +{ + // TODO: looks like call this sometimes with activity, and sometimes with any_base insert_activity(creator.id, activity.clone(), true, context.pool()).await?; // if this is a local community, we need to do an announce from the community instead if community.local { - do_announce(activity, &community, creator, context).await?; + do_announce(activity.into_any_base()?, &community, creator, context).await?; } else { - let message = SendUserActivity { - activity, - actor: creator.to_owned(), - to, - }; - context.activity_sender().do_send(message); + send_activity(context.activity_sender(), activity, creator, to)?; } Ok(()) diff --git a/server/src/apub/activity_sender.rs b/server/src/apub/activity_sender.rs index 763f34083..a6a3c2b57 100644 --- a/server/src/apub/activity_sender.rs +++ b/server/src/apub/activity_sender.rs @@ -2,29 +2,59 @@ use crate::{ apub::{check_is_apub_id_valid, extensions::signatures::sign, ActorType}, LemmyError, }; -use activitystreams::base::AnyBase; +use activitystreams::{ + base::{Extends, ExtendsExt}, + object::AsObject, +}; use actix::prelude::*; +use anyhow::Context; use awc::Client; -use lemmy_db::{community::Community, user::User_}; -use lemmy_utils::settings::Settings; -use log::debug; +use lemmy_utils::{location_info, settings::Settings}; +use log::{debug, warn}; +use serde::Serialize; use url::Url; -// We cant use ActorType here, because it doesnt implement Sized -#[derive(Message)] -#[rtype(result = "()")] -pub struct SendUserActivity { - pub activity: AnyBase, - pub actor: User_, - pub to: Vec, +pub fn send_activity( + activity_sender: &Addr, + activity: T, + actor: &dyn ActorType, + to: Vec, +) -> Result<(), LemmyError> +where + T: AsObject, + T: Extends, + Kind: Serialize, + >::Error: From + Send + Sync + 'static, +{ + if !Settings::get().federation.enabled { + return Ok(()); + } + + let activity = activity.into_any_base()?; + let serialised_activity = serde_json::to_string(&activity)?; + + for to_url in &to { + check_is_apub_id_valid(&to_url)?; + } + + let message = SendActivity { + activity: serialised_activity, + to, + actor_id: actor.actor_id()?, + private_key: actor.private_key().context(location_info!())?, + }; + activity_sender.do_send(message); + + Ok(()) } #[derive(Message)] #[rtype(result = "()")] -pub struct SendCommunityActivity { - pub activity: AnyBase, - pub actor: Community, - pub to: Vec, +struct SendActivity { + activity: String, + to: Vec, + actor_id: Url, + private_key: String, } pub struct ActivitySender { @@ -38,51 +68,55 @@ impl ActivitySender { } impl Actor for ActivitySender { - type Context = Context; + type Context = actix::Context; } -impl Handler for ActivitySender { +impl Handler for ActivitySender { type Result = (); - fn handle(&mut self, msg: SendUserActivity, _ctx: &mut Context) -> Self::Result { - send_activity(msg.activity, &msg.actor, msg.to, &self.client); - } -} + fn handle(&mut self, msg: SendActivity, _ctx: &mut actix::Context) -> Self::Result { + debug!( + "Sending activitypub activity {} to {:?}", + &msg.activity, &msg.to + ); -impl Handler for ActivitySender { - type Result = (); - - fn handle(&mut self, msg: SendCommunityActivity, _ctx: &mut Context) -> Self::Result { - send_activity(msg.activity, &msg.actor, msg.to, &self.client); - } -} - -fn send_activity(activity: AnyBase, actor: &dyn ActorType, to: Vec, client: &Client) { - if !Settings::get().federation.enabled { - return; - } - - let serialised_activity = serde_json::to_string(&activity).unwrap(); - debug!( - "Sending activitypub activity {} to {:?}", - &serialised_activity, &to - ); - - for to_url in &to { - check_is_apub_id_valid(&to_url).unwrap(); - - let request = client - .post(to_url.as_str()) - .header("Content-Type", "application/json"); - - let serialised_activity = serialised_activity.clone(); Box::pin(async move { - // TODO: need to remove the unwrap, but ? causes compile errors - // TODO: if the sending fails, it should retry with exponential backoff - let signed = sign(request, actor, serialised_activity).await.unwrap(); - let res = signed.send().await; - debug!("Result for activity send: {:?}", res); - Ok::<(), LemmyError>(()) + for to_url in &msg.to { + let request = self + .client + .post(to_url.as_str()) + .header("Content-Type", "application/json"); + + let signed = sign( + request, + msg.activity.clone(), + &msg.actor_id, + msg.private_key.to_owned(), + ) + .await; + + let signed = match signed { + Ok(s) => s, + Err(e) => { + warn!( + "Failed to sign activity {} from {}: {}", + &msg.activity, &msg.actor_id, e + ); + return; + } + }; + + // TODO: if the sending fails, it should retry with exponential backoff + match signed.send().await { + Ok(_) => {} + Err(e) => { + warn!( + "Failed to send activity {} to {}: {}", + &msg.activity, &to_url, e + ); + } + } + } }); } } diff --git a/server/src/apub/comment.rs b/server/src/apub/comment.rs index a9a97c083..76d281b4e 100644 --- a/server/src/apub/comment.rs +++ b/server/src/apub/comment.rs @@ -224,14 +224,7 @@ impl ApubObjectType for Comment { // Set the mention tags .set_many_tags(maa.get_tags()?); - send_activity_to_community( - &creator, - &community, - maa.inboxes, - create.into_any_base()?, - context, - ) - .await?; + send_activity_to_community(&creator, &community, maa.inboxes, create, context).await?; Ok(()) } @@ -259,14 +252,7 @@ impl ApubObjectType for Comment { // Set the mention tags .set_many_tags(maa.get_tags()?); - send_activity_to_community( - &creator, - &community, - maa.inboxes, - update.into_any_base()?, - context, - ) - .await?; + send_activity_to_community(&creator, &community, maa.inboxes, update, context).await?; Ok(()) } @@ -293,7 +279,7 @@ impl ApubObjectType for Comment { &creator, &community, vec![community.get_shared_inbox_url()?], - delete.into_any_base()?, + delete, context, ) .await?; @@ -336,7 +322,7 @@ impl ApubObjectType for Comment { &creator, &community, vec![community.get_shared_inbox_url()?], - undo.into_any_base()?, + undo, context, ) .await?; @@ -366,7 +352,7 @@ impl ApubObjectType for Comment { &mod_, &community, vec![community.get_shared_inbox_url()?], - remove.into_any_base()?, + remove, context, ) .await?; @@ -405,7 +391,7 @@ impl ApubObjectType for Comment { &mod_, &community, vec![community.get_shared_inbox_url()?], - undo.into_any_base()?, + undo, context, ) .await?; @@ -438,7 +424,7 @@ impl ApubLikeableType for Comment { &creator, &community, vec![community.get_shared_inbox_url()?], - like.into_any_base()?, + like, context, ) .await?; @@ -468,7 +454,7 @@ impl ApubLikeableType for Comment { &creator, &community, vec![community.get_shared_inbox_url()?], - dislike.into_any_base()?, + dislike, context, ) .await?; @@ -510,7 +496,7 @@ impl ApubLikeableType for Comment { &creator, &community, vec![community.get_shared_inbox_url()?], - undo.into_any_base()?, + undo, context, ) .await?; diff --git a/server/src/apub/community.rs b/server/src/apub/community.rs index d695d8aa8..551d042c3 100644 --- a/server/src/apub/community.rs +++ b/server/src/apub/community.rs @@ -2,7 +2,7 @@ use crate::{ api::{check_slurs, check_slurs_opt}, apub::{ activities::generate_activity_id, - activity_sender::{SendCommunityActivity, SendUserActivity}, + activity_sender::send_activity, check_actor_domain, create_apub_response, create_apub_tombstone_response, @@ -156,12 +156,7 @@ impl ActorType for Community { insert_activity(self.creator_id, accept.clone(), true, context.pool()).await?; - let message = SendCommunityActivity { - activity: accept.into_any_base()?, - actor: self.to_owned(), - to: vec![to], - }; - context.activity_sender().do_send(message); + send_activity(context.activity_sender(), accept, self, vec![to])?; Ok(()) } @@ -182,12 +177,7 @@ impl ActorType for Community { // Note: For an accept, since it was automatic, no one pushed a button, // the community was the actor. // But for delete, the creator is the actor, and does the signing - let message = SendUserActivity { - activity: delete.into_any_base()?, - actor: creator.to_owned(), - to: inboxes, - }; - context.activity_sender().do_send(message); + send_activity(context.activity_sender(), delete, creator, inboxes)?; Ok(()) } @@ -219,12 +209,7 @@ impl ActorType for Community { // Note: For an accept, since it was automatic, no one pushed a button, // the community was the actor. // But for delete, the creator is the actor, and does the signing - let message = SendUserActivity { - activity: undo.into_any_base()?, - actor: creator.to_owned(), - to: inboxes, - }; - context.activity_sender().do_send(message); + send_activity(context.activity_sender(), undo, creator, inboxes)?; Ok(()) } @@ -245,12 +230,7 @@ impl ActorType for Community { // Note: For an accept, since it was automatic, no one pushed a button, // the community was the actor. // But for delete, the creator is the actor, and does the signing - let message = SendUserActivity { - activity: remove.into_any_base()?, - actor: mod_.to_owned(), - to: inboxes, - }; - context.activity_sender().do_send(message); + send_activity(context.activity_sender(), remove, mod_, inboxes)?; Ok(()) } @@ -279,12 +259,7 @@ impl ActorType for Community { // Note: For an accept, since it was automatic, no one pushed a button, // the community was the actor. // But for remove , the creator is the actor, and does the signing - let message = SendUserActivity { - activity: undo.into_any_base()?, - actor: mod_.to_owned(), - to: inboxes, - }; - context.activity_sender().do_send(message); + send_activity(context.activity_sender(), undo, mod_, inboxes)?; Ok(()) } @@ -537,12 +512,7 @@ pub async fn do_announce( let community_shared_inbox = community.get_shared_inbox_url()?; to.retain(|x| x != &community_shared_inbox); - let message = SendCommunityActivity { - activity: announce.into_any_base()?, - actor: community.to_owned(), - to, - }; - context.activity_sender().do_send(message); + send_activity(context.activity_sender(), announce, community, to)?; Ok(()) } diff --git a/server/src/apub/extensions/signatures.rs b/server/src/apub/extensions/signatures.rs index 96063d5e0..cb9697775 100644 --- a/server/src/apub/extensions/signatures.rs +++ b/server/src/apub/extensions/signatures.rs @@ -16,6 +16,7 @@ use openssl::{ }; use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; +use url::Url; lazy_static! { static ref HTTP_SIG_CONFIG: Config = Config::new(); @@ -24,11 +25,11 @@ lazy_static! { /// Signs request headers with the given keypair. pub async fn sign( request: ClientRequest, - actor: &dyn ActorType, activity: String, + actor_id: &Url, + private_key: String, ) -> Result, LemmyError> { - let signing_key_id = format!("{}#main-key", actor.actor_id()?); - let private_key = actor.private_key().context(location_info!())?; + let signing_key_id = format!("{}#main-key", actor_id); let digest_client = request .signature_with_digest( diff --git a/server/src/apub/mod.rs b/server/src/apub/mod.rs index 1675ad1d6..ab0ac4f25 100644 --- a/server/src/apub/mod.rs +++ b/server/src/apub/mod.rs @@ -1,5 +1,5 @@ -pub mod activity_sender; pub mod activities; +pub mod activity_sender; pub mod comment; pub mod community; pub mod extensions; diff --git a/server/src/apub/post.rs b/server/src/apub/post.rs index 4f2a83155..4c33f9107 100644 --- a/server/src/apub/post.rs +++ b/server/src/apub/post.rs @@ -318,7 +318,7 @@ impl ApubObjectType for Post { creator, &community, vec![community.get_shared_inbox_url()?], - create.into_any_base()?, + create, context, ) .await?; @@ -346,7 +346,7 @@ impl ApubObjectType for Post { creator, &community, vec![community.get_shared_inbox_url()?], - update.into_any_base()?, + update, context, ) .await?; @@ -373,7 +373,7 @@ impl ApubObjectType for Post { creator, &community, vec![community.get_shared_inbox_url()?], - delete.into_any_base()?, + delete, context, ) .await?; @@ -412,7 +412,7 @@ impl ApubObjectType for Post { creator, &community, vec![community.get_shared_inbox_url()?], - undo.into_any_base()?, + undo, context, ) .await?; @@ -439,7 +439,7 @@ impl ApubObjectType for Post { mod_, &community, vec![community.get_shared_inbox_url()?], - remove.into_any_base()?, + remove, context, ) .await?; @@ -474,7 +474,7 @@ impl ApubObjectType for Post { mod_, &community, vec![community.get_shared_inbox_url()?], - undo.into_any_base()?, + undo, context, ) .await?; @@ -504,7 +504,7 @@ impl ApubLikeableType for Post { &creator, &community, vec![community.get_shared_inbox_url()?], - like.into_any_base()?, + like, context, ) .await?; @@ -531,7 +531,7 @@ impl ApubLikeableType for Post { &creator, &community, vec![community.get_shared_inbox_url()?], - dislike.into_any_base()?, + dislike, context, ) .await?; @@ -570,7 +570,7 @@ impl ApubLikeableType for Post { &creator, &community, vec![community.get_shared_inbox_url()?], - undo.into_any_base()?, + undo, context, ) .await?; diff --git a/server/src/apub/private_message.rs b/server/src/apub/private_message.rs index de68a02b9..54e89112e 100644 --- a/server/src/apub/private_message.rs +++ b/server/src/apub/private_message.rs @@ -1,7 +1,7 @@ use crate::{ apub::{ activities::generate_activity_id, - activity_sender::SendUserActivity, + activity_sender::send_activity, check_actor_domain, check_is_apub_id_valid, create_tombstone, @@ -135,12 +135,7 @@ impl ApubObjectType for PrivateMessage { insert_activity(creator.id, create.clone(), true, context.pool()).await?; - let message = SendUserActivity { - activity: create.into_any_base()?, - actor: creator.to_owned(), - to: vec![to], - }; - context.activity_sender().do_send(message); + send_activity(context.activity_sender(), create, creator, vec![to])?; Ok(()) } @@ -160,12 +155,7 @@ impl ApubObjectType for PrivateMessage { insert_activity(creator.id, update.clone(), true, context.pool()).await?; - let message = SendUserActivity { - activity: update.into_any_base()?, - actor: creator.to_owned(), - to: vec![to], - }; - context.activity_sender().do_send(message); + send_activity(context.activity_sender(), update, creator, vec![to])?; Ok(()) } @@ -184,12 +174,7 @@ impl ApubObjectType for PrivateMessage { insert_activity(creator.id, delete.clone(), true, context.pool()).await?; - let message = SendUserActivity { - activity: delete.into_any_base()?, - actor: creator.to_owned(), - to: vec![to], - }; - context.activity_sender().do_send(message); + send_activity(context.activity_sender(), delete, creator, vec![to])?; Ok(()) } @@ -219,12 +204,7 @@ impl ApubObjectType for PrivateMessage { insert_activity(creator.id, undo.clone(), true, context.pool()).await?; - let message = SendUserActivity { - activity: undo.into_any_base()?, - actor: creator.to_owned(), - to: vec![to], - }; - context.activity_sender().do_send(message); + send_activity(context.activity_sender(), undo, creator, vec![to])?; Ok(()) } diff --git a/server/src/apub/user.rs b/server/src/apub/user.rs index a3cf16cdd..16771e963 100644 --- a/server/src/apub/user.rs +++ b/server/src/apub/user.rs @@ -2,7 +2,7 @@ use crate::{ api::{check_slurs, check_slurs_opt}, apub::{ activities::generate_activity_id, - activity_sender::SendUserActivity, + activity_sender::send_activity, check_actor_domain, create_apub_response, fetcher::get_or_fetch_and_upsert_actor, @@ -128,12 +128,7 @@ impl ActorType for User_ { insert_activity(self.id, follow.clone(), true, context.pool()).await?; - let message = SendUserActivity { - activity: follow.into_any_base()?, - actor: self.to_owned(), - to: vec![to], - }; - context.activity_sender().do_send(message); + send_activity(context.activity_sender(), follow, self, vec![to])?; Ok(()) } @@ -158,12 +153,7 @@ impl ActorType for User_ { insert_activity(self.id, undo.clone(), true, context.pool()).await?; - let message = SendUserActivity { - activity: undo.into_any_base()?, - actor: self.to_owned(), - to: vec![to], - }; - context.activity_sender().do_send(message); + send_activity(context.activity_sender(), undo, self, vec![to])?; Ok(()) }