diff --git a/lemmy_apub/src/activities.rs b/lemmy_apub/src/activities.rs deleted file mode 100644 index 18781ef42c..0000000000 --- a/lemmy_apub/src/activities.rs +++ /dev/null @@ -1,49 +0,0 @@ -use crate::{activity_queue::send_activity, community::do_announce, insert_activity}; -use activitystreams::{ - base::{Extends, ExtendsExt}, - object::AsObject, -}; -use lemmy_db::{community::Community, user::User_}; -use lemmy_utils::{settings::Settings, LemmyError}; -use lemmy_websocket::LemmyContext; -use serde::{export::fmt::Debug, Serialize}; -use url::{ParseError, Url}; -use uuid::Uuid; - -pub async fn send_activity_to_community( - creator: &User_, - community: &Community, - to: Vec, - activity: T, - context: &LemmyContext, -) -> Result<(), LemmyError> -where - T: AsObject + Extends + Serialize + Debug + Send + Clone + 'static, - Kind: Serialize, - >::Error: From + Send + Sync + 'static, -{ - // TODO: looks like call this sometimes with activity, and sometimes with any_base - insert_activity(creator.id, activity.clone(), true, context.pool()).await?; - - // if this is a local community, we need to do an announce from the community instead - if community.local { - do_announce(activity.into_any_base()?, &community, creator, context).await?; - } else { - send_activity(context.activity_queue(), activity, creator, to)?; - } - - Ok(()) -} - -pub(in crate) fn generate_activity_id(kind: T) -> Result -where - T: ToString, -{ - let id = format!( - "{}/activities/{}/{}", - Settings::get().get_protocol_and_hostname(), - kind.to_string().to_lowercase(), - Uuid::new_v4() - ); - Url::parse(&id) -} diff --git a/lemmy_apub/src/activity_queue.rs b/lemmy_apub/src/activity_queue.rs index ece782c5d2..960e126b87 100644 --- a/lemmy_apub/src/activity_queue.rs +++ b/lemmy_apub/src/activity_queue.rs @@ -1,4 +1,10 @@ -use crate::{check_is_apub_id_valid, extensions::signatures::sign, ActorType}; +use crate::{ + check_is_apub_id_valid, + community::do_announce, + extensions::signatures::sign, + insert_activity, + ActorType, +}; use activitystreams::{ base::{Extends, ExtendsExt}, object::AsObject, @@ -13,22 +19,148 @@ use background_jobs::{ QueueHandle, WorkerConfig, }; +use itertools::Itertools; +use lemmy_db::{community::Community, user::User_, DbPool}; use lemmy_utils::{location_info, settings::Settings, LemmyError}; +use lemmy_websocket::LemmyContext; use log::warn; use reqwest::Client; use serde::{Deserialize, Serialize}; use std::{collections::BTreeMap, future::Future, pin::Pin}; use url::Url; -pub fn send_activity( +pub async fn send_activity_single_dest( + activity: T, + creator: &dyn ActorType, + to: Url, + context: &LemmyContext, +) -> Result<(), LemmyError> +where + T: AsObject + Extends, + Kind: Serialize, + >::Error: From + Send + Sync + 'static, +{ + if check_is_apub_id_valid(&to).is_ok() { + send_activity_internal( + context.activity_queue(), + activity, + creator, + vec![to], + context.pool(), + ) + .await?; + } + + Ok(()) +} + +pub async fn send_to_community_followers( + activity: T, + community: &Community, + context: &LemmyContext, + sender_shared_inbox: Option, +) -> Result<(), LemmyError> +where + T: AsObject + Extends, + Kind: Serialize, + >::Error: From + Send + Sync + 'static, +{ + // dont send to the local instance, nor to the instance where the activity originally came from, + // because that would result in a database error (same data inserted twice) + let community_shared_inbox = community.get_shared_inbox_url()?; + let to: Vec = community + .get_follower_inboxes(context.pool()) + .await? + .iter() + .filter(|inbox| Some(inbox) != sender_shared_inbox.as_ref().as_ref()) + .filter(|inbox| inbox != &&community_shared_inbox) + .filter(|inbox| check_is_apub_id_valid(inbox).is_ok()) + .unique() + .map(|inbox| inbox.to_owned()) + .collect(); + + send_activity_internal( + context.activity_queue(), + activity, + community, + to, + context.pool(), + ) + .await?; + + Ok(()) +} + +pub async fn send_to_community( + creator: &User_, + community: &Community, + activity: T, + context: &LemmyContext, +) -> Result<(), LemmyError> +where + T: AsObject + Extends, + Kind: Serialize, + >::Error: From + Send + Sync + 'static, +{ + // if this is a local community, we need to do an announce from the community instead + if community.local { + do_announce(activity.into_any_base()?, &community, creator, context).await?; + } else { + let inbox = community.get_shared_inbox_url()?; + check_is_apub_id_valid(&inbox)?; + send_activity_internal( + context.activity_queue(), + activity, + creator, + vec![inbox], + context.pool(), + ) + .await?; + } + + Ok(()) +} + +pub async fn send_comment_mentions( + creator: &User_, + mentions: Vec, + activity: T, + context: &LemmyContext, +) -> Result<(), LemmyError> +where + T: AsObject + Extends, + Kind: Serialize, + >::Error: From + Send + Sync + 'static, +{ + let mentions = mentions + .iter() + .filter(|inbox| check_is_apub_id_valid(inbox).is_ok()) + .map(|i| i.to_owned()) + .collect(); + send_activity_internal( + context.activity_queue(), + activity, + creator, + mentions, + context.pool(), + ) + .await?; + Ok(()) +} + +/// Asynchronously sends the given `activity` from `actor` to every inbox URL in `to`. +/// +/// The caller of this function needs to remove any blocked domains from `to`, +/// using `check_is_apub_id_valid()`. +async fn send_activity_internal( activity_sender: &QueueHandle, activity: T, actor: &dyn ActorType, to: Vec, + pool: &DbPool, ) -> Result<(), LemmyError> where - T: AsObject, - T: Extends, + T: AsObject + Extends, Kind: Serialize, >::Error: From + Send + Sync + 'static, { @@ -36,12 +168,13 @@ where return Ok(()); } + for to_url in &to { + assert!(check_is_apub_id_valid(&to_url).is_ok()); + } + let activity = activity.into_any_base()?; let serialised_activity = serde_json::to_string(&activity)?; - - for to_url in &to { - check_is_apub_id_valid(&to_url)?; - } + insert_activity(actor.user_id(), serialised_activity.clone(), true, pool).await?; // TODO: it would make sense to create a separate task for each destination server let message = SendActivityTask { diff --git a/lemmy_apub/src/comment.rs b/lemmy_apub/src/comment.rs index 4e5c173f89..7f6885a190 100644 --- a/lemmy_apub/src/comment.rs +++ b/lemmy_apub/src/comment.rs @@ -1,5 +1,5 @@ use crate::{ - activities::{generate_activity_id, send_activity_to_community}, + activity_queue::{send_comment_mentions, send_to_community}, check_actor_domain, create_apub_response, create_apub_tombstone_response, @@ -10,6 +10,7 @@ use crate::{ get_or_fetch_and_insert_post, get_or_fetch_and_upsert_user, }, + generate_activity_id, ActorType, ApubLikeableType, ApubObjectType, @@ -219,7 +220,8 @@ impl ApubObjectType for Comment { // Set the mention tags .set_many_tags(maa.get_tags()?); - send_activity_to_community(&creator, &community, maa.inboxes, create, context).await?; + send_to_community(&creator, &community, create.clone(), context).await?; + send_comment_mentions(&creator, maa.inboxes, create, context).await?; Ok(()) } @@ -247,7 +249,8 @@ impl ApubObjectType for Comment { // Set the mention tags .set_many_tags(maa.get_tags()?); - send_activity_to_community(&creator, &community, maa.inboxes, update, context).await?; + send_to_community(&creator, &community, update.clone(), context).await?; + send_comment_mentions(&creator, maa.inboxes, update, context).await?; Ok(()) } @@ -270,14 +273,7 @@ impl ApubObjectType for Comment { .set_to(public()) .set_many_ccs(vec![community.get_followers_url()?]); - send_activity_to_community( - &creator, - &community, - vec![community.get_shared_inbox_url()?], - delete, - context, - ) - .await?; + send_to_community(&creator, &community, delete, context).await?; Ok(()) } @@ -313,14 +309,7 @@ impl ApubObjectType for Comment { .set_to(public()) .set_many_ccs(vec![community.get_followers_url()?]); - send_activity_to_community( - &creator, - &community, - vec![community.get_shared_inbox_url()?], - undo, - context, - ) - .await?; + send_to_community(&creator, &community, undo, context).await?; Ok(()) } @@ -343,14 +332,7 @@ impl ApubObjectType for Comment { .set_to(public()) .set_many_ccs(vec![community.get_followers_url()?]); - send_activity_to_community( - &mod_, - &community, - vec![community.get_shared_inbox_url()?], - remove, - context, - ) - .await?; + send_to_community(&mod_, &community, remove, context).await?; Ok(()) } @@ -382,14 +364,7 @@ impl ApubObjectType for Comment { .set_to(public()) .set_many_ccs(vec![community.get_followers_url()?]); - send_activity_to_community( - &mod_, - &community, - vec![community.get_shared_inbox_url()?], - undo, - context, - ) - .await?; + send_to_community(&mod_, &community, undo, context).await?; Ok(()) } } @@ -415,14 +390,7 @@ impl ApubLikeableType for Comment { .set_to(public()) .set_many_ccs(vec![community.get_followers_url()?]); - send_activity_to_community( - &creator, - &community, - vec![community.get_shared_inbox_url()?], - like, - context, - ) - .await?; + send_to_community(&creator, &community, like, context).await?; Ok(()) } @@ -445,14 +413,7 @@ impl ApubLikeableType for Comment { .set_to(public()) .set_many_ccs(vec![community.get_followers_url()?]); - send_activity_to_community( - &creator, - &community, - vec![community.get_shared_inbox_url()?], - dislike, - context, - ) - .await?; + send_to_community(&creator, &community, dislike, context).await?; Ok(()) } @@ -487,14 +448,7 @@ impl ApubLikeableType for Comment { .set_to(public()) .set_many_ccs(vec![community.get_followers_url()?]); - send_activity_to_community( - &creator, - &community, - vec![community.get_shared_inbox_url()?], - undo, - context, - ) - .await?; + send_to_community(&creator, &community, undo, context).await?; Ok(()) } } diff --git a/lemmy_apub/src/community.rs b/lemmy_apub/src/community.rs index 715b765b37..44f5e6e10e 100644 --- a/lemmy_apub/src/community.rs +++ b/lemmy_apub/src/community.rs @@ -1,13 +1,13 @@ use crate::{ - activities::generate_activity_id, - activity_queue::send_activity, + activity_queue::{send_activity_single_dest, send_to_community_followers}, check_actor_domain, + check_is_apub_id_valid, create_apub_response, create_apub_tombstone_response, create_tombstone, extensions::group_extensions::GroupExtension, fetcher::{get_or_fetch_and_upsert_actor, get_or_fetch_and_upsert_user}, - insert_activity, + generate_activity_id, ActorType, FromApub, GroupExt, @@ -167,9 +167,7 @@ impl ActorType for Community { .set_id(generate_activity_id(AcceptType::Accept)?) .set_to(to.clone()); - insert_activity(self.creator_id, accept.clone(), true, context.pool()).await?; - - send_activity(context.activity_queue(), accept, self, vec![to])?; + send_activity_single_dest(accept, self, to, context).await?; Ok(()) } @@ -183,14 +181,7 @@ impl ActorType for Community { .set_to(public()) .set_many_ccs(vec![self.get_followers_url()?]); - insert_activity(self.creator_id, delete.clone(), true, context.pool()).await?; - - let inboxes = self.get_follower_inboxes(context.pool()).await?; - - // Note: For an accept, since it was automatic, no one pushed a button, - // the community was the actor. - // But for delete, the creator is the actor, and does the signing - send_activity(context.activity_queue(), delete, creator, inboxes)?; + send_to_community_followers(delete, self, context, None).await?; Ok(()) } @@ -215,14 +206,7 @@ impl ActorType for Community { .set_to(public()) .set_many_ccs(vec![self.get_followers_url()?]); - insert_activity(self.creator_id, undo.clone(), true, context.pool()).await?; - - let inboxes = self.get_follower_inboxes(context.pool()).await?; - - // Note: For an accept, since it was automatic, no one pushed a button, - // the community was the actor. - // But for delete, the creator is the actor, and does the signing - send_activity(context.activity_queue(), undo, creator, inboxes)?; + send_to_community_followers(undo, self, context, None).await?; Ok(()) } @@ -236,14 +220,7 @@ impl ActorType for Community { .set_to(public()) .set_many_ccs(vec![self.get_followers_url()?]); - insert_activity(mod_.id, remove.clone(), true, context.pool()).await?; - - let inboxes = self.get_follower_inboxes(context.pool()).await?; - - // Note: For an accept, since it was automatic, no one pushed a button, - // the community was the actor. - // But for delete, the creator is the actor, and does the signing - send_activity(context.activity_queue(), remove, mod_, inboxes)?; + send_to_community_followers(remove, self, context, None).await?; Ok(()) } @@ -265,14 +242,7 @@ impl ActorType for Community { .set_to(public()) .set_many_ccs(vec![self.get_followers_url()?]); - insert_activity(mod_.id, undo.clone(), true, context.pool()).await?; - - let inboxes = self.get_follower_inboxes(context.pool()).await?; - - // Note: For an accept, since it was automatic, no one pushed a button, - // the community was the actor. - // But for remove , the creator is the actor, and does the signing - send_activity(context.activity_queue(), undo, mod_, inboxes)?; + send_to_community_followers(undo, self, context, None).await?; Ok(()) } @@ -305,6 +275,8 @@ impl ActorType for Community { ))?) }) .filter_map(Result::ok) + // Don't send to blocked instances + .filter(|inbox| check_is_apub_id_valid(inbox).is_ok()) .unique() .collect(); @@ -513,19 +485,13 @@ pub async fn do_announce( .set_to(public()) .set_many_ccs(vec![community.get_followers_url()?]); - insert_activity(community.creator_id, announce.clone(), true, context.pool()).await?; - - let mut to: Vec = community.get_follower_inboxes(context.pool()).await?; - - // dont send to the local instance, nor to the instance where the activity originally came from, - // because that would result in a database error (same data inserted twice) - // this seems to be the "easiest" stable alternative for remove_item() - let sender_shared_inbox = sender.get_shared_inbox_url()?; - to.retain(|x| x != &sender_shared_inbox); - let community_shared_inbox = community.get_shared_inbox_url()?; - to.retain(|x| x != &community_shared_inbox); - - send_activity(context.activity_queue(), announce, community, to)?; + send_to_community_followers( + announce, + community, + context, + Some(sender.get_shared_inbox_url()?), + ) + .await?; Ok(()) } diff --git a/lemmy_apub/src/lib.rs b/lemmy_apub/src/lib.rs index 3f37c5d3c0..1f6e75e4cf 100644 --- a/lemmy_apub/src/lib.rs +++ b/lemmy_apub/src/lib.rs @@ -1,7 +1,6 @@ #[macro_use] extern crate lazy_static; -pub mod activities; pub mod activity_queue; pub mod comment; pub mod community; @@ -43,6 +42,7 @@ use log::debug; use reqwest::Client; use serde::Serialize; use url::{ParseError, Url}; +use uuid::Uuid; type GroupExt = Ext2, GroupExtension, PublicKeyExtension>; type PersonExt = Ext1, PublicKeyExtension>; @@ -360,3 +360,16 @@ where .await??; Ok(()) } + +pub(in crate) fn generate_activity_id(kind: T) -> Result +where + T: ToString, +{ + let id = format!( + "{}/activities/{}/{}", + Settings::get().get_protocol_and_hostname(), + kind.to_string().to_lowercase(), + Uuid::new_v4() + ); + Url::parse(&id) +} diff --git a/lemmy_apub/src/post.rs b/lemmy_apub/src/post.rs index 8f5ffbcb8f..2d615ea1b0 100644 --- a/lemmy_apub/src/post.rs +++ b/lemmy_apub/src/post.rs @@ -1,11 +1,12 @@ use crate::{ - activities::{generate_activity_id, send_activity_to_community}, + activity_queue::send_to_community, check_actor_domain, create_apub_response, create_apub_tombstone_response, create_tombstone, extensions::page_extension::PageExtension, fetcher::{get_or_fetch_and_upsert_community, get_or_fetch_and_upsert_user}, + generate_activity_id, ActorType, ApubLikeableType, ApubObjectType, @@ -257,14 +258,7 @@ impl ApubObjectType for Post { .set_to(public()) .set_many_ccs(vec![community.get_followers_url()?]); - send_activity_to_community( - creator, - &community, - vec![community.get_shared_inbox_url()?], - create, - context, - ) - .await?; + send_to_community(creator, &community, create, context).await?; Ok(()) } @@ -285,14 +279,7 @@ impl ApubObjectType for Post { .set_to(public()) .set_many_ccs(vec![community.get_followers_url()?]); - send_activity_to_community( - creator, - &community, - vec![community.get_shared_inbox_url()?], - update, - context, - ) - .await?; + send_to_community(creator, &community, update, context).await?; Ok(()) } @@ -312,14 +299,7 @@ impl ApubObjectType for Post { .set_to(public()) .set_many_ccs(vec![community.get_followers_url()?]); - send_activity_to_community( - creator, - &community, - vec![community.get_shared_inbox_url()?], - delete, - context, - ) - .await?; + send_to_community(creator, &community, delete, context).await?; Ok(()) } @@ -351,14 +331,7 @@ impl ApubObjectType for Post { .set_to(public()) .set_many_ccs(vec![community.get_followers_url()?]); - send_activity_to_community( - creator, - &community, - vec![community.get_shared_inbox_url()?], - undo, - context, - ) - .await?; + send_to_community(creator, &community, undo, context).await?; Ok(()) } @@ -378,14 +351,7 @@ impl ApubObjectType for Post { .set_to(public()) .set_many_ccs(vec![community.get_followers_url()?]); - send_activity_to_community( - mod_, - &community, - vec![community.get_shared_inbox_url()?], - remove, - context, - ) - .await?; + send_to_community(mod_, &community, remove, context).await?; Ok(()) } @@ -413,14 +379,7 @@ impl ApubObjectType for Post { .set_to(public()) .set_many_ccs(vec![community.get_followers_url()?]); - send_activity_to_community( - mod_, - &community, - vec![community.get_shared_inbox_url()?], - undo, - context, - ) - .await?; + send_to_community(mod_, &community, undo, context).await?; Ok(()) } } @@ -443,14 +402,7 @@ impl ApubLikeableType for Post { .set_to(public()) .set_many_ccs(vec![community.get_followers_url()?]); - send_activity_to_community( - &creator, - &community, - vec![community.get_shared_inbox_url()?], - like, - context, - ) - .await?; + send_to_community(&creator, &community, like, context).await?; Ok(()) } @@ -470,14 +422,7 @@ impl ApubLikeableType for Post { .set_to(public()) .set_many_ccs(vec![community.get_followers_url()?]); - send_activity_to_community( - &creator, - &community, - vec![community.get_shared_inbox_url()?], - dislike, - context, - ) - .await?; + send_to_community(&creator, &community, dislike, context).await?; Ok(()) } @@ -509,14 +454,7 @@ impl ApubLikeableType for Post { .set_to(public()) .set_many_ccs(vec![community.get_followers_url()?]); - send_activity_to_community( - &creator, - &community, - vec![community.get_shared_inbox_url()?], - undo, - context, - ) - .await?; + send_to_community(&creator, &community, undo, context).await?; Ok(()) } } diff --git a/lemmy_apub/src/private_message.rs b/lemmy_apub/src/private_message.rs index d61a7771ea..fd8e6c6b01 100644 --- a/lemmy_apub/src/private_message.rs +++ b/lemmy_apub/src/private_message.rs @@ -1,11 +1,10 @@ use crate::{ - activities::generate_activity_id, - activity_queue::send_activity, + activity_queue::send_activity_single_dest, check_actor_domain, check_is_apub_id_valid, create_tombstone, fetcher::get_or_fetch_and_upsert_user, - insert_activity, + generate_activity_id, ActorType, ApubObjectType, FromApub, @@ -130,9 +129,7 @@ impl ApubObjectType for PrivateMessage { .set_id(generate_activity_id(CreateType::Create)?) .set_to(to.clone()); - insert_activity(creator.id, create.clone(), true, context.pool()).await?; - - send_activity(context.activity_queue(), create, creator, vec![to])?; + send_activity_single_dest(create, creator, to, context).await?; Ok(()) } @@ -150,9 +147,7 @@ impl ApubObjectType for PrivateMessage { .set_id(generate_activity_id(UpdateType::Update)?) .set_to(to.clone()); - insert_activity(creator.id, update.clone(), true, context.pool()).await?; - - send_activity(context.activity_queue(), update, creator, vec![to])?; + send_activity_single_dest(update, creator, to, context).await?; Ok(()) } @@ -169,9 +164,7 @@ impl ApubObjectType for PrivateMessage { .set_id(generate_activity_id(DeleteType::Delete)?) .set_to(to.clone()); - insert_activity(creator.id, delete.clone(), true, context.pool()).await?; - - send_activity(context.activity_queue(), delete, creator, vec![to])?; + send_activity_single_dest(delete, creator, to, context).await?; Ok(()) } @@ -199,9 +192,7 @@ impl ApubObjectType for PrivateMessage { .set_id(generate_activity_id(UndoType::Undo)?) .set_to(to.clone()); - insert_activity(creator.id, undo.clone(), true, context.pool()).await?; - - send_activity(context.activity_queue(), undo, creator, vec![to])?; + send_activity_single_dest(undo, creator, to, context).await?; Ok(()) } diff --git a/lemmy_apub/src/user.rs b/lemmy_apub/src/user.rs index 60af834ca6..3f6e6971ce 100644 --- a/lemmy_apub/src/user.rs +++ b/lemmy_apub/src/user.rs @@ -1,10 +1,9 @@ use crate::{ - activities::generate_activity_id, - activity_queue::send_activity, + activity_queue::send_activity_single_dest, check_actor_domain, create_apub_response, fetcher::get_or_fetch_and_upsert_actor, - insert_activity, + generate_activity_id, ActorType, FromApub, PersonExt, @@ -126,9 +125,7 @@ impl ActorType for User_ { let follow_actor = get_or_fetch_and_upsert_actor(follow_actor_id, context).await?; let to = follow_actor.get_inbox_url()?; - insert_activity(self.id, follow.clone(), true, context.pool()).await?; - - send_activity(context.activity_queue(), follow, self, vec![to])?; + send_activity_single_dest(follow, self, to, context).await?; Ok(()) } @@ -151,9 +148,7 @@ impl ActorType for User_ { .set_context(activitystreams::context()) .set_id(generate_activity_id(UndoType::Undo)?); - insert_activity(self.id, undo.clone(), true, context.pool()).await?; - - send_activity(context.activity_queue(), undo, self, vec![to])?; + send_activity_single_dest(undo, self, to, context).await?; Ok(()) }