Federated Moderation #185

Manually merged
dessalines merged 20 commits from federated-moderation into main 2021-03-24 15:49:38 +00:00
37 changed files with 1034 additions and 397 deletions

4
Cargo.lock generated
View File

@ -4,9 +4,9 @@ version = 3
[[package]]
name = "activitystreams"
version = "0.7.0-alpha.10"
version = "0.7.0-alpha.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe7ceed015dfca322d3bcec3653909c77557e7e57df72e98cb8806e2c93cc919"
checksum = "3a5da1d857ec9ca65ef8d0469cdd64e7b93b59d6cad26f1444bf84b62f3eadd4"
dependencies = [
"chrono",
"mime",

View File

@ -22,6 +22,7 @@ import {
searchForUser,
banPersonFromSite,
searchPostLocal,
followCommunity,
banPersonFromCommunity,
} from './shared';
import { PostView, CommunityView } from 'lemmy-js-client';
@ -169,35 +170,38 @@ test('Sticky a post', async () => {
});
test('Lock a post', async () => {
await followCommunity(alpha, true, betaCommunity.community.id);
let postRes = await createPost(alpha, betaCommunity.community.id);
// Lock the post
let lockedPostRes = await lockPost(alpha, true, postRes.post_view.post);
let searchBeta = await searchPost(beta, postRes.post_view.post);
let betaPost1 = searchBeta.posts[0];
let lockedPostRes = await lockPost(beta, true, betaPost1.post);
dessalines marked this conversation as resolved
Review

I wonder why this didn't work in the opposite direction.

I wonder why this didn't work in the opposite direction.
Review

Because it was wrong. The remote user who is not a mod was locking their own post, which is not allowed as far as I understand. Until now the apub code allowed that, but I added a check for it.

Because it was wrong. The remote user who is not a mod was locking their own post, which is not allowed as far as I understand. Until now the apub code allowed that, but I added a check for it.
expect(lockedPostRes.post_view.post.locked).toBe(true);
// Make sure that post is locked on beta
let searchBeta = await searchPostLocal(beta, postRes.post_view.post);
let betaPost1 = searchBeta.posts[0];
expect(betaPost1.post.locked).toBe(true);
// Make sure that post is locked on alpha
let searchAlpha = await searchPostLocal(alpha, postRes.post_view.post);
let alphaPost1 = searchAlpha.posts[0];
expect(alphaPost1.post.locked).toBe(true);
// Try to make a new comment there, on alpha
let comment: any = await createComment(alpha, postRes.post_view.post.id);
let comment: any = await createComment(alpha, alphaPost1.post.id);
expect(comment['error']).toBe('locked');
// Unlock a post
let unlockedPost = await lockPost(alpha, false, postRes.post_view.post);
let unlockedPost = await lockPost(beta, false, betaPost1.post);
expect(unlockedPost.post_view.post.locked).toBe(false);
// Make sure that post is unlocked on beta
let searchBeta2 = await searchPost(beta, postRes.post_view.post);
let betaPost2 = searchBeta2.posts[0];
expect(betaPost2.community.local).toBe(true);
expect(betaPost2.creator.local).toBe(false);
expect(betaPost2.post.locked).toBe(false);
// Make sure that post is unlocked on alpha
let searchAlpha2 = await searchPostLocal(alpha, postRes.post_view.post);
let alphaPost2 = searchAlpha2.posts[0];
expect(alphaPost2.community.local).toBe(false);
expect(alphaPost2.creator.local).toBe(true);
expect(alphaPost2.post.locked).toBe(false);
// Try to create a new comment, on beta
let commentBeta = await createComment(beta, betaPost2.post.id);
expect(commentBeta).toBeDefined();
// Try to create a new comment, on alpha
let commentAlpha = await createComment(alpha, alphaPost1.post.id);
expect(commentAlpha).toBeDefined();
});
test('Delete a post', async () => {

View File

@ -15,7 +15,9 @@ use lemmy_apub::{
generate_inbox_url,
generate_shared_inbox_url,
ActorType,
CommunityType,
EndpointType,
UserType,
};
use lemmy_db_queries::{
diesel_option_overwrite_to_url,
@ -34,7 +36,7 @@ use lemmy_db_queries::{
};
use lemmy_db_schema::{
naive_now,
source::{comment::Comment, community::*, moderator::*, post::Post, site::*},
source::{comment::Comment, community::*, moderator::*, person::Person, post::Post, site::*},
PersonId,
};
use lemmy_db_views::comment_view::CommentQueryBuilder;
@ -707,16 +709,16 @@ impl Perform for AddModToCommunity {
let data: &AddModToCommunity = &self;
let local_user_view = get_local_user_view_from_jwt(&data.auth, context.pool()).await?;
let community_moderator_form = CommunityModeratorForm {
community_id: data.community_id,
person_id: data.person_id,
};
let community_id = data.community_id;
// Verify that only mods or admins can add mod
is_mod_or_admin(context.pool(), local_user_view.person.id, community_id).await?;
// Update in local database
let community_moderator_form = CommunityModeratorForm {
community_id: data.community_id,
person_id: data.person_id,
};
if data.added {
let join = move |conn: &'_ _| CommunityModerator::join(conn, &community_moderator_form);
if blocking(context.pool(), join).await?.is_err() {
@ -741,6 +743,28 @@ impl Perform for AddModToCommunity {
})
.await??;
// Send to federated instances
let updated_mod_id = data.person_id;
let updated_mod = blocking(context.pool(), move |conn| {
Person::read(conn, updated_mod_id)
})
.await??;
let community = blocking(context.pool(), move |conn| {
Community::read(conn, community_id)
})
.await??;
if data.added {
community
.send_add_mod(&local_user_view.person, updated_mod, context)
.await?;
} else {
community
.send_remove_mod(&local_user_view.person, updated_mod, context)
.await?;
}
// Note: in case a remote mod is added, this returns the old moderators list, it will only get
// updated once we receive an activity from the community (like `Announce/Add/Moderator`)
let community_id = data.community_id;
let moderators = blocking(context.pool(), move |conn| {
CommunityModeratorView::for_community(conn, community_id)
@ -748,18 +772,18 @@ impl Perform for AddModToCommunity {
.await??;
let res = AddModToCommunityResponse { moderators };
context.chat_server().do_send(SendCommunityRoomMessage {
op: UserOperation::AddModToCommunity,
response: res.clone(),
community_id,
websocket_id,
});
Ok(res)
}
}
// TODO: we dont do anything for federation here, it should be updated the next time the community
// gets fetched. i hope we can get rid of the community creator role soon.
#[async_trait::async_trait(?Send)]
impl Perform for TransferCommunity {
type Response = GetCommunityResponse;

View File

@ -17,7 +17,7 @@ lemmy_db_views_actor = { path = "../db_views_actor" }
lemmy_api_structs = { path = "../api_structs" }
lemmy_websocket = { path = "../websocket" }
diesel = "1.4.5"
activitystreams = "0.7.0-alpha.10"
activitystreams = "0.7.0-alpha.11"
activitystreams-ext = "0.1.0-alpha.2"
bcrypt = "0.9.0"
chrono = { version = "0.4.19", features = ["serde"] }

View File

@ -1,2 +1,2 @@
pub(crate) mod receive;
pub(crate) mod send;
pub mod send;

View File

@ -1,6 +1,6 @@
use crate::{activities::receive::get_actor_as_person, objects::FromApub, ActorType, NoteExt};
use activitystreams::{
activity::{ActorAndObjectRefExt, Create, Dislike, Like, Remove, Update},
activity::{ActorAndObjectRefExt, Create, Dislike, Like, Update},
base::ExtendsExt,
};
use anyhow::Context;
@ -23,7 +23,8 @@ pub(crate) async fn receive_create_comment(
let note = NoteExt::from_any_base(create.object().to_owned().one().context(location_info!())?)?
.context(location_info!())?;
let comment = Comment::from_apub(&note, context, person.actor_id(), request_counter).await?;
let comment =
Comment::from_apub(&note, context, person.actor_id(), request_counter, false).await?;
let post_id = comment.post_id;
let post = blocking(context.pool(), move |conn| Post::read(conn, post_id)).await??;
@ -73,7 +74,8 @@ pub(crate) async fn receive_update_comment(
.context(location_info!())?;
let person = get_actor_as_person(&update, context, request_counter).await?;
let comment = Comment::from_apub(&note, context, person.actor_id(), request_counter).await?;
let comment =
Comment::from_apub(&note, context, person.actor_id(), request_counter, false).await?;
let comment_id = comment.id;
let post_id = comment.post_id;
@ -228,7 +230,6 @@ pub(crate) async fn receive_delete_comment(
pub(crate) async fn receive_remove_comment(
context: &LemmyContext,
_remove: Remove,
comment: Comment,
) -> Result<(), LemmyError> {
let removed_comment = blocking(context.pool(), move |conn| {

View File

@ -1,16 +1,9 @@
use crate::{activities::receive::verify_activity_domains_valid, inbox::is_addressed_to_public};
use activitystreams::{
activity::{ActorAndObjectRefExt, Delete, Remove, Undo},
base::{AnyBase, ExtendsExt},
};
use anyhow::Context;
use lemmy_api_structs::{blocking, community::CommunityResponse};
use lemmy_db_queries::{source::community::Community_, ApubObject};
use lemmy_db_queries::source::community::Community_;
use lemmy_db_schema::source::community::Community;
use lemmy_db_views_actor::community_view::CommunityView;
use lemmy_utils::{location_info, LemmyError};
use lemmy_utils::LemmyError;
use lemmy_websocket::{messages::SendCommunityRoomMessage, LemmyContext, UserOperation};
use url::Url;
pub(crate) async fn receive_delete_community(
context: &LemmyContext,
@ -42,23 +35,8 @@ pub(crate) async fn receive_delete_community(
pub(crate) async fn receive_remove_community(
context: &LemmyContext,
activity: AnyBase,
expected_domain: &Url,
community: Community,
) -> Result<(), LemmyError> {
let remove = Remove::from_any_base(activity)?.context(location_info!())?;
verify_activity_domains_valid(&remove, expected_domain, true)?;
is_addressed_to_public(&remove)?;
let community_uri = remove
.object()
.to_owned()
.single_xsd_any_uri()
.context(location_info!())?;
let community = blocking(context.pool(), move |conn| {
Community::read_from_apub_id(conn, &community_uri.into())
})
.await??;
let removed_community = blocking(context.pool(), move |conn| {
Community::update_removed(conn, community.id, true)
})
@ -85,16 +63,8 @@ pub(crate) async fn receive_remove_community(
pub(crate) async fn receive_undo_delete_community(
context: &LemmyContext,
undo: Undo,
community: Community,
expected_domain: &Url,
) -> Result<(), LemmyError> {
is_addressed_to_public(&undo)?;
let inner = undo.object().to_owned().one().context(location_info!())?;
let delete = Delete::from_any_base(inner)?.context(location_info!())?;
verify_activity_domains_valid(&delete, expected_domain, true)?;
is_addressed_to_public(&delete)?;
let deleted_community = blocking(context.pool(), move |conn| {
Community::update_deleted(conn, community.id, false)
})
@ -121,26 +91,8 @@ pub(crate) async fn receive_undo_delete_community(
pub(crate) async fn receive_undo_remove_community(
context: &LemmyContext,
undo: Undo,
expected_domain: &Url,
community: Community,
) -> Result<(), LemmyError> {
is_addressed_to_public(&undo)?;
let inner = undo.object().to_owned().one().context(location_info!())?;
let remove = Remove::from_any_base(inner)?.context(location_info!())?;
verify_activity_domains_valid(&remove, &expected_domain, true)?;
is_addressed_to_public(&remove)?;
let community_uri = remove
.object()
.to_owned()
.single_xsd_any_uri()
.context(location_info!())?;
let community = blocking(context.pool(), move |conn| {
Community::read_from_apub_id(conn, &community_uri.into())
})
.await??;
let removed_community = blocking(context.pool(), move |conn| {
Community::update_removed(conn, community.id, false)
})

View File

@ -1,12 +1,24 @@
use crate::{activities::receive::get_actor_as_person, objects::FromApub, ActorType, PageExt};
use crate::{
activities::receive::get_actor_as_person,
inbox::receive_for_community::verify_mod_activity,
objects::FromApub,
ActorType,
PageExt,
};
use activitystreams::{
activity::{Create, Dislike, Like, Remove, Update},
activity::{Announce, Create, Dislike, Like, Update},
prelude::*,
};
use anyhow::Context;
use lemmy_api_structs::{blocking, post::PostResponse};
use lemmy_db_queries::{source::post::Post_, Likeable};
use lemmy_db_schema::source::post::{Post, PostLike, PostLikeForm};
use lemmy_db_queries::{source::post::Post_, ApubObject, Crud, Likeable};
use lemmy_db_schema::{
source::{
community::Community,
post::{Post, PostLike, PostLikeForm},
},
DbUrl,
};
use lemmy_db_views::post_view::PostView;
use lemmy_utils::{location_info, LemmyError};
use lemmy_websocket::{messages::SendPost, LemmyContext, UserOperation};
@ -20,7 +32,7 @@ pub(crate) async fn receive_create_post(
let page = PageExt::from_any_base(create.object().to_owned().one().context(location_info!())?)?
.context(location_info!())?;
let post = Post::from_apub(&page, context, person.actor_id(), request_counter).await?;
let post = Post::from_apub(&page, context, person.actor_id(), request_counter, false).await?;
// Refetch the view
let post_id = post.id;
@ -42,6 +54,7 @@ pub(crate) async fn receive_create_post(
pub(crate) async fn receive_update_post(
update: Update,
announce: Option<Announce>,
context: &LemmyContext,
request_counter: &mut i32,
) -> Result<(), LemmyError> {
@ -49,7 +62,40 @@ pub(crate) async fn receive_update_post(
let page = PageExt::from_any_base(update.object().to_owned().one().context(location_info!())?)?
.context(location_info!())?;
let post = Post::from_apub(&page, context, person.actor_id(), request_counter).await?;
let post_id: DbUrl = page
.id_unchecked()
.context(location_info!())?
.to_owned()
.into();
let old_post = blocking(context.pool(), move |conn| {
Post::read_from_apub_id(conn, &post_id)
})
.await??;
// If sticked or locked state was changed, make sure the actor is a mod
let stickied = page.ext_one.stickied.context(location_info!())?;
let locked = !page.ext_one.comments_enabled.context(location_info!())?;
let mut mod_action_allowed = false;
if (stickied != old_post.stickied) || (locked != old_post.locked) {
dessalines marked this conversation as resolved Outdated

I'm sure this is fine, but the lack of parenthesis scares me that it might be evaluated in a strange order.

I'm sure this is fine, but the lack of parenthesis scares me that it might be evaluated in a strange order.

How would the order change anything? But I can add parenthesis if it makes you happy.

How would the order change anything? But I can add parenthesis if it makes you happy.

if (((stickied != old_post.stickied) || locked) != old_post.locked)

is different from

if ((stickied != old_post.stickied) || (locked != old_post.locked))

Without parenthesis order of operations might do the first.

`if (((stickied != old_post.stickied) || locked) != old_post.locked)` is different from `if ((stickied != old_post.stickied) || (locked != old_post.locked))` Without parenthesis order of operations might do the first.

I dont think Rust would do that, but couldnt find any documentation on the behaviour. So I'm adding the parenthesis just in case.

I dont think Rust would do that, but couldnt find any documentation on the behaviour. So I'm adding the parenthesis just in case.
let community = blocking(context.pool(), move |conn| {
Community::read(conn, old_post.community_id)
})
.await??;
// Only check mod status if the community is local, otherwise we trust that it was sent correctly.
if community.local {
verify_mod_activity(&update, announce, &community, context).await?;
}
mod_action_allowed = true;
}
let post = Post::from_apub(
&page,
context,
person.actor_id(),
request_counter,
mod_action_allowed,
)
.await?;
let post_id = post.id;
// Refetch the view
@ -173,7 +219,6 @@ pub(crate) async fn receive_delete_post(
pub(crate) async fn receive_remove_post(
context: &LemmyContext,
_remove: Remove,
post: Post,
) -> Result<(), LemmyError> {
let removed_post = blocking(context.pool(), move |conn| {

View File

@ -39,7 +39,7 @@ pub(crate) async fn receive_create_private_message(
.context(location_info!())?;
let private_message =
PrivateMessage::from_apub(&note, context, expected_domain, request_counter).await?;
PrivateMessage::from_apub(&note, context, expected_domain, request_counter, false).await?;
let message = blocking(&context.pool(), move |conn| {
PrivateMessageView::read(conn, private_message.id)
@ -85,7 +85,7 @@ pub(crate) async fn receive_update_private_message(
let note = NoteExt::from_any_base(object)?.context(location_info!())?;
let private_message =
PrivateMessage::from_apub(&note, context, expected_domain, request_counter).await?;
PrivateMessage::from_apub(&note, context, expected_domain, request_counter, false).await?;
let private_message_id = private_message.id;
let message = blocking(&context.pool(), move |conn| {

View File

@ -1,20 +1,24 @@
use crate::{
activities::send::generate_activity_id,
activity_queue::{send_activity_single_dest, send_to_community_followers},
activity_queue::{send_activity_single_dest, send_to_community, send_to_community_followers},
check_is_apub_id_valid,
extensions::context::lemmy_context,
fetcher::person::get_or_fetch_and_upsert_person,
generate_moderators_url,
insert_activity,
ActorType,
CommunityType,
};
use activitystreams::{
activity::{
kind::{AcceptType, AnnounceType, DeleteType, LikeType, RemoveType, UndoType},
kind::{AcceptType, AddType, AnnounceType, DeleteType, LikeType, RemoveType, UndoType},
Accept,
ActorAndObjectRefExt,
Add,
Announce,
Delete,
Follow,
OptTargetRefExt,
Remove,
Undo,
},
@ -26,7 +30,7 @@ use anyhow::Context;
use itertools::Itertools;
use lemmy_api_structs::blocking;
use lemmy_db_queries::DbPool;
use lemmy_db_schema::source::community::Community;
use lemmy_db_schema::source::{community::Community, person::Person};
use lemmy_db_views_actor::community_follower_view::CommunityFollowerView;
use lemmy_utils::{location_info, settings::structs::Settings, LemmyError};
use lemmy_websocket::LemmyContext;
@ -54,23 +58,10 @@ impl ActorType for Community {
.unwrap_or_else(|| self.inbox_url.to_owned())
.into()
}
}
async fn send_follow(
&self,
_follow_actor_id: &Url,
_context: &LemmyContext,
) -> Result<(), LemmyError> {
unimplemented!()
}
async fn send_unfollow(
&self,
_follow_actor_id: &Url,
_context: &LemmyContext,
) -> Result<(), LemmyError> {
unimplemented!()
}
#[async_trait::async_trait(?Send)]
impl CommunityType for Community {
/// As a local community, accept the follow request from a remote person.
async fn send_accept_follow(
&self,
@ -211,4 +202,46 @@ impl ActorType for Community {
Ok(inboxes)
}
async fn send_add_mod(
&self,
actor: &Person,
added_mod: Person,
context: &LemmyContext,
) -> Result<(), LemmyError> {
let mut add = Add::new(
actor.actor_id.clone().into_inner(),
added_mod.actor_id.into_inner(),
);
add
.set_many_contexts(lemmy_context()?)
.set_id(generate_activity_id(AddType::Add)?)
.set_to(public())
.set_many_ccs(vec![self.actor_id()])
.set_target(generate_moderators_url(&self.actor_id)?.into_inner());
send_to_community(add, actor, self, context).await?;
Ok(())
}
async fn send_remove_mod(
&self,
actor: &Person,
removed_mod: Person,
context: &LemmyContext,
) -> Result<(), LemmyError> {
let mut remove = Remove::new(
actor.actor_id.clone().into_inner(),
removed_mod.actor_id.into_inner(),
);
remove
.set_many_contexts(lemmy_context()?)
.set_id(generate_activity_id(RemoveType::Remove)?)
.set_to(public())
.set_many_ccs(vec![self.actor_id()])
.set_target(generate_moderators_url(&self.actor_id)?.into_inner());
send_to_community(remove, &actor, self, context).await?;
Ok(())
}
}

View File

@ -3,6 +3,7 @@ use crate::{
activity_queue::send_activity_single_dest,
extensions::context::lemmy_context,
ActorType,
UserType,
};
use activitystreams::{
activity::{
@ -10,11 +11,11 @@ use activitystreams::{
Follow,
Undo,
},
base::{AnyBase, BaseExt, ExtendsExt},
base::{BaseExt, ExtendsExt},
object::ObjectExt,
};
use lemmy_api_structs::blocking;
use lemmy_db_queries::{ApubObject, DbPool, Followable};
use lemmy_db_queries::{ApubObject, Followable};
use lemmy_db_schema::source::{
community::{Community, CommunityFollower, CommunityFollowerForm},
person::Person,
@ -47,7 +48,10 @@ impl ActorType for Person {
.unwrap_or_else(|| self.inbox_url.to_owned())
.into()
}
}
#[async_trait::async_trait(?Send)]
impl UserType for Person {
/// As a given local person, send out a follow request to a remote community.
async fn send_follow(
&self,
@ -110,40 +114,4 @@ impl ActorType for Person {
send_activity_single_dest(undo, self, community.inbox_url.into(), context).await?;
Ok(())
}
async fn send_accept_follow(
&self,
_follow: Follow,
_context: &LemmyContext,
) -> Result<(), LemmyError> {
unimplemented!()
}
async fn send_delete(&self, _context: &LemmyContext) -> Result<(), LemmyError> {
unimplemented!()
}
async fn send_undo_delete(&self, _context: &LemmyContext) -> Result<(), LemmyError> {
unimplemented!()
}
async fn send_remove(&self, _context: &LemmyContext) -> Result<(), LemmyError> {
unimplemented!()
}
async fn send_undo_remove(&self, _context: &LemmyContext) -> Result<(), LemmyError> {
unimplemented!()
}
async fn send_announce(
&self,
_activity: AnyBase,
_context: &LemmyContext,
) -> Result<(), LemmyError> {
unimplemented!()
}
async fn get_follower_inboxes(&self, _pool: &DbPool) -> Result<Vec<Url>, LemmyError> {
unimplemented!()
}
}

View File

@ -3,6 +3,7 @@ use crate::{
extensions::signatures::sign_and_send,
insert_activity,
ActorType,
CommunityType,
APUB_JSON_CONTENT_TYPE,
};
use activitystreams::{

View File

@ -11,7 +11,8 @@ pub(crate) fn lemmy_context() -> Result<Vec<AnyBase>, LemmyError> {
"comments_enabled": {
"kind": "sc:Boolean",
"id": "pt:commentsEnabled"
}
},
"moderators": "as:moderators"
}))?;
Ok(vec![AnyBase::from(context()), context_ext])
}

View File

@ -2,6 +2,7 @@ use activitystreams::unparsed::UnparsedMutExt;
use activitystreams_ext::UnparsedExtension;
use lemmy_utils::LemmyError;
use serde::{Deserialize, Serialize};
use url::Url;
/// Activitystreams extension to allow (de)serializing additional Community field
/// `sensitive` (called 'nsfw' in Lemmy).
@ -9,12 +10,14 @@ use serde::{Deserialize, Serialize};
#[serde(rename_all = "camelCase")]
pub struct GroupExtension {
pub sensitive: Option<bool>,
pub moderators: Option<Url>,
}
impl GroupExtension {
pub fn new(sensitive: bool) -> Result<GroupExtension, LemmyError> {
pub fn new(sensitive: bool, moderators_url: Url) -> Result<GroupExtension, LemmyError> {
Ok(GroupExtension {
sensitive: Some(sensitive),
moderators: Some(moderators_url),
})
}
}
@ -28,11 +31,13 @@ where
fn try_from_unparsed(unparsed_mut: &mut U) -> Result<Self, Self::Error> {
Ok(GroupExtension {
sensitive: unparsed_mut.remove("sensitive")?,
moderators: unparsed_mut.remove("moderators")?,
})
}
fn try_into_unparsed(self, unparsed_mut: &mut U) -> Result<(), Self::Error> {
unparsed_mut.insert("sensitive", self.sensitive)?;
unparsed_mut.insert("moderators", self.moderators)?;
Ok(())
}
}

View File

@ -1,8 +1,8 @@
use crate::{
fetcher::{
fetch::fetch_remote_object,
get_or_fetch_and_upsert_person,
is_deleted,
person::get_or_fetch_and_upsert_person,
should_refetch_actor,
},
inbox::person_inbox::receive_announce,
@ -12,13 +12,16 @@ use crate::{
use activitystreams::{
actor::ApActorExt,
collection::{CollectionExt, OrderedCollection},
object::ObjectExt,
};
use anyhow::Context;
use diesel::result::Error::NotFound;
use lemmy_api_structs::blocking;
use lemmy_db_queries::{source::community::Community_, ApubObject, Joinable};
use lemmy_db_schema::source::community::{Community, CommunityModerator, CommunityModeratorForm};
use lemmy_db_schema::{
source::community::{Community, CommunityModerator, CommunityModeratorForm},
DbUrl,
};
use lemmy_db_views_actor::community_moderator_view::CommunityModeratorView;
use lemmy_utils::{location_info, LemmyError};
use lemmy_websocket::LemmyContext;
use log::debug;
@ -60,9 +63,9 @@ async fn fetch_remote_community(
apub_id: &Url,
context: &LemmyContext,
old_community: Option<Community>,
recursion_counter: &mut i32,
request_counter: &mut i32,
) -> Result<Community, LemmyError> {
let group = fetch_remote_object::<GroupExt>(context.client(), apub_id, recursion_counter).await;
let group = fetch_remote_object::<GroupExt>(context.client(), apub_id, request_counter).await;
if let Some(c) = old_community.to_owned() {
if is_deleted(&group) {
@ -78,51 +81,68 @@ async fn fetch_remote_community(
let group = group?;
let community =
Community::from_apub(&group, context, apub_id.to_owned(), recursion_counter).await?;
Community::from_apub(&group, context, apub_id.to_owned(), request_counter, false).await?;
// Also add the community moderators too
let attributed_to = group.inner.attributed_to().context(location_info!())?;
let creator_and_moderator_uris: Vec<&Url> = attributed_to
.as_many()
.context(location_info!())?
.iter()
.map(|a| a.as_xsd_any_uri().context(""))
.collect::<Result<Vec<&Url>, anyhow::Error>>()?;
let mut creator_and_moderators = Vec::new();
for uri in creator_and_moderator_uris {
let c_or_m = get_or_fetch_and_upsert_person(uri, context, recursion_counter).await?;
creator_and_moderators.push(c_or_m);
}
// TODO: need to make this work to update mods of existing communities
if old_community.is_none() {
let community_id = community.id;
blocking(context.pool(), move |conn| {
for mod_ in creator_and_moderators {
let community_moderator_form = CommunityModeratorForm {
community_id,
person_id: mod_.id,
};
CommunityModerator::join(conn, &community_moderator_form)?;
}
Ok(()) as Result<(), LemmyError>
})
.await??;
}
update_community_mods(&group, &community, context, request_counter).await?;
// only fetch outbox for new communities, otherwise this can create an infinite loop
if old_community.is_none() {
let outbox = group.inner.outbox()?.context(location_info!())?;
fetch_community_outbox(context, outbox, &community, recursion_counter).await?
fetch_community_outbox(context, outbox, &community, request_counter).await?
}
Ok(community)
}
async fn update_community_mods(
group: &GroupExt,
community: &Community,
context: &LemmyContext,
request_counter: &mut i32,
) -> Result<(), LemmyError> {
let new_moderators = fetch_community_mods(context, group, request_counter).await?;
let community_id = community.id;
let current_moderators = blocking(context.pool(), move |conn| {
CommunityModeratorView::for_community(&conn, community_id)
})
.await??;
// Remove old mods from database which arent in the moderators collection anymore
for mod_user in &current_moderators {
if !new_moderators.contains(&&mod_user.moderator.actor_id.clone().into()) {
let community_moderator_form = CommunityModeratorForm {
community_id: mod_user.community.id,
dessalines marked this conversation as resolved Outdated

I see this is fetched every time from_apub is run on community. This doesn't seem right, it should be a smart upsert like the others.

I see this is fetched every time `from_apub` is run on community. This doesn't seem right, it should be a smart upsert like the others.

Any idea how? I dont see how an upsert works here, because we cant simply overwrite a single database row, but need to insert or delete rows.

Any idea how? I dont see how an upsert works here, because we cant simply overwrite a single database row, but need to insert or delete rows.

Check out get_or_fetch_upsert_community, it looks like its already doing the initial moderators fetching.

It should be keyed just like the should_refetch_actor(c.last_refreshed_at), after the initial community fetch. In fact I don't even know that that's necessary, since the only reason for fetching is to:

  1. Get initial data
  2. When we don't receive atomic updates (the last_refresh_at is only necessary bc community and user edits aren't pushed). New community moderators are pushed, so I don't know that this is necessary.
Check out `get_or_fetch_upsert_community`, it looks like its already doing the initial moderators fetching. It should be keyed just like the `should_refetch_actor(c.last_refreshed_at)`, after the initial community fetch. In fact I don't even know that that's necessary, since the only reason for fetching is to: 1. Get initial data 2. When we don't receive atomic updates (the last_refresh_at is only necessary bc community and user edits aren't pushed). New community moderators are pushed, so I don't know that this is necessary.

Ah that makes sense, I'm moving the update logic from objects/community.rs to fetcher/community.rs. But we still need to run this every time, because its possible that our instance doesnt follow the community, and doesnt receive any activities (we could possibly add a check for this later).

And it is still necessary to fetch the community creator in FromApubToForm, until that field is removed.

Ah that makes sense, I'm moving the update logic from objects/community.rs to fetcher/community.rs. But we still need to run this every time, because its possible that our instance doesnt follow the community, and doesnt receive any activities (we could possibly add a check for this later). And it is still necessary to fetch the community creator in FromApubToForm, until that field is removed.
person_id: mod_user.moderator.id,
};
blocking(context.pool(), move |conn| {
CommunityModerator::leave(conn, &community_moderator_form)
})
.await??;
}
}
// Add new mods to database which have been added to moderators collection
for mod_uri in new_moderators {
let mod_user = get_or_fetch_and_upsert_person(&mod_uri, context, request_counter).await?;
let current_mod_uris: Vec<DbUrl> = current_moderators
.clone()
.iter()
.map(|c| c.moderator.actor_id.clone())
.collect();
if !current_mod_uris.contains(&mod_user.actor_id) {
let community_moderator_form = CommunityModeratorForm {
community_id: community.id,
person_id: mod_user.id,
};
blocking(context.pool(), move |conn| {
CommunityModerator::join(conn, &community_moderator_form)
})
.await??;
}
}
Ok(())
}
async fn fetch_community_outbox(
context: &LemmyContext,
outbox: &Url,
@ -143,3 +163,27 @@ async fn fetch_community_outbox(
Ok(())
}
pub(crate) async fn fetch_community_mods(
context: &LemmyContext,
group: &GroupExt,
recursion_counter: &mut i32,
) -> Result<Vec<Url>, LemmyError> {
if let Some(mods_url) = &group.ext_one.moderators {
let mods =
fetch_remote_object::<OrderedCollection>(context.client(), mods_url, recursion_counter)
.await?;
let mods = mods
.items()
.map(|i| i.as_many())
.flatten()
.context(location_info!())?
.iter()
.filter_map(|i| i.as_xsd_any_uri())
.map(|u| u.to_owned())
.collect();
Ok(mods)
} else {
Ok(vec![])
}
}

View File

@ -30,7 +30,14 @@ pub(crate) async fn get_or_fetch_and_insert_post(
debug!("Fetching and creating remote post: {}", post_ap_id);
let page =
fetch_remote_object::<PageExt>(context.client(), post_ap_id, recursion_counter).await?;
let post = Post::from_apub(&page, context, post_ap_id.to_owned(), recursion_counter).await?;
let post = Post::from_apub(
&page,
context,
post_ap_id.to_owned(),
recursion_counter,
false,
)
.await?;
Ok(post)
}
@ -67,6 +74,7 @@ pub(crate) async fn get_or_fetch_and_insert_comment(
context,
comment_ap_id.to_owned(),
recursion_counter,
false,
)
.await?;

View File

@ -46,8 +46,14 @@ pub(crate) async fn get_or_fetch_and_upsert_person(
return Ok(u);
}
let person =
Person::from_apub(&person?, context, apub_id.to_owned(), recursion_counter).await?;
let person = Person::from_apub(
&person?,
context,
apub_id.to_owned(),
recursion_counter,
false,
)
.await?;
let person_id = person.id;
blocking(context.pool(), move |conn| {
@ -63,8 +69,14 @@ pub(crate) async fn get_or_fetch_and_upsert_person(
let person =
fetch_remote_object::<PersonExt>(context.client(), apub_id, recursion_counter).await?;
let person =
Person::from_apub(&person, context, apub_id.to_owned(), recursion_counter).await?;
let person = Person::from_apub(
&person,
context,
apub_id.to_owned(),
recursion_counter,
false,
)
.await?;
Ok(person)
}

View File

@ -147,13 +147,13 @@ async fn build_response(
];
}
SearchAcceptedObjects::Page(p) => {
let p = Post::from_apub(&p, context, query_url, recursion_counter).await?;
let p = Post::from_apub(&p, context, query_url, recursion_counter, false).await?;
response.posts =
vec![blocking(context.pool(), move |conn| PostView::read(conn, p.id, None)).await??];
}
SearchAcceptedObjects::Comment(c) => {
let c = Comment::from_apub(&c, context, query_url, recursion_counter).await?;
let c = Comment::from_apub(&c, context, query_url, recursion_counter, false).await?;
response.comments = vec![
blocking(context.pool(), move |conn| {

View File

@ -12,12 +12,12 @@ use lemmy_websocket::LemmyContext;
use serde::Deserialize;
#[derive(Deserialize)]
pub struct CommentQuery {
pub(crate) struct CommentQuery {
comment_id: String,
}
/// Return the ActivityPub json representation of a local comment over HTTP.
pub async fn get_apub_comment(
pub(crate) async fn get_apub_comment(
info: Path<CommentQuery>,
context: web::Data<LemmyContext>,
) -> Result<HttpResponse<Body>, LemmyError> {

View File

@ -1,5 +1,6 @@
use crate::{
extensions::context::lemmy_context,
generate_moderators_url,
http::{create_apub_response, create_apub_tombstone_response},
objects::ToApub,
ActorType,
@ -7,23 +8,27 @@ use crate::{
use activitystreams::{
base::{AnyBase, BaseExt},
collection::{CollectionExt, OrderedCollection, UnorderedCollection},
url::Url,
};
use actix_web::{body::Body, web, HttpResponse};
use lemmy_api_structs::blocking;
use lemmy_db_queries::source::{activity::Activity_, community::Community_};
use lemmy_db_schema::source::{activity::Activity, community::Community};
use lemmy_db_views_actor::community_follower_view::CommunityFollowerView;
use lemmy_db_views_actor::{
community_follower_view::CommunityFollowerView,
community_moderator_view::CommunityModeratorView,
};
use lemmy_utils::LemmyError;
use lemmy_websocket::LemmyContext;
use serde::Deserialize;
#[derive(Deserialize)]
pub struct CommunityQuery {
pub(crate) struct CommunityQuery {
community_name: String,
}
/// Return the ActivityPub json representation of a local community over HTTP.
pub async fn get_apub_community_http(
pub(crate) async fn get_apub_community_http(
info: web::Path<CommunityQuery>,
context: web::Data<LemmyContext>,
) -> Result<HttpResponse<Body>, LemmyError> {
@ -42,7 +47,7 @@ pub async fn get_apub_community_http(
}
/// Returns an empty followers collection, only populating the size (for privacy).
pub async fn get_apub_community_followers(
pub(crate) async fn get_apub_community_followers(
info: web::Path<CommunityQuery>,
context: web::Data<LemmyContext>,
) -> Result<HttpResponse<Body>, LemmyError> {
@ -67,7 +72,7 @@ pub async fn get_apub_community_followers(
/// Returns the community outbox, which is populated by a maximum of 20 posts (but no other
/// activites like votes or comments).
pub async fn get_apub_community_outbox(
pub(crate) async fn get_apub_community_outbox(
info: web::Path<CommunityQuery>,
context: web::Data<LemmyContext>,
) -> Result<HttpResponse<Body>, LemmyError> {
@ -96,7 +101,7 @@ pub async fn get_apub_community_outbox(
Ok(create_apub_response(&collection))
}
pub async fn get_apub_community_inbox(
pub(crate) async fn get_apub_community_inbox(
info: web::Path<CommunityQuery>,
context: web::Data<LemmyContext>,
) -> Result<HttpResponse<Body>, LemmyError> {
@ -107,7 +112,39 @@ pub async fn get_apub_community_inbox(
let mut collection = OrderedCollection::new();
collection
.set_id(format!("{}/inbox", community.actor_id).parse()?)
.set_id(community.inbox_url.into())
.set_many_contexts(lemmy_context()?);
Ok(create_apub_response(&collection))
}
pub(crate) async fn get_apub_community_moderators(
info: web::Path<CommunityQuery>,
context: web::Data<LemmyContext>,
) -> Result<HttpResponse<Body>, LemmyError> {
let community = blocking(context.pool(), move |conn| {
Community::read_from_name(&conn, &info.community_name)
})
.await??;
// The attributed to, is an ordered vector with the creator actor_ids first,
// then the rest of the moderators
// TODO Technically the instance admins can mod the community, but lets
// ignore that for now
let cid = community.id;
let moderators = blocking(context.pool(), move |conn| {
CommunityModeratorView::for_community(&conn, cid)
})
.await??;
let moderators: Vec<Url> = moderators
.into_iter()
.map(|m| m.moderator.actor_id.into_inner())
.collect();
let mut collection = OrderedCollection::new();
collection
.set_id(generate_moderators_url(&community.actor_id)?.into())
.set_total_items(moderators.len() as u64)
.set_many_items(moderators)
.set_many_contexts(lemmy_context()?);
Ok(create_apub_response(&collection))
}

View File

@ -42,7 +42,7 @@ pub struct CommunityQuery {
}
/// Return the ActivityPub json representation of a local community over HTTP.
pub async fn get_activity(
pub(crate) async fn get_activity(
info: web::Path<CommunityQuery>,
context: web::Data<LemmyContext>,
) -> Result<HttpResponse<Body>, LemmyError> {

View File

@ -23,7 +23,7 @@ pub struct PersonQuery {
}
/// Return the ActivityPub json representation of a local person over HTTP.
pub async fn get_apub_person_http(
pub(crate) async fn get_apub_person_http(
info: web::Path<PersonQuery>,
context: web::Data<LemmyContext>,
) -> Result<HttpResponse<Body>, LemmyError> {
@ -43,7 +43,7 @@ pub async fn get_apub_person_http(
}
}
pub async fn get_apub_person_outbox(
pub(crate) async fn get_apub_person_outbox(
info: web::Path<PersonQuery>,
context: web::Data<LemmyContext>,
) -> Result<HttpResponse<Body>, LemmyError> {
@ -61,7 +61,7 @@ pub async fn get_apub_person_outbox(
Ok(create_apub_response(&collection))
}
pub async fn get_apub_person_inbox(
pub(crate) async fn get_apub_person_inbox(
info: web::Path<PersonQuery>,
context: web::Data<LemmyContext>,
) -> Result<HttpResponse<Body>, LemmyError> {
@ -72,7 +72,7 @@ pub async fn get_apub_person_inbox(
let mut collection = OrderedCollection::new();
collection
.set_id(format!("{}/inbox", person.actor_id.into_inner()).parse()?)
.set_id(person.inbox_url.into())
.set_many_contexts(lemmy_context()?);
Ok(create_apub_response(&collection))
}

View File

@ -12,12 +12,12 @@ use lemmy_websocket::LemmyContext;
use serde::Deserialize;
#[derive(Deserialize)]
pub struct PostQuery {
pub(crate) struct PostQuery {
post_id: String,
}
/// Return the ActivityPub json representation of a local post over HTTP.
pub async fn get_apub_post(
pub(crate) async fn get_apub_post(
info: web::Path<PostQuery>,
context: web::Data<LemmyContext>,
) -> Result<HttpResponse<Body>, LemmyError> {

View File

@ -6,18 +6,21 @@ use crate::{
get_activity_to_and_cc,
inbox_verify_http_signature,
is_activity_already_known,
is_addressed_to_public,
receive_for_community::{
receive_add_for_community,
receive_create_for_community,
receive_delete_for_community,
receive_dislike_for_community,
receive_like_for_community,
receive_remove_for_community,
receive_undo_for_community,
receive_update_for_community,
},
verify_is_addressed_to_public,
},
insert_activity,
ActorType,
CommunityType,
};
use activitystreams::{
activity::{kind::FollowType, ActorAndObject, Follow, Undo},
@ -54,7 +57,8 @@ pub enum CommunityValidTypes {
Like, // upvote post or comment
Dislike, // downvote post or comment
Delete, // post or comment deleted by creator
Remove, // post or comment removed by mod or admin
Remove, // post or comment removed by mod or admin, or mod removed from community
Add, // mod added to community
}
pub type CommunityAcceptedActivities = ActorAndObject<CommunityValidTypes>;
@ -130,49 +134,102 @@ pub(crate) async fn community_receive_message(
let activity_kind = activity.kind().context(location_info!())?;
let do_announce = match activity_kind {
CommunityValidTypes::Follow => {
handle_follow(any_base.clone(), person, &to_community, &context).await?;
Box::pin(handle_follow(
any_base.clone(),
person,
&to_community,
&context,
))
.await?;
false
}
CommunityValidTypes::Undo => {
handle_undo(
Box::pin(handle_undo(
context,
activity.clone(),
actor_url,
&to_community,
request_counter,
)
))
.await?
}
CommunityValidTypes::Create => {
receive_create_for_community(context, any_base.clone(), &actor_url, request_counter).await?;
Box::pin(receive_create_for_community(
context,
any_base.clone(),
&actor_url,
request_counter,
))
.await?;
true
}
CommunityValidTypes::Update => {
receive_update_for_community(context, any_base.clone(), &actor_url, request_counter).await?;
Box::pin(receive_update_for_community(
context,
any_base.clone(),
None,
&actor_url,
request_counter,
))
.await?;
true
}
CommunityValidTypes::Like => {
receive_like_for_community(context, any_base.clone(), &actor_url, request_counter).await?;
Box::pin(receive_like_for_community(
context,
any_base.clone(),
&actor_url,
request_counter,
))
.await?;
true
}
CommunityValidTypes::Dislike => {
receive_dislike_for_community(context, any_base.clone(), &actor_url, request_counter).await?;
Box::pin(receive_dislike_for_community(
context,
any_base.clone(),
&actor_url,
request_counter,
))
.await?;
true
}
CommunityValidTypes::Delete => {
receive_delete_for_community(context, any_base.clone(), &actor_url).await?;
Box::pin(receive_delete_for_community(
context,
any_base.clone(),
None,
&actor_url,
request_counter,
))
.await?;
true
}
CommunityValidTypes::Add => {
Box::pin(receive_add_for_community(
context,
any_base.clone(),
None,
request_counter,
))
.await?;
true
}
CommunityValidTypes::Remove => {
// TODO: we dont support remote mods, so this is ignored for now
//receive_remove_for_community(context, any_base.clone(), &person_url).await?
false
Box::pin(receive_remove_for_community(
context,
any_base.clone(),
None,
request_counter,
))
.await?;
true
}
};
if do_announce {
// Check again that the activity is public, just to be sure
is_addressed_to_public(&activity)?;
verify_is_addressed_to_public(&activity)?;
to_community
.send_announce(activity.into_any_base()?, context)
.await?;
@ -224,7 +281,7 @@ async fn handle_undo(
handle_undo_follow(any_base, actor_url, to_community, &context).await?;
Ok(false)
} else {
receive_undo_for_community(context, any_base, &actor_url, request_counter).await?;
receive_undo_for_community(context, any_base, None, &actor_url, request_counter).await?;
Ok(true)
}
}

View File

@ -27,7 +27,7 @@ use url::Url;
pub mod community_inbox;
pub mod person_inbox;
mod receive_for_community;
pub(crate) mod receive_for_community;
pub mod shared_inbox;
pub(crate) fn get_activity_id<T, Kind>(activity: &T, creator_uri: &Url) -> Result<Url, LemmyError>
@ -58,7 +58,7 @@ pub(crate) async fn is_activity_already_known(
pub(crate) fn get_activity_to_and_cc<T, Kind>(activity: &T) -> Vec<Url>
where
T: AsBase<Kind> + AsObject<Kind> + ActorAndObjectRefExt,
T: AsObject<Kind>,
{
let mut to_and_cc = vec![];
if let Some(to) = activity.to() {
@ -84,7 +84,7 @@ where
to_and_cc
}
pub(crate) fn is_addressed_to_public<T, Kind>(activity: &T) -> Result<(), LemmyError>
pub(crate) fn verify_is_addressed_to_public<T, Kind>(activity: &T) -> Result<(), LemmyError>
where
T: AsBase<Kind> + AsObject<Kind> + ActorAndObjectRefExt,
{

View File

@ -26,8 +26,8 @@ use crate::{
is_activity_already_known,
is_addressed_to_community_followers,
is_addressed_to_local_person,
is_addressed_to_public,
receive_for_community::{
receive_add_for_community,
receive_create_for_community,
receive_delete_for_community,
receive_dislike_for_community,
@ -36,12 +36,13 @@ use crate::{
receive_undo_for_community,
receive_update_for_community,
},
verify_is_addressed_to_public,
},
insert_activity,
ActorType,
};
use activitystreams::{
activity::{Accept, ActorAndObject, Announce, Create, Delete, Follow, Undo, Update},
activity::{Accept, ActorAndObject, Announce, Create, Delete, Follow, Remove, Undo, Update},
base::AnyBase,
prelude::*,
};
@ -153,19 +154,39 @@ pub(crate) async fn person_receive_message(
.await?;
}
PersonValidTypes::Announce => {
receive_announce(&context, any_base, actor, request_counter).await?
Box::pin(receive_announce(&context, any_base, actor, request_counter)).await?
}
PersonValidTypes::Create => {
receive_create(&context, any_base, actor_url, request_counter).await?
Box::pin(receive_create(
&context,
any_base,
actor_url,
request_counter,
))
.await?
}
PersonValidTypes::Update => {
receive_update(&context, any_base, actor_url, request_counter).await?
Box::pin(receive_update(
&context,
any_base,
actor_url,
request_counter,
))
.await?
}
PersonValidTypes::Delete => {
receive_delete(context, any_base, &actor_url, request_counter).await?
Box::pin(receive_delete(
context,
any_base,
&actor_url,
request_counter,
))
.await?
}
PersonValidTypes::Undo => receive_undo(context, any_base, &actor_url, request_counter).await?,
PersonValidTypes::Remove => receive_remove_community(&context, any_base, &actor_url).await?,
PersonValidTypes::Undo => {
Box::pin(receive_undo(context, any_base, &actor_url, request_counter)).await?
}
PersonValidTypes::Remove => Box::pin(receive_remove(context, any_base, &actor_url)).await?,
};
// TODO: would be logical to move websocket notification code here
@ -252,6 +273,7 @@ enum AnnouncableActivities {
Delete,
Remove,
Undo,
Add,
}
/// Takes an announce and passes the inner activity to the appropriate handler.
@ -263,7 +285,7 @@ pub async fn receive_announce(
) -> Result<(), LemmyError> {
let announce = Announce::from_any_base(activity)?.context(location_info!())?;
verify_activity_domains_valid(&announce, &actor.actor_id(), false)?;
is_addressed_to_public(&announce)?;
verify_is_addressed_to_public(&announce)?;
let kind = announce
.object()
@ -287,7 +309,14 @@ pub async fn receive_announce(
receive_create_for_community(context, inner_activity, &inner_id, request_counter).await
}
Some(Update) => {
receive_update_for_community(context, inner_activity, &inner_id, request_counter).await
receive_update_for_community(
context,
inner_activity,
Some(announce),
&inner_id,
request_counter,
)
.await
}
Some(Like) => {
receive_like_for_community(context, inner_activity, &inner_id, request_counter).await
@ -295,15 +324,38 @@ pub async fn receive_announce(
Some(Dislike) => {
receive_dislike_for_community(context, inner_activity, &inner_id, request_counter).await
}
Some(Delete) => receive_delete_for_community(context, inner_activity, &inner_id).await,
Some(Remove) => receive_remove_for_community(context, inner_activity, &inner_id).await,
Some(Delete) => {
receive_delete_for_community(
context,
inner_activity,
Some(announce),
&inner_id,
request_counter,
)
.await
}
Some(Remove) => {
receive_remove_for_community(context, inner_activity, Some(announce), request_counter).await
}
Some(Undo) => {
receive_undo_for_community(context, inner_activity, &inner_id, request_counter).await
receive_undo_for_community(
context,
inner_activity,
Some(announce),
&inner_id,
request_counter,
)
.await
}
Some(Add) => {
receive_add_for_community(context, inner_activity, Some(announce), request_counter).await
}
_ => receive_unhandled_activity(inner_activity),
}
}
/// Receive either a new private message, or a new comment mention. We distinguish them by checking
/// whether the activity is public.
async fn receive_create(
context: &LemmyContext,
activity: AnyBase,
@ -312,13 +364,15 @@ async fn receive_create(
) -> Result<(), LemmyError> {
let create = Create::from_any_base(activity)?.context(location_info!())?;
verify_activity_domains_valid(&create, &expected_domain, true)?;
if is_addressed_to_public(&create).is_ok() {
if verify_is_addressed_to_public(&create).is_ok() {
receive_create_comment(create, context, request_counter).await
} else {
receive_create_private_message(&context, create, expected_domain, request_counter).await
}
}
/// Receive either an updated private message, or an updated comment mention. We distinguish
/// them by checking whether the activity is public.
async fn receive_update(
context: &LemmyContext,
activity: AnyBase,
@ -327,7 +381,7 @@ async fn receive_update(
) -> Result<(), LemmyError> {
let update = Update::from_any_base(activity)?.context(location_info!())?;
verify_activity_domains_valid(&update, &expected_domain, true)?;
if is_addressed_to_public(&update).is_ok() {
if verify_is_addressed_to_public(&update).is_ok() {
receive_update_comment(update, context, request_counter).await
} else {
receive_update_private_message(&context, update, expected_domain, request_counter).await
@ -356,13 +410,31 @@ async fn receive_delete(
}
}
async fn receive_remove(
context: &LemmyContext,
any_base: AnyBase,
expected_domain: &Url,
) -> Result<(), LemmyError> {
let remove = Remove::from_any_base(any_base.clone())?.context(location_info!())?;
verify_activity_domains_valid(&remove, expected_domain, true)?;
let object_uri = remove
.object()
.to_owned()
.single_xsd_any_uri()
.context(location_info!())?;
let community = blocking(context.pool(), move |conn| {
Community::read_from_apub_id(conn, &object_uri.into())
})
.await??;
receive_remove_community(&context, community).await
}
async fn receive_undo(
context: &LemmyContext,
any_base: AnyBase,
expected_domain: &Url,
request_counter: &mut i32,
) -> Result<(), LemmyError> {
use CommunityOrPrivateMessage::*;
let undo = Undo::from_any_base(any_base)?.context(location_info!())?;
verify_activity_domains_valid(&undo, expected_domain, true)?;
@ -377,15 +449,28 @@ async fn receive_undo(
.to_owned()
.single_xsd_any_uri()
.context(location_info!())?;
use CommunityOrPrivateMessage::*;
match find_community_or_private_message_by_id(context, object_uri).await? {
Community(c) => receive_undo_delete_community(context, undo, c, expected_domain).await,
Community(c) => receive_undo_delete_community(context, c).await,
PrivateMessage(p) => {
receive_undo_delete_private_message(context, undo, expected_domain, p, request_counter)
.await
}
}
}
Some("Remove") => receive_undo_remove_community(context, undo, expected_domain).await,
Some("Remove") => {
let remove = Remove::from_any_base(inner_activity)?.context(location_info!())?;
let object_uri = remove
.object()
.to_owned()
.single_xsd_any_uri()
.context(location_info!())?;
let community = blocking(context.pool(), move |conn| {
Community::read_from_apub_id(conn, &object_uri.into())
})
.await??;
receive_undo_remove_community(context, community).await
}
_ => receive_unhandled_activity(undo),
}
}

View File

@ -31,21 +31,47 @@ use crate::{
receive_unhandled_activity,
verify_activity_domains_valid,
},
fetcher::objects::{get_or_fetch_and_insert_comment, get_or_fetch_and_insert_post},
fetcher::{
objects::{get_or_fetch_and_insert_comment, get_or_fetch_and_insert_post},
person::get_or_fetch_and_upsert_person,
},
find_post_or_comment_by_id,
inbox::is_addressed_to_public,
generate_moderators_url,
inbox::verify_is_addressed_to_public,
CommunityType,
PostOrComment,
};
use activitystreams::{
activity::{Create, Delete, Dislike, Like, Remove, Undo, Update},
activity::{
ActorAndObjectRef,
Add,
Announce,
Create,
Delete,
Dislike,
Like,
OptTargetRef,
Remove,
Undo,
Update,
},
base::AnyBase,
object::AsObject,
prelude::*,
};
use anyhow::Context;
use anyhow::{anyhow, Context};
use diesel::result::Error::NotFound;
use lemmy_api_structs::blocking;
use lemmy_db_queries::Crud;
use lemmy_db_schema::source::site::Site;
use lemmy_db_queries::{source::community::CommunityModerator_, ApubObject, Crud, Joinable};
use lemmy_db_schema::{
source::{
community::{Community, CommunityModerator, CommunityModeratorForm},
person::Person,
site::Site,
},
DbUrl,
};
use lemmy_db_views_actor::community_view::CommunityView;
use lemmy_utils::{location_info, LemmyError};
use lemmy_websocket::LemmyContext;
use strum_macros::EnumString;
@ -69,7 +95,7 @@ pub(in crate::inbox) async fn receive_create_for_community(
) -> Result<(), LemmyError> {
let create = Create::from_any_base(activity)?.context(location_info!())?;
verify_activity_domains_valid(&create, &expected_domain, true)?;
is_addressed_to_public(&create)?;
verify_is_addressed_to_public(&create)?;
let kind = create
.object()
@ -86,19 +112,21 @@ pub(in crate::inbox) async fn receive_create_for_community(
pub(in crate::inbox) async fn receive_update_for_community(
context: &LemmyContext,
activity: AnyBase,
announce: Option<Announce>,
expected_domain: &Url,
request_counter: &mut i32,
) -> Result<(), LemmyError> {
let update = Update::from_any_base(activity)?.context(location_info!())?;
verify_activity_domains_valid(&update, &expected_domain, true)?;
is_addressed_to_public(&update)?;
verify_activity_domains_valid(&update, &expected_domain, false)?;
verify_is_addressed_to_public(&update)?;
verify_modification_actor_instance(&update, &announce, context, request_counter).await?;
let kind = update
.object()
.as_single_kind_str()
.and_then(|s| s.parse().ok());
match kind {
Some(PageOrNote::Page) => receive_update_post(update, context, request_counter).await,
Some(PageOrNote::Page) => receive_update_post(update, announce, context, request_counter).await,
Some(PageOrNote::Note) => receive_update_comment(update, context, request_counter).await,
_ => receive_unhandled_activity(update),
}
@ -113,7 +141,7 @@ pub(in crate::inbox) async fn receive_like_for_community(
) -> Result<(), LemmyError> {
let like = Like::from_any_base(activity)?.context(location_info!())?;
verify_activity_domains_valid(&like, &expected_domain, false)?;
is_addressed_to_public(&like)?;
verify_is_addressed_to_public(&like)?;
let object_id = like
.object()
@ -144,7 +172,7 @@ pub(in crate::inbox) async fn receive_dislike_for_community(
let dislike = Dislike::from_any_base(activity)?.context(location_info!())?;
verify_activity_domains_valid(&dislike, &expected_domain, false)?;
is_addressed_to_public(&dislike)?;
verify_is_addressed_to_public(&dislike)?;
let object_id = dislike
.object()
@ -164,11 +192,14 @@ pub(in crate::inbox) async fn receive_dislike_for_community(
pub(in crate::inbox) async fn receive_delete_for_community(
context: &LemmyContext,
activity: AnyBase,
announce: Option<Announce>,
expected_domain: &Url,
request_counter: &mut i32,
) -> Result<(), LemmyError> {
let delete = Delete::from_any_base(activity)?.context(location_info!())?;
verify_activity_domains_valid(&delete, &expected_domain, true)?;
is_addressed_to_public(&delete)?;
verify_is_addressed_to_public(&delete)?;
verify_modification_actor_instance(&delete, &announce, context, request_counter).await?;
let object = delete
.object()
@ -187,38 +218,48 @@ pub(in crate::inbox) async fn receive_delete_for_community(
/// A post or comment being removed by a mod/admin
pub(in crate::inbox) async fn receive_remove_for_community(
context: &LemmyContext,
activity: AnyBase,
expected_domain: &Url,
remove_any_base: AnyBase,
announce: Option<Announce>,
request_counter: &mut i32,
) -> Result<(), LemmyError> {
let remove = Remove::from_any_base(activity)?.context(location_info!())?;
verify_activity_domains_valid(&remove, &expected_domain, false)?;
is_addressed_to_public(&remove)?;
let remove = Remove::from_any_base(remove_any_base.to_owned())?.context(location_info!())?;
let community = extract_community_from_cc(&remove, context).await?;
let cc = remove
.cc()
.map(|c| c.as_many())
.flatten()
.context(location_info!())?;
let community_id = cc
.first()
.map(|c| c.as_xsd_any_uri())
.flatten()
.context(location_info!())?;
verify_mod_activity(&remove, announce, &community, context).await?;
verify_is_addressed_to_public(&remove)?;
let object = remove
.object()
.to_owned()
.single_xsd_any_uri()
.context(location_info!())?;
if remove.target().is_some() {
let remove_mod = remove
.object()
.as_single_xsd_any_uri()
.context(location_info!())?;
let remove_mod = get_or_fetch_and_upsert_person(&remove_mod, context, request_counter).await?;
let form = CommunityModeratorForm {
community_id: community.id,
person_id: remove_mod.id,
};
blocking(context.pool(), move |conn| {
CommunityModerator::leave(conn, &form)
})
.await??;
community.send_announce(remove_any_base, context).await?;
// TODO: send websocket notification about removed mod
Ok(())
}
// Remove a post or comment
else {
let object = remove
.object()
.to_owned()
.single_xsd_any_uri()
.context(location_info!())?;
// Ensure that remove activity comes from the same domain as the community
remove.id(community_id.domain().context(location_info!())?)?;
match find_post_or_comment_by_id(context, object).await {
Ok(PostOrComment::Post(p)) => receive_remove_post(context, remove, *p).await,
Ok(PostOrComment::Comment(c)) => receive_remove_comment(context, remove, *c).await,
// if we dont have the object, no need to do anything
Err(_) => Ok(()),
match find_post_or_comment_by_id(context, object).await {
Ok(PostOrComment::Post(p)) => receive_remove_post(context, *p).await,
Ok(PostOrComment::Comment(c)) => receive_remove_comment(context, *c).await,
// if we dont have the object, no need to do anything
Err(_) => Ok(()),
}
}
}
@ -234,12 +275,13 @@ enum UndoableActivities {
pub(in crate::inbox) async fn receive_undo_for_community(
context: &LemmyContext,
activity: AnyBase,
announce: Option<Announce>,
expected_domain: &Url,
request_counter: &mut i32,
) -> Result<(), LemmyError> {
let undo = Undo::from_any_base(activity)?.context(location_info!())?;
verify_activity_domains_valid(&undo, &expected_domain.to_owned(), true)?;
is_addressed_to_public(&undo)?;
verify_is_addressed_to_public(&undo)?;
use UndoableActivities::*;
match undo
@ -248,7 +290,9 @@ pub(in crate::inbox) async fn receive_undo_for_community(
.and_then(|s| s.parse().ok())
{
Some(Delete) => receive_undo_delete_for_community(context, undo, expected_domain).await,
Some(Remove) => receive_undo_remove_for_community(context, undo, expected_domain).await,
Some(Remove) => {
receive_undo_remove_for_community(context, undo, announce, expected_domain).await
}
Some(Like) => {
receive_undo_like_for_community(context, undo, expected_domain, request_counter).await
}
@ -268,7 +312,7 @@ pub(in crate::inbox) async fn receive_undo_delete_for_community(
let delete = Delete::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
.context(location_info!())?;
verify_activity_domains_valid(&delete, &expected_domain, true)?;
is_addressed_to_public(&delete)?;
verify_is_addressed_to_public(&delete)?;
let object = delete
.object()
@ -287,12 +331,14 @@ pub(in crate::inbox) async fn receive_undo_delete_for_community(
pub(in crate::inbox) async fn receive_undo_remove_for_community(
context: &LemmyContext,
undo: Undo,
announce: Option<Announce>,
expected_domain: &Url,
) -> Result<(), LemmyError> {
let remove = Remove::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
.context(location_info!())?;
verify_activity_domains_valid(&remove, &expected_domain, false)?;
is_addressed_to_public(&remove)?;
verify_is_addressed_to_public(&remove)?;
verify_undo_remove_actor_instance(&undo, &remove, &announce, context).await?;
let object = remove
.object()
@ -317,7 +363,7 @@ pub(in crate::inbox) async fn receive_undo_like_for_community(
let like = Like::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
.context(location_info!())?;
verify_activity_domains_valid(&like, &expected_domain, false)?;
is_addressed_to_public(&like)?;
verify_is_addressed_to_public(&like)?;
let object_id = like
.object()
@ -333,6 +379,50 @@ pub(in crate::inbox) async fn receive_undo_like_for_community(
}
}
/// Add a new mod to the community (can only be done by an existing mod).
pub(in crate::inbox) async fn receive_add_for_community(
context: &LemmyContext,
add_any_base: AnyBase,
announce: Option<Announce>,
request_counter: &mut i32,
) -> Result<(), LemmyError> {
let add = Add::from_any_base(add_any_base.to_owned())?.context(location_info!())?;
let community = extract_community_from_cc(&add, context).await?;
verify_mod_activity(&add, announce, &community, context).await?;
verify_is_addressed_to_public(&add)?;
verify_add_remove_moderator_target(&add, &community)?;
let new_mod = add
.object()
.as_single_xsd_any_uri()
.context(location_info!())?;
let new_mod = get_or_fetch_and_upsert_person(&new_mod, context, request_counter).await?;
// If we had to refetch the community while parsing the activity, then the new mod has already
// been added. Skip it here as it would result in a duplicate key error.
let new_mod_id = new_mod.id;
let moderated_communities = blocking(context.pool(), move |conn| {
CommunityModerator::get_person_moderated_communities(conn, new_mod_id)
})
.await??;
if !moderated_communities.contains(&community.id) {
let form = CommunityModeratorForm {
community_id: community.id,
person_id: new_mod.id,
};
blocking(context.pool(), move |conn| {
CommunityModerator::join(conn, &form)
})
.await??;
}
if community.local {
community.send_announce(add_any_base, context).await?;
}
// TODO: send websocket notification about added mod
Ok(())
}
/// A post or comment downvote being reverted
pub(in crate::inbox) async fn receive_undo_dislike_for_community(
context: &LemmyContext,
@ -343,7 +433,7 @@ pub(in crate::inbox) async fn receive_undo_dislike_for_community(
let dislike = Dislike::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
.context(location_info!())?;
verify_activity_domains_valid(&dislike, &expected_domain, false)?;
is_addressed_to_public(&dislike)?;
verify_is_addressed_to_public(&dislike)?;
let object_id = dislike
.object()
@ -374,3 +464,170 @@ async fn fetch_post_or_comment_by_id(
Err(NotFound.into())
}
/// Searches the activity's cc field for a Community ID, and returns the community.
async fn extract_community_from_cc<T, Kind>(
activity: &T,
context: &LemmyContext,
) -> Result<Community, LemmyError>
where
T: AsObject<Kind>,
{
let cc = activity
.cc()
.map(|c| c.as_many())
.flatten()
.context(location_info!())?;
let community_id = cc
.first()
.map(|c| c.as_xsd_any_uri())
.flatten()
.context(location_info!())?;
let community_id: DbUrl = community_id.to_owned().into();
let community = blocking(&context.pool(), move |conn| {
Community::read_from_apub_id(&conn, &community_id)
})
.await??;
Ok(community)
}
/// Checks that a moderation activity was sent by a user who is listed as mod for the community.
/// This is only used in the case of remote mods, as local mod actions don't go through the
/// community inbox.
///
/// This method should only be used for activities received by the community, not for activities
/// used by community followers.
async fn verify_actor_is_community_mod<T, Kind>(
activity: &T,
community: &Community,
context: &LemmyContext,
) -> Result<(), LemmyError>
where
T: ActorAndObjectRef + BaseExt<Kind>,
{
let actor = activity
.actor()?
.as_single_xsd_any_uri()
.context(location_info!())?
.to_owned();
let actor = blocking(&context.pool(), move |conn| {
Person::read_from_apub_id(&conn, &actor.into())
})
.await??;
// Note: this will also return true for admins in addition to mods, but as we dont know about
// remote admins, it doesnt make any difference.
let community_id = community.id;
let actor_id = actor.id;
let is_mod_or_admin = blocking(context.pool(), move |conn| {
CommunityView::is_mod_or_admin(conn, actor_id, community_id)
})
.await?;
if !is_mod_or_admin {
return Err(anyhow!("Not a mod").into());
}
Ok(())
}
/// This method behaves differently, depending if it is called via community inbox (activity
/// received by community from a remote user), or via user inbox (activity received by user from
/// community). We distinguish the cases by checking if the activity is wrapper in an announce
/// (only true when sent from user to community).
///
/// In the first case, we check that the actor is listed as community mod. In the second case, we
/// only check that the announce comes from the same domain as the activity. We trust the
/// community's instance to have validated the inner activity correctly. We can't do this validation
/// here, because we don't know who the instance admins are. Plus this allows for compatibility with
/// software that uses different rules for mod actions.
pub(crate) async fn verify_mod_activity<T, Kind>(
mod_action: &T,
announce: Option<Announce>,
community: &Community,
context: &LemmyContext,
) -> Result<(), LemmyError>
where
T: ActorAndObjectRef + BaseExt<Kind>,
{
match announce {
None => verify_actor_is_community_mod(mod_action, community, context).await?,
Some(a) => verify_activity_domains_valid(&a, &community.actor_id.to_owned().into(), false)?,
}
dessalines marked this conversation as resolved Outdated

So if its not an announce, then you know its local. Hopefully that check is done somewhere up the chain.

So if its not an announce, then you know its local. Hopefully that check is done somewhere up the chain.

The Option<Announce> is passed into all methods which use this check.

The `Option<Announce>` is passed into all methods which use this check.
Ok(())
}
/// For Add/Remove community moderator activities, check that the target field actually contains
/// /c/community/moderators. Any different values are unsupported.
fn verify_add_remove_moderator_target<T, Kind>(
activity: &T,
community: &Community,
) -> Result<(), LemmyError>
where
T: ActorAndObjectRef + BaseExt<Kind> + OptTargetRef,
{
let target = activity
.target()
.map(|t| t.as_single_xsd_any_uri())
.flatten()
.context(location_info!())?;
if target != &generate_moderators_url(&community.actor_id)?.into_inner() {
return Err(anyhow!("Unkown target url").into());
}
Ok(())
}
/// For activities like Update, Delete or Remove, check that the actor is from the same instance
/// as the original object itself (or is a remote mod).
///
/// Note: This is only needed for mod actions. Normal user actions (edit post, undo vote etc) are
/// already verified with `expected_domain`, so this serves as an additional check.
async fn verify_modification_actor_instance<T, Kind>(
activity: &T,
announce: &Option<Announce>,
context: &LemmyContext,
request_counter: &mut i32,
) -> Result<(), LemmyError>
where
T: ActorAndObjectRef + BaseExt<Kind> + AsObject<Kind>,
{
let actor_id = activity
.actor()?
.to_owned()
.single_xsd_any_uri()
.context(location_info!())?;
let object_id = activity
.object()
.as_one()
.map(|o| o.id())
.flatten()
.context(location_info!())?;
let original_id = match fetch_post_or_comment_by_id(object_id, context, request_counter).await? {
PostOrComment::Post(p) => p.ap_id.into_inner(),
PostOrComment::Comment(c) => c.ap_id.into_inner(),
};
if actor_id.domain() != original_id.domain() {
let community = extract_community_from_cc(activity, context).await?;
verify_mod_activity(activity, announce.to_owned(), &community, context).await?;
}
Ok(())
}
pub(crate) async fn verify_undo_remove_actor_instance<T, Kind>(
undo: &Undo,
inner: &T,
announce: &Option<Announce>,
context: &LemmyContext,
) -> Result<(), LemmyError>
where
T: ActorAndObjectRef + BaseExt<Kind> + AsObject<Kind>,
{
if announce.is_none() {
let community = extract_community_from_cc(undo, context).await?;
verify_mod_activity(undo, announce.to_owned(), &community, context).await?;
verify_mod_activity(inner, announce.to_owned(), &community, context).await?;
}
Ok(())
}

View File

@ -36,6 +36,7 @@ pub enum ValidTypes {
Undo,
Remove,
Announce,
Add,
}
// TODO: this isnt entirely correct, cause some of these receive are not ActorAndObject,
@ -79,13 +80,13 @@ pub async fn shared_inbox(
let community_activity = CommunityAcceptedActivities::from_any_base(activity_any_base.clone())?
.context(location_info!())?;
res = Some(
community_receive_message(
Box::pin(community_receive_message(
community_activity,
community,
actor.as_ref(),
&context,
request_counter,
)
))
.await?,
);
} else if is_addressed_to_local_person(&to_and_cc, context.pool()).await? {
@ -93,13 +94,13 @@ pub async fn shared_inbox(
.context(location_info!())?;
// `to_person` is only used for follow activities (which we dont receive here), so no need to pass
// it in
person_receive_message(
Box::pin(person_receive_message(
person_activity,
None,
actor.as_ref(),
&context,
request_counter,
)
))
.await?;
} else if is_addressed_to_community_followers(&to_and_cc, context.pool())
.await?
@ -108,13 +109,13 @@ pub async fn shared_inbox(
let person_activity = PersonAcceptedActivities::from_any_base(activity_any_base.clone())?
.context(location_info!())?;
res = Some(
person_receive_message(
Box::pin(person_receive_message(
person_activity,
None,
actor.as_ref(),
&context,
request_counter,
)
))
.await?,
);
}

View File

@ -17,7 +17,7 @@ use crate::extensions::{
};
use activitystreams::{
activity::Follow,
actor::{ApActor, Group, Person},
actor,
base::AnyBase,
object::{ApObject, Note, Page},
};
@ -31,7 +31,7 @@ use lemmy_db_schema::{
activity::Activity,
comment::Comment,
community::Community,
person::Person as DbPerson,
person::{Person as DbPerson, Person},
post::Post,
private_message::PrivateMessage,
},
@ -44,9 +44,9 @@ use std::net::IpAddr;
use url::{ParseError, Url};
/// Activitystreams type for community
type GroupExt = Ext2<ApActor<ApObject<Group>>, GroupExtension, PublicKeyExtension>;
type GroupExt = Ext2<actor::ApActor<ApObject<actor::Group>>, GroupExtension, PublicKeyExtension>;
/// Activitystreams type for person
type PersonExt = Ext1<ApActor<ApObject<Person>>, PublicKeyExtension>;
type PersonExt = Ext1<actor::ApActor<ApObject<actor::Person>>, PublicKeyExtension>;
/// Activitystreams type for post
type PageExt = Ext1<ApObject<Page>, PageExtension>;
type NoteExt = ApObject<Note>;
@ -166,38 +166,6 @@ pub trait ActorType {
fn public_key(&self) -> Option<String>;
fn private_key(&self) -> Option<String>;
async fn send_follow(
&self,
follow_actor_id: &Url,
context: &LemmyContext,
) -> Result<(), LemmyError>;
async fn send_unfollow(
&self,
follow_actor_id: &Url,
context: &LemmyContext,
) -> Result<(), LemmyError>;
async fn send_accept_follow(
&self,
follow: Follow,
context: &LemmyContext,
) -> Result<(), LemmyError>;
async fn send_delete(&self, context: &LemmyContext) -> Result<(), LemmyError>;
async fn send_undo_delete(&self, context: &LemmyContext) -> Result<(), LemmyError>;
async fn send_remove(&self, context: &LemmyContext) -> Result<(), LemmyError>;
async fn send_undo_remove(&self, context: &LemmyContext) -> Result<(), LemmyError>;
async fn send_announce(
&self,
activity: AnyBase,
context: &LemmyContext,
) -> Result<(), LemmyError>;
/// For a given community, returns the inboxes of all followers.
async fn get_follower_inboxes(&self, pool: &DbPool) -> Result<Vec<Url>, LemmyError>;
fn get_shared_inbox_or_inbox_url(&self) -> Url;
/// Outbox URL is not generally used by Lemmy, so it can be generated on the fly (but only for
@ -221,6 +189,55 @@ pub trait ActorType {
}
}
#[async_trait::async_trait(?Send)]
pub trait CommunityType {
async fn get_follower_inboxes(&self, pool: &DbPool) -> Result<Vec<Url>, LemmyError>;
async fn send_accept_follow(
&self,
follow: Follow,
context: &LemmyContext,
) -> Result<(), LemmyError>;
async fn send_delete(&self, context: &LemmyContext) -> Result<(), LemmyError>;
async fn send_undo_delete(&self, context: &LemmyContext) -> Result<(), LemmyError>;
async fn send_remove(&self, context: &LemmyContext) -> Result<(), LemmyError>;
async fn send_undo_remove(&self, context: &LemmyContext) -> Result<(), LemmyError>;
async fn send_announce(
&self,
activity: AnyBase,
context: &LemmyContext,
) -> Result<(), LemmyError>;
async fn send_add_mod(
&self,
actor: &Person,
added_mod: Person,
context: &LemmyContext,
) -> Result<(), LemmyError>;
async fn send_remove_mod(
&self,
actor: &Person,
removed_mod: Person,
context: &LemmyContext,
) -> Result<(), LemmyError>;
}
#[async_trait::async_trait(?Send)]
pub trait UserType {
async fn send_follow(
&self,
follow_actor_id: &Url,
context: &LemmyContext,
) -> Result<(), LemmyError>;
async fn send_unfollow(
&self,
follow_actor_id: &Url,
context: &LemmyContext,
) -> Result<(), LemmyError>;
}
pub enum EndpointType {
Community,
Person,
@ -276,6 +293,10 @@ pub fn generate_shared_inbox_url(actor_id: &DbUrl) -> Result<DbUrl, LemmyError>
Ok(Url::parse(&url)?.into())
}
pub(crate) fn generate_moderators_url(community_id: &DbUrl) -> Result<DbUrl, LemmyError> {
Ok(Url::parse(&format!("{}/moderators", community_id))?.into())
}
/// Store a sent or received activity in the database, for logging purposes. These records are not
/// persistent.
pub(crate) async fn insert_activity<T>(
@ -329,6 +350,7 @@ pub(crate) async fn find_post_or_comment_by_id(
Err(NotFound.into())
}
#[derive(Debug)]
pub(crate) enum Object {
Comment(Box<Comment>),
Post(Box<Post>),

View File

@ -26,6 +26,7 @@ use lemmy_db_queries::{Crud, DbPool};
use lemmy_db_schema::{
source::{
comment::{Comment, CommentForm},
community::Community,
person::Person,
post::Post,
},
@ -52,6 +53,9 @@ impl ToApub for Comment {
let post_id = self.post_id;
let post = blocking(pool, move |conn| Post::read(conn, post_id)).await??;
let community_id = post.community_id;
let community = blocking(pool, move |conn| Community::read(conn, community_id)).await??;
// Add a vector containing some important info to the "in_reply_to" field
// [post_ap_id, Option(parent_comment_ap_id)]
let mut in_reply_to_vec = vec![post.ap_id.into_inner()];
@ -67,7 +71,8 @@ impl ToApub for Comment {
.set_many_contexts(lemmy_context()?)
.set_id(self.ap_id.to_owned().into_inner())
.set_published(convert_datetime(self.published))
.set_to(public())
// NOTE: included community id for compatibility with lemmy v0.9.9
.set_many_tos(vec![community.actor_id.into_inner(), public()])
dessalines marked this conversation as resolved
Review

I tried removing these, and setting to public, it didn't fix the test.

I tried removing these, and setting to public, it didn't fix the test.
Review

This cant be it, the test failure happened before I made this change.

This cant be it, the test failure happened before I made this change.
.set_many_in_reply_tos(in_reply_to_vec)
.set_attributed_to(creator.actor_id.into_inner());
@ -102,9 +107,16 @@ impl FromApub for Comment {
context: &LemmyContext,
expected_domain: Url,
request_counter: &mut i32,
mod_action_allowed: bool,
) -> Result<Comment, LemmyError> {
let comment: Comment =
get_object_from_apub(note, context, expected_domain, request_counter).await?;
let comment: Comment = get_object_from_apub(
note,
context,
expected_domain,
request_counter,
mod_action_allowed,
dessalines marked this conversation as resolved
Review

I'm confused by this. One is is_mod_action and the other is mod_action_allowed. What does converting a comment from apub -> DB have to do with mod actions? IE the core types are separate from actions on them.

I'm confused by this. One is `is_mod_action` and the other is `mod_action_allowed`. What does converting a comment from apub -> DB have to do with mod actions? IE the core types are separate from actions on them.
Review

That is indeed confusing. The check in Post::FromApub is so that only mods can change the locked/sticky flag for a post, but not the post author. It would definitely be good if we could verify this in a simpler way.

That is indeed confusing. The check in `Post::FromApub` is so that only mods can change the locked/sticky flag for a post, but not the post author. It would definitely be good if we could verify this in a simpler way.
)
.await?;
let post_id = comment.post_id;
let post = blocking(context.pool(), move |conn| Post::read(conn, post_id)).await??;
@ -131,6 +143,7 @@ impl FromApubToForm<NoteExt> for CommentForm {
context: &LemmyContext,
expected_domain: Url,
request_counter: &mut i32,
_mod_action_allowed: bool,
) -> Result<CommentForm, LemmyError> {
let creator_actor_id = &note
.attributed_to()
@ -152,15 +165,24 @@ impl FromApubToForm<NoteExt> for CommentForm {
let post_ap_id = in_reply_tos.next().context(location_info!())??;
// This post, or the parent comment might not yet exist on this server yet, fetch them.
let post = get_or_fetch_and_insert_post(&post_ap_id, context, request_counter).await?;
let post = Box::pin(get_or_fetch_and_insert_post(
&post_ap_id,
context,
request_counter,
))
.await?;
// The 2nd item, if it exists, is the parent comment apub_id
// For deeply nested comments, FromApub automatically gets called recursively
let parent_id: Option<CommentId> = match in_reply_tos.next() {
Some(parent_comment_uri) => {
let parent_comment_ap_id = &parent_comment_uri?;
let parent_comment =
get_or_fetch_and_insert_comment(&parent_comment_ap_id, context, request_counter).await?;
let parent_comment = Box::pin(get_or_fetch_and_insert_comment(
&parent_comment_ap_id,
context,
request_counter,
))
.await?;
Some(parent_comment.id)
}

View File

@ -1,6 +1,7 @@
use crate::{
extensions::{context::lemmy_context, group_extensions::GroupExtension},
fetcher::person::get_or_fetch_and_upsert_person,
fetcher::{community::fetch_community_mods, person::get_or_fetch_and_upsert_person},
generate_moderators_url,
objects::{
check_object_domain,
create_tombstone,
@ -42,10 +43,6 @@ impl ToApub for Community {
type ApubType = GroupExt;
async fn to_apub(&self, pool: &DbPool) -> Result<GroupExt, LemmyError> {
// The attributed to, is an ordered vector with the creator actor_ids first,
// then the rest of the moderators
// TODO Technically the instance admins can mod the community, but lets
// ignore that for now
let id = self.id;
let moderators = blocking(pool, move |conn| {
CommunityModeratorView::for_community(&conn, id)
@ -62,6 +59,7 @@ impl ToApub for Community {
.set_id(self.actor_id.to_owned().into())
.set_name(self.title.to_owned())
.set_published(convert_datetime(self.published))
// NOTE: included attritubed_to field for compatibility with lemmy v0.9.9
.set_many_attributed_tos(moderators);
if let Some(u) = self.updated.to_owned() {
@ -95,7 +93,7 @@ impl ToApub for Community {
Ok(Ext2::new(
ap_actor,
GroupExtension::new(self.nsfw)?,
GroupExtension::new(self.nsfw, generate_moderators_url(&self.actor_id)?.into())?,
self.get_public_key_ext()?,
))
}
@ -114,14 +112,22 @@ impl ToApub for Community {
impl FromApub for Community {
type ApubType = GroupExt;
/// Converts a `Group` to `Community`.
/// Converts a `Group` to `Community`, inserts it into the database and updates moderators.
async fn from_apub(
group: &GroupExt,
context: &LemmyContext,
expected_domain: Url,
request_counter: &mut i32,
mod_action_allowed: bool,

Hopefully this is an upsert / initial fetch, seems quite a waste to be fetching this every time.

It also seemed like the send_add_mod means that newly added mods get pushed.

Hopefully this is an upsert / initial fetch, seems quite a waste to be fetching this every time. It also seemed like the send_add_mod means that newly added mods get pushed.

There is also send_remove_mod(), and code to receive that activity in receive_for_community.rs.

There is also send_remove_mod(), and code to receive that activity in receive_for_community.rs.
) -> Result<Community, LemmyError> {
get_object_from_apub(group, context, expected_domain, request_counter).await
get_object_from_apub(
group,
context,
expected_domain,
request_counter,
mod_action_allowed,
)
.await
}
}
@ -132,18 +138,27 @@ impl FromApubToForm<GroupExt> for CommunityForm {
context: &LemmyContext,
expected_domain: Url,
request_counter: &mut i32,
_mod_action_allowed: bool,
) -> Result<Self, LemmyError> {
let creator_and_moderator_uris = group.inner.attributed_to().context(location_info!())?;
let creator_uri = creator_and_moderator_uris
.as_many()
.context(location_info!())?
.iter()
.next()
.context(location_info!())?
.as_xsd_any_uri()
.context(location_info!())?;
let moderator_uris = fetch_community_mods(context, group, request_counter).await?;
let creator = if let Some(creator_uri) = moderator_uris.first() {
get_or_fetch_and_upsert_person(creator_uri, context, request_counter)
} else {
// NOTE: code for compatibility with lemmy v0.9.9
let creator_uri = group
.inner
.attributed_to()
.map(|a| a.as_many())
.flatten()
.map(|a| a.first())
.flatten()
.map(|a| a.as_xsd_any_uri())
.flatten()
.context(location_info!())?;
get_or_fetch_and_upsert_person(creator_uri, context, request_counter)
}
.await?;
let creator = get_or_fetch_and_upsert_person(creator_uri, context, request_counter).await?;
let name = group
.inner
.preferred_username()

View File

@ -1,7 +1,8 @@
use crate::{
check_is_apub_id_valid,
fetcher::{community::get_or_fetch_and_upsert_community, person::get_or_fetch_and_upsert_person},
inbox::community_inbox::check_community_or_site_ban,
inbox::{community_inbox::check_community_or_site_ban, get_activity_to_and_cc},
PageExt,
};
use activitystreams::{
base::{AsBase, BaseExt, ExtendsExt},
@ -45,12 +46,14 @@ pub(crate) trait FromApub {
///
/// * `apub` The object to read from
/// * `context` LemmyContext which holds DB pool, HTTP client etc
/// * `expected_domain` Domain where the object was received from
/// * `expected_domain` Domain where the object was received from. None in case of mod action.
/// * `mod_action_allowed` True if the object can be a mod activity, ignore `expected_domain` in this case
async fn from_apub(
apub: &Self::ApubType,
context: &LemmyContext,
expected_domain: Url,
request_counter: &mut i32,
mod_action_allowed: bool,
) -> Result<Self, LemmyError>
where
Self: Sized;
@ -63,6 +66,7 @@ pub(in crate::objects) trait FromApubToForm<ApubType> {
context: &LemmyContext,
expected_domain: Url,
request_counter: &mut i32,
mod_action_allowed: bool,
) -> Result<Self, LemmyError>
where
Self: Sized;
@ -132,19 +136,17 @@ where
{
let content = object
.content()
.map(|s| s.as_single_xsd_string())
.flatten()
.map(|s| s.to_string());
.map(|s| s.as_single_xsd_string().map(|s2| s2.to_string()))
.flatten();
if content.is_some() {
let source = object.source().context(location_info!())?;
let source = Object::<()>::from_any_base(source.to_owned())?.context(location_info!())?;
check_is_markdown(source.media_type())?;
let source_content = source
.content()
.map(|s| s.as_single_xsd_string())
.map(|s| s.as_single_xsd_string().map(|s2| s2.to_string()))
.flatten()
.context(location_info!())?
.to_string();
.context(location_info!())?;
return Ok(Some(source_content));
}
Ok(None)
@ -177,6 +179,7 @@ pub(in crate::objects) async fn get_object_from_apub<From, Kind, To, ToForm, IdT
context: &LemmyContext,
expected_domain: Url,
request_counter: &mut i32,
is_mod_action: bool,
) -> Result<To, LemmyError>
where
From: BaseExt<Kind>,
@ -196,7 +199,14 @@ where
}
// otherwise parse and insert, assuring that it comes from the right domain
else {
let to_form = ToForm::from_apub(&from, context, expected_domain, request_counter).await?;
let to_form = ToForm::from_apub(
&from,
context,
expected_domain,
request_counter,
is_mod_action,
)
.await?;
let to = blocking(context.pool(), move |conn| To::upsert(conn, &to_form)).await??;
Ok(to)
@ -221,23 +231,12 @@ where
check_community_or_site_ban(&person, community_id, context.pool()).await
}
pub(in crate::objects) async fn get_to_community<T, Kind>(
object: &T,
pub(in crate::objects) async fn get_community_from_to_or_cc(
page: &PageExt,
context: &LemmyContext,
request_counter: &mut i32,
) -> Result<Community, LemmyError>
where
T: ObjectExt<Kind>,
{
let community_ids = object
.to()
.context(location_info!())?
.as_many()
.context(location_info!())?
.iter()
.map(|a| a.as_xsd_any_uri().context(location_info!()))
.collect::<Result<Vec<&Url>, anyhow::Error>>()?;
for cid in community_ids {
) -> Result<Community, LemmyError> {
for cid in get_activity_to_and_cc(page) {
let community = get_or_fetch_and_upsert_community(&cid, context, request_counter).await;
if community.is_ok() {
return community;

View File

@ -93,6 +93,7 @@ impl FromApub for DbPerson {
context: &LemmyContext,
expected_domain: Url,
request_counter: &mut i32,
mod_action_allowed: bool,
) -> Result<DbPerson, LemmyError> {
let person_id = person.id_unchecked().context(location_info!())?.to_owned();
let domain = person_id.domain().context(location_info!())?;
@ -103,8 +104,14 @@ impl FromApub for DbPerson {
.await??;
Ok(person)
} else {
let person_form =
PersonForm::from_apub(person, context, expected_domain, request_counter).await?;
let person_form = PersonForm::from_apub(
person,
context,
expected_domain,
request_counter,
mod_action_allowed,
)
.await?;
let person = blocking(context.pool(), move |conn| {
DbPerson::upsert(conn, &person_form)
})
@ -121,6 +128,7 @@ impl FromApubToForm<PersonExt> for PersonForm {
_context: &LemmyContext,
expected_domain: Url,
_request_counter: &mut i32,
_mod_action_allowed: bool,
) -> Result<Self, LemmyError> {
let avatar = match person.icon() {
Some(any_image) => Some(

View File

@ -1,13 +1,14 @@
use crate::{
check_is_apub_id_valid,
extensions::{context::lemmy_context, page_extension::PageExtension},
fetcher::person::get_or_fetch_and_upsert_person,
objects::{
check_object_domain,
check_object_for_community_or_site_ban,
create_tombstone,
get_community_from_to_or_cc,
get_object_from_apub,
get_source_markdown_value,
get_to_community,
set_content_and_source,
FromApub,
FromApubToForm,
@ -117,8 +118,16 @@ impl FromApub for Post {
context: &LemmyContext,
expected_domain: Url,
request_counter: &mut i32,
mod_action_allowed: bool,
) -> Result<Post, LemmyError> {
let post: Post = get_object_from_apub(page, context, expected_domain, request_counter).await?;
let post: Post = get_object_from_apub(
page,
context,
expected_domain,
request_counter,
mod_action_allowed,
)
.await?;
check_object_for_community_or_site_ban(page, post.community_id, context, request_counter)
.await?;
Ok(post)
@ -132,7 +141,15 @@ impl FromApubToForm<PageExt> for PostForm {
context: &LemmyContext,
expected_domain: Url,
request_counter: &mut i32,
mod_action_allowed: bool,
) -> Result<PostForm, LemmyError> {
let ap_id = if mod_action_allowed {
let id = page.id_unchecked().context(location_info!())?;
check_is_apub_id_valid(id)?;
id.to_owned().into()
} else {
check_object_domain(page, expected_domain)?
};
let ext = &page.ext_one;
let creator_actor_id = page
.inner
@ -145,7 +162,7 @@ impl FromApubToForm<PageExt> for PostForm {
let creator =
get_or_fetch_and_upsert_person(creator_actor_id, context, request_counter).await?;
let community = get_to_community(page, context, request_counter).await?;
let community = get_community_from_to_or_cc(page, context, request_counter).await?;
let thumbnail_url: Option<Url> = match &page.inner.image() {
Some(any_image) => Image::from_any_base(
@ -179,16 +196,20 @@ impl FromApubToForm<PageExt> for PostForm {
let name = page
.inner
.name()
.map(|s| s.map(|s2| s2.to_owned()))
// The following is for compatibility with lemmy v0.9.9 and older
// TODO: remove it after some time (along with the map above)
.or_else(|| page.inner.summary().map(|s| s.to_owned()))
.or_else(|| page.inner.summary())
.context(location_info!())?
.as_single_xsd_string()
.context(location_info!())?
.to_string();
let body = get_source_markdown_value(page)?;
// TODO: expected_domain is wrong in this case, because it simply takes the domain of the actor
// maybe we need to take id_unchecked() if the activity is from community to user?
// why did this work before? -> i dont think it did?
// -> try to make expected_domain optional and set it null if it is a mod action
check_slurs(&name)?;
let body_slurs_removed = body.map(|b| remove_slurs(&b));
Ok(PostForm {
@ -216,7 +237,7 @@ impl FromApubToForm<PageExt> for PostForm {
embed_description: iframely_description,
embed_html: iframely_html,
thumbnail_url: pictrs_thumbnail.map(|u| u.into()),
ap_id: Some(check_object_domain(page, expected_domain)?),
ap_id: Some(ap_id),
local: false,
})
}

View File

@ -77,8 +77,16 @@ impl FromApub for PrivateMessage {
context: &LemmyContext,
expected_domain: Url,
request_counter: &mut i32,
mod_action_allowed: bool,
) -> Result<PrivateMessage, LemmyError> {
get_object_from_apub(note, context, expected_domain, request_counter).await
get_object_from_apub(
note,
context,
expected_domain,
request_counter,
mod_action_allowed,
)
.await
}
}
@ -89,6 +97,7 @@ impl FromApubToForm<NoteExt> for PrivateMessageForm {
context: &LemmyContext,
expected_domain: Url,
request_counter: &mut i32,
_mod_action_allowed: bool,
) -> Result<PrivateMessageForm, LemmyError> {
let creator_actor_id = note
.attributed_to()

View File

@ -5,6 +5,7 @@ use crate::{
get_apub_community_followers,
get_apub_community_http,
get_apub_community_inbox,
get_apub_community_moderators,
get_apub_community_outbox,
},
get_activity,
@ -57,6 +58,10 @@ pub fn config(cfg: &mut web::ServiceConfig) {
"/c/{community_name}/inbox",
web::get().to(get_apub_community_inbox),
)
.route(
"/c/{community_name}/moderators",
web::get().to(get_apub_community_moderators),
)
.route("/u/{user_name}", web::get().to(get_apub_person_http))
.route(
"/u/{user_name}/outbox",

View File

@ -1,4 +1,5 @@
{
hostname: lemmy-alpha:8541
port: 8541
tls_enabled: false
jwt_secret: changeme