Federated Moderation #185
|
@ -76,7 +76,7 @@ pub(crate) async fn receive_update_post(
|
||||||
let stickied = page.ext_one.stickied.context(location_info!())?;
|
let stickied = page.ext_one.stickied.context(location_info!())?;
|
||||||
let locked = !page.ext_one.comments_enabled.context(location_info!())?;
|
let locked = !page.ext_one.comments_enabled.context(location_info!())?;
|
||||||
let mut mod_action_allowed = false;
|
let mut mod_action_allowed = false;
|
||||||
if stickied != old_post.stickied || locked != old_post.locked {
|
if (stickied != old_post.stickied) || (locked != old_post.locked) {
|
||||||
dessalines marked this conversation as resolved
Outdated
|
|||||||
let community = blocking(context.pool(), move |conn| {
|
let community = blocking(context.pool(), move |conn| {
|
||||||
Community::read(conn, old_post.community_id)
|
Community::read(conn, old_post.community_id)
|
||||||
})
|
})
|
||||||
|
|
|
@ -1,5 +1,10 @@
|
||||||
use crate::{
|
use crate::{
|
||||||
fetcher::{fetch::fetch_remote_object, is_deleted, should_refetch_actor},
|
fetcher::{
|
||||||
|
fetch::fetch_remote_object,
|
||||||
|
is_deleted,
|
||||||
|
person::get_or_fetch_and_upsert_person,
|
||||||
|
should_refetch_actor,
|
||||||
|
},
|
||||||
inbox::person_inbox::receive_announce,
|
inbox::person_inbox::receive_announce,
|
||||||
objects::FromApub,
|
objects::FromApub,
|
||||||
GroupExt,
|
GroupExt,
|
||||||
|
@ -11,8 +16,12 @@ use activitystreams::{
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use diesel::result::Error::NotFound;
|
use diesel::result::Error::NotFound;
|
||||||
use lemmy_api_structs::blocking;
|
use lemmy_api_structs::blocking;
|
||||||
use lemmy_db_queries::{source::community::Community_, ApubObject};
|
use lemmy_db_queries::{source::community::Community_, ApubObject, Joinable};
|
||||||
use lemmy_db_schema::source::community::Community;
|
use lemmy_db_schema::{
|
||||||
|
source::community::{Community, CommunityModerator, CommunityModeratorForm},
|
||||||
|
DbUrl,
|
||||||
|
};
|
||||||
|
use lemmy_db_views_actor::community_moderator_view::CommunityModeratorView;
|
||||||
use lemmy_utils::{location_info, LemmyError};
|
use lemmy_utils::{location_info, LemmyError};
|
||||||
use lemmy_websocket::LemmyContext;
|
use lemmy_websocket::LemmyContext;
|
||||||
use log::debug;
|
use log::debug;
|
||||||
|
@ -54,9 +63,9 @@ async fn fetch_remote_community(
|
||||||
apub_id: &Url,
|
apub_id: &Url,
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
old_community: Option<Community>,
|
old_community: Option<Community>,
|
||||||
recursion_counter: &mut i32,
|
request_counter: &mut i32,
|
||||||
) -> Result<Community, LemmyError> {
|
) -> Result<Community, LemmyError> {
|
||||||
let group = fetch_remote_object::<GroupExt>(context.client(), apub_id, recursion_counter).await;
|
let group = fetch_remote_object::<GroupExt>(context.client(), apub_id, request_counter).await;
|
||||||
|
|
||||||
if let Some(c) = old_community.to_owned() {
|
if let Some(c) = old_community.to_owned() {
|
||||||
if is_deleted(&group) {
|
if is_deleted(&group) {
|
||||||
|
@ -71,24 +80,69 @@ async fn fetch_remote_community(
|
||||||
}
|
}
|
||||||
|
|
||||||
let group = group?;
|
let group = group?;
|
||||||
let community = Community::from_apub(
|
let community =
|
||||||
&group,
|
Community::from_apub(&group, context, apub_id.to_owned(), request_counter, false).await?;
|
||||||
context,
|
|
||||||
apub_id.to_owned(),
|
update_community_mods(&group, &community, context, request_counter).await?;
|
||||||
recursion_counter,
|
|
||||||
false,
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
// only fetch outbox for new communities, otherwise this can create an infinite loop
|
// only fetch outbox for new communities, otherwise this can create an infinite loop
|
||||||
if old_community.is_none() {
|
if old_community.is_none() {
|
||||||
let outbox = group.inner.outbox()?.context(location_info!())?;
|
let outbox = group.inner.outbox()?.context(location_info!())?;
|
||||||
fetch_community_outbox(context, outbox, &community, recursion_counter).await?
|
fetch_community_outbox(context, outbox, &community, request_counter).await?
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(community)
|
Ok(community)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn update_community_mods(
|
||||||
|
group: &GroupExt,
|
||||||
|
community: &Community,
|
||||||
|
context: &LemmyContext,
|
||||||
|
request_counter: &mut i32,
|
||||||
|
) -> Result<(), LemmyError> {
|
||||||
|
let new_moderators = fetch_community_mods(context, group, request_counter).await?;
|
||||||
|
let community_id = community.id;
|
||||||
|
let current_moderators = blocking(context.pool(), move |conn| {
|
||||||
|
CommunityModeratorView::for_community(&conn, community_id)
|
||||||
|
})
|
||||||
|
.await??;
|
||||||
|
// Remove old mods from database which arent in the moderators collection anymore
|
||||||
|
for mod_user in ¤t_moderators {
|
||||||
|
if !new_moderators.contains(&&mod_user.moderator.actor_id.clone().into()) {
|
||||||
|
let community_moderator_form = CommunityModeratorForm {
|
||||||
|
community_id: mod_user.community.id,
|
||||||
dessalines marked this conversation as resolved
Outdated
dessalines
commented
I see this is fetched every time I see this is fetched every time `from_apub` is run on community. This doesn't seem right, it should be a smart upsert like the others.
nutomic
commented
Any idea how? I dont see how an upsert works here, because we cant simply overwrite a single database row, but need to insert or delete rows. Any idea how? I dont see how an upsert works here, because we cant simply overwrite a single database row, but need to insert or delete rows.
dessalines
commented
Check out It should be keyed just like the
Check out `get_or_fetch_upsert_community`, it looks like its already doing the initial moderators fetching.
It should be keyed just like the `should_refetch_actor(c.last_refreshed_at)`, after the initial community fetch. In fact I don't even know that that's necessary, since the only reason for fetching is to:
1. Get initial data
2. When we don't receive atomic updates (the last_refresh_at is only necessary bc community and user edits aren't pushed). New community moderators are pushed, so I don't know that this is necessary.
nutomic
commented
Ah that makes sense, I'm moving the update logic from objects/community.rs to fetcher/community.rs. But we still need to run this every time, because its possible that our instance doesnt follow the community, and doesnt receive any activities (we could possibly add a check for this later). And it is still necessary to fetch the community creator in FromApubToForm, until that field is removed. Ah that makes sense, I'm moving the update logic from objects/community.rs to fetcher/community.rs. But we still need to run this every time, because its possible that our instance doesnt follow the community, and doesnt receive any activities (we could possibly add a check for this later).
And it is still necessary to fetch the community creator in FromApubToForm, until that field is removed.
|
|||||||
|
person_id: mod_user.moderator.id,
|
||||||
|
};
|
||||||
|
blocking(context.pool(), move |conn| {
|
||||||
|
CommunityModerator::leave(conn, &community_moderator_form)
|
||||||
|
})
|
||||||
|
.await??;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add new mods to database which have been added to moderators collection
|
||||||
|
for mod_uri in new_moderators {
|
||||||
|
let mod_user = get_or_fetch_and_upsert_person(&mod_uri, context, request_counter).await?;
|
||||||
|
let current_mod_uris: Vec<DbUrl> = current_moderators
|
||||||
|
.clone()
|
||||||
|
.iter()
|
||||||
|
.map(|c| c.moderator.actor_id.clone())
|
||||||
|
.collect();
|
||||||
|
if !current_mod_uris.contains(&mod_user.actor_id) {
|
||||||
|
let community_moderator_form = CommunityModeratorForm {
|
||||||
|
community_id: community.id,
|
||||||
|
person_id: mod_user.id,
|
||||||
|
};
|
||||||
|
blocking(context.pool(), move |conn| {
|
||||||
|
CommunityModerator::join(conn, &community_moderator_form)
|
||||||
|
})
|
||||||
|
.await??;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
async fn fetch_community_outbox(
|
async fn fetch_community_outbox(
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
outbox: &Url,
|
outbox: &Url,
|
||||||
|
|
|
@ -23,14 +23,11 @@ use activitystreams::{
|
||||||
};
|
};
|
||||||
use activitystreams_ext::Ext2;
|
use activitystreams_ext::Ext2;
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use lemmy_api_structs::blocking;
|
use lemmy_db_queries::DbPool;
|
||||||
use lemmy_db_queries::{DbPool, Joinable};
|
|
||||||
use lemmy_db_schema::{
|
use lemmy_db_schema::{
|
||||||
naive_now,
|
naive_now,
|
||||||
source::community::{Community, CommunityForm, CommunityModerator, CommunityModeratorForm},
|
source::community::{Community, CommunityForm},
|
||||||
DbUrl,
|
|
||||||
};
|
};
|
||||||
use lemmy_db_views_actor::community_moderator_view::CommunityModeratorView;
|
|
||||||
use lemmy_utils::{
|
use lemmy_utils::{
|
||||||
location_info,
|
location_info,
|
||||||
utils::{check_slurs, check_slurs_opt, convert_datetime},
|
utils::{check_slurs, check_slurs_opt, convert_datetime},
|
||||||
|
@ -109,56 +106,14 @@ impl FromApub for Community {
|
||||||
request_counter: &mut i32,
|
request_counter: &mut i32,
|
||||||
mod_action_allowed: bool,
|
mod_action_allowed: bool,
|
||||||
) -> Result<Community, LemmyError> {
|
) -> Result<Community, LemmyError> {
|
||||||
let community: Community = get_object_from_apub(
|
get_object_from_apub(
|
||||||
group,
|
group,
|
||||||
context,
|
context,
|
||||||
expected_domain,
|
expected_domain,
|
||||||
request_counter,
|
request_counter,
|
||||||
mod_action_allowed,
|
mod_action_allowed,
|
||||||
)
|
)
|
||||||
.await?;
|
.await
|
||||||
|
|
||||||
let new_moderators = fetch_community_mods(context, group, request_counter).await?;
|
|
||||||
let community_id = community.id;
|
|
||||||
let current_moderators = blocking(context.pool(), move |conn| {
|
|
||||||
CommunityModeratorView::for_community(&conn, community_id)
|
|
||||||
})
|
|
||||||
.await??;
|
|
||||||
// Remove old mods from database which arent in the moderators collection anymore
|
|
||||||
for mod_user in ¤t_moderators {
|
|
||||||
if !new_moderators.contains(&&mod_user.moderator.actor_id.clone().into()) {
|
|
||||||
let community_moderator_form = CommunityModeratorForm {
|
|
||||||
community_id: mod_user.community.id,
|
|
||||||
person_id: mod_user.moderator.id,
|
|
||||||
};
|
|
||||||
blocking(context.pool(), move |conn| {
|
|
||||||
CommunityModerator::leave(conn, &community_moderator_form)
|
|
||||||
})
|
|
||||||
.await??;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add new mods to database which have been added to moderators collection
|
|
||||||
for mod_uri in new_moderators {
|
|
||||||
let mod_user = get_or_fetch_and_upsert_person(&mod_uri, context, request_counter).await?;
|
|
||||||
let current_mod_uris: Vec<DbUrl> = current_moderators
|
|
||||||
.clone()
|
|
||||||
.iter()
|
|
||||||
.map(|c| c.moderator.actor_id.clone())
|
|
||||||
.collect();
|
|
||||||
if !current_mod_uris.contains(&mod_user.actor_id) {
|
|
||||||
let community_moderator_form = CommunityModeratorForm {
|
|
||||||
community_id: community.id,
|
|
||||||
person_id: mod_user.id,
|
|
||||||
};
|
|
||||||
blocking(context.pool(), move |conn| {
|
|
||||||
CommunityModerator::join(conn, &community_moderator_form)
|
|
||||||
})
|
|
||||||
.await??;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(community)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
I'm sure this is fine, but the lack of parenthesis scares me that it might be evaluated in a strange order.
How would the order change anything? But I can add parenthesis if it makes you happy.
if (((stickied != old_post.stickied) || locked) != old_post.locked)
is different from
if ((stickied != old_post.stickied) || (locked != old_post.locked))
Without parenthesis order of operations might do the first.
I dont think Rust would do that, but couldnt find any documentation on the behaviour. So I'm adding the parenthesis just in case.