From 9a9d518153be9abf05fd492b72ca5766d0e3053a Mon Sep 17 00:00:00 2001 From: Nutomic Date: Wed, 15 May 2024 05:03:43 +0200 Subject: [PATCH] Fix import blocked objects (#4712) * Allow importing partial backup (fixes #4672) * Fetch blocked objects if not known locally (fixes #4669) * extract helper fn * add comment * cleanup * remove test * fmt * remove .ok() --- crates/apub/src/api/user_settings_backup.rs | 238 ++++++++++---------- crates/apub/src/lib.rs | 4 +- 2 files changed, 117 insertions(+), 125 deletions(-) diff --git a/crates/apub/src/api/user_settings_backup.rs b/crates/apub/src/api/user_settings_backup.rs index 88f52a564a..5585516327 100644 --- a/crates/apub/src/api/user_settings_backup.rs +++ b/crates/apub/src/api/user_settings_backup.rs @@ -4,9 +4,10 @@ use crate::objects::{ person::ApubPerson, post::ApubPost, }; -use activitypub_federation::{config::Data, fetch::object_id::ObjectId}; +use activitypub_federation::{config::Data, fetch::object_id::ObjectId, traits::Object}; use actix_web::web::Json; use futures::{future::try_join_all, StreamExt}; +use itertools::Itertools; use lemmy_api_common::{context::LemmyContext, SuccessResponse}; use lemmy_db_schema::{ newtypes::DbUrl, @@ -30,8 +31,11 @@ use lemmy_utils::{ spawn_try_task, }; use serde::{Deserialize, Serialize}; +use std::future::Future; use tracing::info; +const PARALLELISM: usize = 10; + /// Backup of user data. This struct should never be changed so that the data can be used as a /// long-term backup in case the instance goes down unexpectedly. All fields are optional to allow /// importing partial backups. @@ -167,141 +171,91 @@ pub async fn import_settings( } spawn_try_task(async move { - const PARALLELISM: usize = 10; let person_id = local_user_view.person.id; - // These tasks fetch objects from remote instances which might be down. - // TODO: Would be nice if we could send a list of failed items with api response, but then - // the request would likely timeout. - let mut failed_items = vec![]; - info!( - "Starting settings backup for {}", + "Starting settings import for {}", local_user_view.person.name ); - futures::stream::iter( - data - .followed_communities - .clone() - .into_iter() - // reset_request_count works like clone, and is necessary to avoid running into request limit - .map(|f| (f, context.reset_request_count())) - .map(|(followed, context)| async move { - // need to reset outgoing request count to avoid running into limit - let community = followed.dereference(&context).await?; - let form = CommunityFollowerForm { - person_id, - community_id: community.id, - pending: true, - }; - CommunityFollower::follow(&mut context.pool(), &form).await?; - LemmyResult::Ok(()) - }), + let failed_followed_communities = fetch_and_import( + data.followed_communities.clone(), + &context, + |(followed, context)| async move { + let community = followed.dereference(&context).await?; + let form = CommunityFollowerForm { + person_id, + community_id: community.id, + pending: true, + }; + CommunityFollower::follow(&mut context.pool(), &form).await?; + LemmyResult::Ok(()) + }, ) - .buffer_unordered(PARALLELISM) - .collect::>() - .await - .into_iter() - .enumerate() - .for_each(|(i, r)| { - if let Err(e) = r { - failed_items.push(data.followed_communities.get(i).map(|u| u.inner().clone())); - info!("Failed to import followed community: {e}"); - } - }); - - futures::stream::iter( - data - .saved_posts - .clone() - .into_iter() - .map(|s| (s, context.reset_request_count())) - .map(|(saved, context)| async move { - let post = saved.dereference(&context).await?; - let form = PostSavedForm { - person_id, - post_id: post.id, - }; - PostSaved::save(&mut context.pool(), &form).await?; - LemmyResult::Ok(()) - }), - ) - .buffer_unordered(PARALLELISM) - .collect::>() - .await - .into_iter() - .enumerate() - .for_each(|(i, r)| { - if let Err(e) = r { - failed_items.push(data.followed_communities.get(i).map(|u| u.inner().clone())); - info!("Failed to import saved post community: {e}"); - } - }); - - futures::stream::iter( - data - .saved_comments - .clone() - .into_iter() - .map(|s| (s, context.reset_request_count())) - .map(|(saved, context)| async move { - let comment = saved.dereference(&context).await?; - let form = CommentSavedForm { - person_id, - comment_id: comment.id, - }; - CommentSaved::save(&mut context.pool(), &form).await?; - LemmyResult::Ok(()) - }), - ) - .buffer_unordered(PARALLELISM) - .collect::>() - .await - .into_iter() - .enumerate() - .for_each(|(i, r)| { - if let Err(e) = r { - failed_items.push(data.followed_communities.get(i).map(|u| u.inner().clone())); - info!("Failed to import saved comment community: {e}"); - } - }); - - let failed_items: Vec<_> = failed_items.into_iter().flatten().collect(); - info!( - "Finished settings backup for {}, failed items: {:#?}", - local_user_view.person.name, failed_items - ); - - // These tasks don't connect to any remote instances but only insert directly in the database. - // That means the only error condition are db connection failures, so no extra error handling is - // needed. - try_join_all(data.blocked_communities.iter().map(|blocked| async { - // dont fetch unknown blocked objects from home server - let community = blocked.dereference_local(&context).await?; - let form = CommunityBlockForm { - person_id, - community_id: community.id, - }; - CommunityBlock::block(&mut context.pool(), &form).await?; - LemmyResult::Ok(()) - })) .await?; - try_join_all(data.blocked_users.iter().map(|blocked| async { - // dont fetch unknown blocked objects from home server - let target = blocked.dereference_local(&context).await?; - let form = PersonBlockForm { - person_id, - target_id: target.id, - }; - PersonBlock::block(&mut context.pool(), &form).await?; - LemmyResult::Ok(()) - })) + let failed_saved_posts = fetch_and_import( + data.saved_posts.clone(), + &context, + |(saved, context)| async move { + let post = saved.dereference(&context).await?; + let form = PostSavedForm { + person_id, + post_id: post.id, + }; + PostSaved::save(&mut context.pool(), &form).await?; + LemmyResult::Ok(()) + }, + ) + .await?; + + let failed_saved_comments = fetch_and_import( + data.saved_comments.clone(), + &context, + |(saved, context)| async move { + let comment = saved.dereference(&context).await?; + let form = CommentSavedForm { + person_id, + comment_id: comment.id, + }; + CommentSaved::save(&mut context.pool(), &form).await?; + LemmyResult::Ok(()) + }, + ) + .await?; + + let failed_community_blocks = fetch_and_import( + data.blocked_communities.clone(), + &context, + |(blocked, context)| async move { + let community = blocked.dereference(&context).await?; + let form = CommunityBlockForm { + person_id, + community_id: community.id, + }; + CommunityBlock::block(&mut context.pool(), &form).await?; + LemmyResult::Ok(()) + }, + ) + .await?; + + let failed_user_blocks = fetch_and_import( + data.blocked_users.clone(), + &context, + |(blocked, context)| async move { + let context = context.reset_request_count(); + let target = blocked.dereference(&context).await?; + let form = PersonBlockForm { + person_id, + target_id: target.id, + }; + PersonBlock::block(&mut context.pool(), &form).await?; + LemmyResult::Ok(()) + }, + ) .await?; try_join_all(data.blocked_instances.iter().map(|domain| async { - // dont fetch unknown blocked objects from home server let instance = Instance::read_or_create(&mut context.pool(), domain.clone()).await?; let form = InstanceBlockForm { person_id, @@ -312,12 +266,48 @@ pub async fn import_settings( })) .await?; + info!("Settings import completed for {}, the following items failed: {failed_followed_communities}, {failed_saved_posts}, {failed_saved_comments}, {failed_community_blocks}, {failed_user_blocks}", + local_user_view.person.name); + Ok(()) }); Ok(Json(Default::default())) } +async fn fetch_and_import( + objects: Vec>, + context: &Data, + import_fn: impl FnMut((ObjectId, Data)) -> Fut, +) -> LemmyResult +where + Kind: Object + Send + 'static, + for<'de2> ::Kind: Deserialize<'de2>, + Fut: Future>, +{ + let mut failed_items = vec![]; + futures::stream::iter( + objects + .clone() + .into_iter() + // need to reset outgoing request count to avoid running into limit + .map(|s| (s, context.reset_request_count())) + .map(import_fn), + ) + .buffer_unordered(PARALLELISM) + .collect::>() + .await + .into_iter() + .enumerate() + .for_each(|(i, r): (usize, LemmyResult<()>)| { + if r.is_err() { + if let Some(object) = objects.get(i) { + failed_items.push(object.inner().clone()); + } + } + }); + Ok(failed_items.into_iter().join(",")) +} #[cfg(test)] #[allow(clippy::indexing_slicing)] mod tests { diff --git a/crates/apub/src/lib.rs b/crates/apub/src/lib.rs index a5643b95c3..c8960b0083 100644 --- a/crates/apub/src/lib.rs +++ b/crates/apub/src/lib.rs @@ -29,7 +29,9 @@ pub(crate) mod mentions; pub mod objects; pub mod protocol; -pub const FEDERATION_HTTP_FETCH_LIMIT: u32 = 50; +/// Maximum number of outgoing HTTP requests to fetch a single object. Needs to be high enough +/// to fetch a new community with posts, moderators and featured posts. +pub const FEDERATION_HTTP_FETCH_LIMIT: u32 = 100; /// Only include a basic context to save space and bandwidth. The main context is hosted statically /// on join-lemmy.org. Include activitystreams explicitly for better compat, but this could