From 9a9d518153be9abf05fd492b72ca5766d0e3053a Mon Sep 17 00:00:00 2001 From: Nutomic Date: Wed, 15 May 2024 05:03:43 +0200 Subject: [PATCH 1/2] Fix import blocked objects (#4712) * Allow importing partial backup (fixes #4672) * Fetch blocked objects if not known locally (fixes #4669) * extract helper fn * add comment * cleanup * remove test * fmt * remove .ok() --- crates/apub/src/api/user_settings_backup.rs | 238 ++++++++++---------- crates/apub/src/lib.rs | 4 +- 2 files changed, 117 insertions(+), 125 deletions(-) diff --git a/crates/apub/src/api/user_settings_backup.rs b/crates/apub/src/api/user_settings_backup.rs index 88f52a564..558551632 100644 --- a/crates/apub/src/api/user_settings_backup.rs +++ b/crates/apub/src/api/user_settings_backup.rs @@ -4,9 +4,10 @@ use crate::objects::{ person::ApubPerson, post::ApubPost, }; -use activitypub_federation::{config::Data, fetch::object_id::ObjectId}; +use activitypub_federation::{config::Data, fetch::object_id::ObjectId, traits::Object}; use actix_web::web::Json; use futures::{future::try_join_all, StreamExt}; +use itertools::Itertools; use lemmy_api_common::{context::LemmyContext, SuccessResponse}; use lemmy_db_schema::{ newtypes::DbUrl, @@ -30,8 +31,11 @@ use lemmy_utils::{ spawn_try_task, }; use serde::{Deserialize, Serialize}; +use std::future::Future; use tracing::info; +const PARALLELISM: usize = 10; + /// Backup of user data. This struct should never be changed so that the data can be used as a /// long-term backup in case the instance goes down unexpectedly. All fields are optional to allow /// importing partial backups. @@ -167,141 +171,91 @@ pub async fn import_settings( } spawn_try_task(async move { - const PARALLELISM: usize = 10; let person_id = local_user_view.person.id; - // These tasks fetch objects from remote instances which might be down. - // TODO: Would be nice if we could send a list of failed items with api response, but then - // the request would likely timeout. - let mut failed_items = vec![]; - info!( - "Starting settings backup for {}", + "Starting settings import for {}", local_user_view.person.name ); - futures::stream::iter( - data - .followed_communities - .clone() - .into_iter() - // reset_request_count works like clone, and is necessary to avoid running into request limit - .map(|f| (f, context.reset_request_count())) - .map(|(followed, context)| async move { - // need to reset outgoing request count to avoid running into limit - let community = followed.dereference(&context).await?; - let form = CommunityFollowerForm { - person_id, - community_id: community.id, - pending: true, - }; - CommunityFollower::follow(&mut context.pool(), &form).await?; - LemmyResult::Ok(()) - }), + let failed_followed_communities = fetch_and_import( + data.followed_communities.clone(), + &context, + |(followed, context)| async move { + let community = followed.dereference(&context).await?; + let form = CommunityFollowerForm { + person_id, + community_id: community.id, + pending: true, + }; + CommunityFollower::follow(&mut context.pool(), &form).await?; + LemmyResult::Ok(()) + }, ) - .buffer_unordered(PARALLELISM) - .collect::>() - .await - .into_iter() - .enumerate() - .for_each(|(i, r)| { - if let Err(e) = r { - failed_items.push(data.followed_communities.get(i).map(|u| u.inner().clone())); - info!("Failed to import followed community: {e}"); - } - }); - - futures::stream::iter( - data - .saved_posts - .clone() - .into_iter() - .map(|s| (s, context.reset_request_count())) - .map(|(saved, context)| async move { - let post = saved.dereference(&context).await?; - let form = PostSavedForm { - person_id, - post_id: post.id, - }; - PostSaved::save(&mut context.pool(), &form).await?; - LemmyResult::Ok(()) - }), - ) - .buffer_unordered(PARALLELISM) - .collect::>() - .await - .into_iter() - .enumerate() - .for_each(|(i, r)| { - if let Err(e) = r { - failed_items.push(data.followed_communities.get(i).map(|u| u.inner().clone())); - info!("Failed to import saved post community: {e}"); - } - }); - - futures::stream::iter( - data - .saved_comments - .clone() - .into_iter() - .map(|s| (s, context.reset_request_count())) - .map(|(saved, context)| async move { - let comment = saved.dereference(&context).await?; - let form = CommentSavedForm { - person_id, - comment_id: comment.id, - }; - CommentSaved::save(&mut context.pool(), &form).await?; - LemmyResult::Ok(()) - }), - ) - .buffer_unordered(PARALLELISM) - .collect::>() - .await - .into_iter() - .enumerate() - .for_each(|(i, r)| { - if let Err(e) = r { - failed_items.push(data.followed_communities.get(i).map(|u| u.inner().clone())); - info!("Failed to import saved comment community: {e}"); - } - }); - - let failed_items: Vec<_> = failed_items.into_iter().flatten().collect(); - info!( - "Finished settings backup for {}, failed items: {:#?}", - local_user_view.person.name, failed_items - ); - - // These tasks don't connect to any remote instances but only insert directly in the database. - // That means the only error condition are db connection failures, so no extra error handling is - // needed. - try_join_all(data.blocked_communities.iter().map(|blocked| async { - // dont fetch unknown blocked objects from home server - let community = blocked.dereference_local(&context).await?; - let form = CommunityBlockForm { - person_id, - community_id: community.id, - }; - CommunityBlock::block(&mut context.pool(), &form).await?; - LemmyResult::Ok(()) - })) .await?; - try_join_all(data.blocked_users.iter().map(|blocked| async { - // dont fetch unknown blocked objects from home server - let target = blocked.dereference_local(&context).await?; - let form = PersonBlockForm { - person_id, - target_id: target.id, - }; - PersonBlock::block(&mut context.pool(), &form).await?; - LemmyResult::Ok(()) - })) + let failed_saved_posts = fetch_and_import( + data.saved_posts.clone(), + &context, + |(saved, context)| async move { + let post = saved.dereference(&context).await?; + let form = PostSavedForm { + person_id, + post_id: post.id, + }; + PostSaved::save(&mut context.pool(), &form).await?; + LemmyResult::Ok(()) + }, + ) + .await?; + + let failed_saved_comments = fetch_and_import( + data.saved_comments.clone(), + &context, + |(saved, context)| async move { + let comment = saved.dereference(&context).await?; + let form = CommentSavedForm { + person_id, + comment_id: comment.id, + }; + CommentSaved::save(&mut context.pool(), &form).await?; + LemmyResult::Ok(()) + }, + ) + .await?; + + let failed_community_blocks = fetch_and_import( + data.blocked_communities.clone(), + &context, + |(blocked, context)| async move { + let community = blocked.dereference(&context).await?; + let form = CommunityBlockForm { + person_id, + community_id: community.id, + }; + CommunityBlock::block(&mut context.pool(), &form).await?; + LemmyResult::Ok(()) + }, + ) + .await?; + + let failed_user_blocks = fetch_and_import( + data.blocked_users.clone(), + &context, + |(blocked, context)| async move { + let context = context.reset_request_count(); + let target = blocked.dereference(&context).await?; + let form = PersonBlockForm { + person_id, + target_id: target.id, + }; + PersonBlock::block(&mut context.pool(), &form).await?; + LemmyResult::Ok(()) + }, + ) .await?; try_join_all(data.blocked_instances.iter().map(|domain| async { - // dont fetch unknown blocked objects from home server let instance = Instance::read_or_create(&mut context.pool(), domain.clone()).await?; let form = InstanceBlockForm { person_id, @@ -312,12 +266,48 @@ pub async fn import_settings( })) .await?; + info!("Settings import completed for {}, the following items failed: {failed_followed_communities}, {failed_saved_posts}, {failed_saved_comments}, {failed_community_blocks}, {failed_user_blocks}", + local_user_view.person.name); + Ok(()) }); Ok(Json(Default::default())) } +async fn fetch_and_import( + objects: Vec>, + context: &Data, + import_fn: impl FnMut((ObjectId, Data)) -> Fut, +) -> LemmyResult +where + Kind: Object + Send + 'static, + for<'de2> ::Kind: Deserialize<'de2>, + Fut: Future>, +{ + let mut failed_items = vec![]; + futures::stream::iter( + objects + .clone() + .into_iter() + // need to reset outgoing request count to avoid running into limit + .map(|s| (s, context.reset_request_count())) + .map(import_fn), + ) + .buffer_unordered(PARALLELISM) + .collect::>() + .await + .into_iter() + .enumerate() + .for_each(|(i, r): (usize, LemmyResult<()>)| { + if r.is_err() { + if let Some(object) = objects.get(i) { + failed_items.push(object.inner().clone()); + } + } + }); + Ok(failed_items.into_iter().join(",")) +} #[cfg(test)] #[allow(clippy::indexing_slicing)] mod tests { diff --git a/crates/apub/src/lib.rs b/crates/apub/src/lib.rs index a5643b95c..c8960b008 100644 --- a/crates/apub/src/lib.rs +++ b/crates/apub/src/lib.rs @@ -29,7 +29,9 @@ pub(crate) mod mentions; pub mod objects; pub mod protocol; -pub const FEDERATION_HTTP_FETCH_LIMIT: u32 = 50; +/// Maximum number of outgoing HTTP requests to fetch a single object. Needs to be high enough +/// to fetch a new community with posts, moderators and featured posts. +pub const FEDERATION_HTTP_FETCH_LIMIT: u32 = 100; /// Only include a basic context to save space and bandwidth. The main context is hosted statically /// on join-lemmy.org. Include activitystreams explicitly for better compat, but this could From 93c9a5f2b143dc6ced672b820dc65ed6e705b896 Mon Sep 17 00:00:00 2001 From: Nutomic Date: Wed, 15 May 2024 13:36:00 +0200 Subject: [PATCH 2/2] Dont federate post locking via Update activity (#4717) * Dont federate post locking via Update activity * cleanup * add missing mod log entries * update assets --- .../create_or_update/create_page.json | 1 - .../create_or_update/update_page.json | 1 - .../collections/group_featured_posts.json | 2 - .../lemmy/collections/group_outbox.json | 2 - crates/apub/assets/lemmy/objects/page.json | 1 - .../src/activities/community/lock_page.rs | 24 +++- .../src/activities/create_or_update/post.rs | 30 +---- crates/apub/src/objects/post.rs | 106 ++++++------------ crates/apub/src/protocol/objects/page.rs | 23 ---- 9 files changed, 62 insertions(+), 128 deletions(-) diff --git a/crates/apub/assets/lemmy/activities/create_or_update/create_page.json b/crates/apub/assets/lemmy/activities/create_or_update/create_page.json index 50d2536fe..114c5b557 100644 --- a/crates/apub/assets/lemmy/activities/create_or_update/create_page.json +++ b/crates/apub/assets/lemmy/activities/create_or_update/create_page.json @@ -23,7 +23,6 @@ "href": "https://lemmy.ml/pictrs/image/xl8W7FZfk9.jpg" } ], - "commentsEnabled": true, "sensitive": false, "language": { "identifier": "ko", diff --git a/crates/apub/assets/lemmy/activities/create_or_update/update_page.json b/crates/apub/assets/lemmy/activities/create_or_update/update_page.json index 888b866b8..e9569e6f7 100644 --- a/crates/apub/assets/lemmy/activities/create_or_update/update_page.json +++ b/crates/apub/assets/lemmy/activities/create_or_update/update_page.json @@ -23,7 +23,6 @@ "href": "https://lemmy.ml/pictrs/image/xl8W7FZfk9.jpg" } ], - "commentsEnabled": true, "sensitive": false, "published": "2021-10-29T15:10:51.557399Z", "updated": "2021-10-29T15:11:35.976374Z" diff --git a/crates/apub/assets/lemmy/collections/group_featured_posts.json b/crates/apub/assets/lemmy/collections/group_featured_posts.json index 59f1afb9c..70b4d7d3a 100644 --- a/crates/apub/assets/lemmy/collections/group_featured_posts.json +++ b/crates/apub/assets/lemmy/collections/group_featured_posts.json @@ -15,7 +15,6 @@ "cc": [], "mediaType": "text/html", "attachment": [], - "commentsEnabled": true, "sensitive": false, "published": "2023-02-06T06:42:41.939437Z", "language": { @@ -36,7 +35,6 @@ "cc": [], "mediaType": "text/html", "attachment": [], - "commentsEnabled": true, "sensitive": false, "published": "2023-02-06T06:42:37.119567Z", "language": { diff --git a/crates/apub/assets/lemmy/collections/group_outbox.json b/crates/apub/assets/lemmy/collections/group_outbox.json index c7279a799..231399e13 100644 --- a/crates/apub/assets/lemmy/collections/group_outbox.json +++ b/crates/apub/assets/lemmy/collections/group_outbox.json @@ -22,7 +22,6 @@ ], "name": "another outbox test", "mediaType": "text/html", - "commentsEnabled": true, "sensitive": false, "stickied": false, "published": "2021-11-18T17:19:45.895163Z" @@ -51,7 +50,6 @@ ], "name": "outbox test", "mediaType": "text/html", - "commentsEnabled": true, "sensitive": false, "stickied": false, "published": "2021-11-18T17:19:05.763109Z" diff --git a/crates/apub/assets/lemmy/objects/page.json b/crates/apub/assets/lemmy/objects/page.json index 6b536dd90..20af5dfd2 100644 --- a/crates/apub/assets/lemmy/objects/page.json +++ b/crates/apub/assets/lemmy/objects/page.json @@ -25,7 +25,6 @@ "url": "https://enterprise.lemmy.ml/pictrs/image/eOtYb9iEiB.png" }, "sensitive": false, - "commentsEnabled": true, "language": { "identifier": "fr", "name": "Français" diff --git a/crates/apub/src/activities/community/lock_page.rs b/crates/apub/src/activities/community/lock_page.rs index bafb42a4a..322cd88c2 100644 --- a/crates/apub/src/activities/community/lock_page.rs +++ b/crates/apub/src/activities/community/lock_page.rs @@ -26,6 +26,7 @@ use lemmy_db_schema::{ source::{ activity::ActivitySendTargets, community::Community, + moderator::{ModLockPost, ModLockPostForm}, person::Person, post::{Post, PostUpdateForm}, }, @@ -60,12 +61,22 @@ impl ActivityHandler for LockPage { } async fn receive(self, context: &Data) -> Result<(), Self::Error> { + insert_received_activity(&self.id, context).await?; + let locked = Some(true); let form = PostUpdateForm { - locked: Some(true), + locked, ..Default::default() }; let post = self.object.dereference(context).await?; Post::update(&mut context.pool(), post.id, &form).await?; + + let form = ModLockPostForm { + mod_person_id: self.actor.dereference(context).await?.id, + post_id: post.id, + locked, + }; + ModLockPost::create(&mut context.pool(), &form).await?; + Ok(()) } } @@ -94,12 +105,21 @@ impl ActivityHandler for UndoLockPage { async fn receive(self, context: &Data) -> Result<(), Self::Error> { insert_received_activity(&self.id, context).await?; + let locked = Some(false); let form = PostUpdateForm { - locked: Some(false), + locked, ..Default::default() }; let post = self.object.object.dereference(context).await?; Post::update(&mut context.pool(), post.id, &form).await?; + + let form = ModLockPostForm { + mod_person_id: self.actor.dereference(context).await?.id, + post_id: post.id, + locked, + }; + ModLockPost::create(&mut context.pool(), &form).await?; + Ok(()) } } diff --git a/crates/apub/src/activities/create_or_update/post.rs b/crates/apub/src/activities/create_or_update/post.rs index 53900c799..e8bfc36e6 100644 --- a/crates/apub/src/activities/create_or_update/post.rs +++ b/crates/apub/src/activities/create_or_update/post.rs @@ -4,7 +4,6 @@ use crate::{ community::send_activity_in_community, generate_activity_id, verify_is_public, - verify_mod_action, verify_person_in_community, }, activity_lists::AnnouncableActivities, @@ -78,14 +77,13 @@ impl CreateOrUpdatePage { let create_or_update = CreateOrUpdatePage::new(post.into(), &person, &community, kind, &context).await?; - let is_mod_action = create_or_update.object.is_mod_action(&context).await?; let activity = AnnouncableActivities::CreateOrUpdatePost(create_or_update); send_activity_in_community( activity, &person, &community, ActivitySendTargets::empty(), - is_mod_action, + false, &context, ) .await?; @@ -112,30 +110,8 @@ impl ActivityHandler for CreateOrUpdatePage { let community = self.community(context).await?; verify_person_in_community(&self.actor, &community, context).await?; check_community_deleted_or_removed(&community)?; - - match self.kind { - CreateOrUpdateType::Create => { - verify_domains_match(self.actor.inner(), self.object.id.inner())?; - verify_urls_match(self.actor.inner(), self.object.creator()?.inner())?; - // Check that the post isnt locked, as that isnt possible for newly created posts. - // However, when fetching a remote post we generate a new create activity with the current - // locked value, so this check may fail. So only check if its a local community, - // because then we will definitely receive all create and update activities separately. - let is_locked = self.object.comments_enabled == Some(false); - if community.local && is_locked { - Err(LemmyErrorType::NewPostCannotBeLocked)? - } - } - CreateOrUpdateType::Update => { - let is_mod_action = self.object.is_mod_action(context).await?; - if is_mod_action { - verify_mod_action(&self.actor, &community, context).await?; - } else { - verify_domains_match(self.actor.inner(), self.object.id.inner())?; - verify_urls_match(self.actor.inner(), self.object.creator()?.inner())?; - } - } - } + verify_domains_match(self.actor.inner(), self.object.id.inner())?; + verify_urls_match(self.actor.inner(), self.object.creator()?.inner())?; ApubPost::verify(&self.object, self.actor.inner(), context).await?; Ok(()) } diff --git a/crates/apub/src/objects/post.rs b/crates/apub/src/objects/post.rs index ff11c985c..929d598cd 100644 --- a/crates/apub/src/objects/post.rs +++ b/crates/apub/src/objects/post.rs @@ -36,7 +36,6 @@ use lemmy_db_schema::{ source::{ community::Community, local_site::LocalSite, - moderator::{ModLockPost, ModLockPostForm}, person::Person, post::{Post, PostInsertForm, PostUpdateForm}, }, @@ -147,7 +146,6 @@ impl Object for ApubPost { source: self.body.clone().map(Source::new), attachment, image: self.thumbnail_url.clone().map(ImageObject::new), - comments_enabled: Some(!self.locked), sensitive: Some(self.nsfw), language, published: Some(self.published), @@ -165,12 +163,8 @@ impl Object for ApubPost { expected_domain: &Url, context: &Data, ) -> LemmyResult<()> { - // We can't verify the domain in case of mod action, because the mod may be on a different - // instance from the post author. - if !page.is_mod_action(context).await? { - verify_domains_match(page.id.inner(), expected_domain)?; - verify_is_remote_object(&page.id, context)?; - }; + verify_domains_match(page.id.inner(), expected_domain)?; + verify_is_remote_object(&page.id, context)?; let community = page.community(context).await?; check_apub_id_valid_with_strictness(page.id.inner(), community.local, context).await?; @@ -218,62 +212,46 @@ impl Object for ApubPost { name = name.chars().take(MAX_TITLE_LENGTH).collect(); } - // read existing, local post if any (for generating mod log) - let old_post = page.id.dereference_local(context).await; - let first_attachment = page.attachment.first(); let local_site = LocalSite::read(&mut context.pool()).await.ok(); - let form = if !page.is_mod_action(context).await? { - let url = if let Some(attachment) = first_attachment.cloned() { - Some(attachment.url()) - } else if page.kind == PageType::Video { - // we cant display videos directly, so insert a link to external video page - Some(page.id.inner().clone()) - } else { - None - }; - check_url_scheme(&url)?; - - let alt_text = first_attachment.cloned().and_then(Attachment::alt_text); - - let url = proxy_image_link_opt_apub(url, context).await?; - - let slur_regex = &local_site_opt_to_slur_regex(&local_site); - let url_blocklist = get_url_blocklist(context).await?; - - let body = read_from_string_or_source_opt(&page.content, &page.media_type, &page.source); - let body = process_markdown_opt(&body, slur_regex, &url_blocklist, context).await?; - let language_id = - LanguageTag::to_language_id_single(page.language, &mut context.pool()).await?; - - PostInsertForm::builder() - .name(name) - .url(url.map(Into::into)) - .body(body) - .alt_text(alt_text) - .creator_id(creator.id) - .community_id(community.id) - .locked(page.comments_enabled.map(|e| !e)) - .published(page.published.map(Into::into)) - .updated(page.updated.map(Into::into)) - .deleted(Some(false)) - .nsfw(page.sensitive) - .ap_id(Some(page.id.clone().into())) - .local(Some(false)) - .language_id(language_id) - .build() + let url = if let Some(attachment) = first_attachment.cloned() { + Some(attachment.url()) + } else if page.kind == PageType::Video { + // we cant display videos directly, so insert a link to external video page + Some(page.id.inner().clone()) } else { - // if is mod action, only update locked/stickied fields, nothing else - PostInsertForm::builder() - .name(name) - .creator_id(creator.id) - .community_id(community.id) - .ap_id(Some(page.id.clone().into())) - .locked(page.comments_enabled.map(|e| !e)) - .updated(page.updated.map(Into::into)) - .build() + None }; + check_url_scheme(&url)?; + + let alt_text = first_attachment.cloned().and_then(Attachment::alt_text); + + let url = proxy_image_link_opt_apub(url, context).await?; + + let slur_regex = &local_site_opt_to_slur_regex(&local_site); + let url_blocklist = get_url_blocklist(context).await?; + + let body = read_from_string_or_source_opt(&page.content, &page.media_type, &page.source); + let body = process_markdown_opt(&body, slur_regex, &url_blocklist, context).await?; + let language_id = + LanguageTag::to_language_id_single(page.language, &mut context.pool()).await?; + + let form = PostInsertForm::builder() + .name(name) + .url(url.map(Into::into)) + .body(body) + .alt_text(alt_text) + .creator_id(creator.id) + .community_id(community.id) + .published(page.published.map(Into::into)) + .updated(page.updated.map(Into::into)) + .deleted(Some(false)) + .nsfw(page.sensitive) + .ap_id(Some(page.id.clone().into())) + .local(Some(false)) + .language_id(language_id) + .build(); let timestamp = page.updated.or(page.published).unwrap_or_else(naive_now); let post = Post::insert_apub(&mut context.pool(), timestamp, &form).await?; @@ -287,16 +265,6 @@ impl Object for ApubPost { context.reset_request_count(), ); - // write mod log entry for lock - if Page::is_locked_changed(&old_post, &page.comments_enabled) { - let form = ModLockPostForm { - mod_person_id: creator.id, - post_id: post.id, - locked: Some(post.locked), - }; - ModLockPost::create(&mut context.pool(), &form).await?; - } - Ok(post.into()) } } diff --git a/crates/apub/src/protocol/objects/page.rs b/crates/apub/src/protocol/objects/page.rs index bbb46bfaf..6075b14a1 100644 --- a/crates/apub/src/protocol/objects/page.rs +++ b/crates/apub/src/protocol/objects/page.rs @@ -60,7 +60,6 @@ pub struct Page { #[serde(default)] pub(crate) attachment: Vec, pub(crate) image: Option, - pub(crate) comments_enabled: Option, pub(crate) sensitive: Option, pub(crate) published: Option>, pub(crate) updated: Option>, @@ -156,28 +155,6 @@ pub enum HashtagType { } impl Page { - /// Only mods can change the post's locked status. So if it is changed from the default value, - /// it is a mod action and needs to be verified as such. - /// - /// Locked needs to be false on a newly created post (verified in [[CreatePost]]. - pub(crate) async fn is_mod_action(&self, context: &Data) -> LemmyResult { - let old_post = self.id.clone().dereference_local(context).await; - Ok(Page::is_locked_changed(&old_post, &self.comments_enabled)) - } - - pub(crate) fn is_locked_changed( - old_post: &Result, - new_comments_enabled: &Option, - ) -> bool { - if let Some(new_comments_enabled) = new_comments_enabled { - if let Ok(old_post) = old_post { - return new_comments_enabled != &!old_post.locked; - } - } - - false - } - pub(crate) fn creator(&self) -> LemmyResult> { match &self.attributed_to { AttributedTo::Lemmy(l) => Ok(l.clone()),