From 3d5647b16f529f235e7e2549861491fbe1c9a0a6 Mon Sep 17 00:00:00 2001 From: Felix Ableitner Date: Thu, 22 Oct 2020 20:27:32 +0200 Subject: [PATCH] Limit amount of HTTP requests to handle activities (fixes #1221) --- lemmy_apub/src/activities/receive/comment.rs | 46 ++-- .../src/activities/receive/comment_undo.rs | 28 ++- .../src/activities/receive/community.rs | 16 +- lemmy_apub/src/activities/receive/mod.rs | 8 +- lemmy_apub/src/activities/receive/post.rs | 39 ++-- .../src/activities/receive/post_undo.rs | 28 ++- lemmy_apub/src/activities/send/comment.rs | 2 +- lemmy_apub/src/activities/send/community.rs | 2 +- lemmy_apub/src/fetcher.rs | 205 +++++++++++------- lemmy_apub/src/inbox/community_inbox.rs | 3 +- lemmy_apub/src/inbox/shared_inbox.rs | 104 +++++---- lemmy_apub/src/inbox/user_inbox.rs | 25 ++- lemmy_apub/src/lib.rs | 1 + lemmy_apub/src/objects/comment.rs | 7 +- lemmy_apub/src/objects/community.rs | 3 +- lemmy_apub/src/objects/post.rs | 6 +- lemmy_apub/src/objects/private_message.rs | 6 +- lemmy_apub/src/objects/user.rs | 1 + 18 files changed, 332 insertions(+), 198 deletions(-) diff --git a/lemmy_apub/src/activities/receive/comment.rs b/lemmy_apub/src/activities/receive/comment.rs index 971248c4..e7efe814 100644 --- a/lemmy_apub/src/activities/receive/comment.rs +++ b/lemmy_apub/src/activities/receive/comment.rs @@ -25,12 +25,14 @@ use lemmy_websocket::{messages::SendComment, LemmyContext, UserOperation}; pub(crate) async fn receive_create_comment( create: Create, context: &LemmyContext, + request_counter: &mut i32, ) -> Result { - let user = get_actor_as_user(&create, context).await?; + let user = get_actor_as_user(&create, context, request_counter).await?; let note = Note::from_any_base(create.object().to_owned().one().context(location_info!())?)? .context(location_info!())?; - let comment = CommentForm::from_apub(¬e, context, Some(user.actor_id()?)).await?; + let comment = + CommentForm::from_apub(¬e, context, Some(user.actor_id()?), request_counter).await?; let inserted_comment = blocking(context.pool(), move |conn| Comment::upsert(conn, &comment)).await??; @@ -71,23 +73,26 @@ pub(crate) async fn receive_create_comment( websocket_id: None, }); - announce_if_community_is_local(create, &user, context).await?; + announce_if_community_is_local(create, &user, context, request_counter).await?; Ok(HttpResponse::Ok().finish()) } pub(crate) async fn receive_update_comment( update: Update, context: &LemmyContext, + request_counter: &mut i32, ) -> Result { let note = Note::from_any_base(update.object().to_owned().one().context(location_info!())?)? .context(location_info!())?; - let user = get_actor_as_user(&update, context).await?; + let user = get_actor_as_user(&update, context, request_counter).await?; - let comment = CommentForm::from_apub(¬e, context, Some(user.actor_id()?)).await?; + let comment = + CommentForm::from_apub(¬e, context, Some(user.actor_id()?), request_counter).await?; - let original_comment_id = get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context) - .await? - .id; + let original_comment_id = + get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context, request_counter) + .await? + .id; let updated_comment = blocking(context.pool(), move |conn| { Comment::update(conn, original_comment_id, &comment) @@ -126,21 +131,22 @@ pub(crate) async fn receive_update_comment( websocket_id: None, }); - announce_if_community_is_local(update, &user, context).await?; + announce_if_community_is_local(update, &user, context, request_counter).await?; Ok(HttpResponse::Ok().finish()) } pub(crate) async fn receive_like_comment( like: Like, context: &LemmyContext, + request_counter: &mut i32, ) -> Result { let note = Note::from_any_base(like.object().to_owned().one().context(location_info!())?)? .context(location_info!())?; - let user = get_actor_as_user(&like, context).await?; + let user = get_actor_as_user(&like, context, request_counter).await?; - let comment = CommentForm::from_apub(¬e, context, None).await?; + let comment = CommentForm::from_apub(¬e, context, None, request_counter).await?; - let comment_id = get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context) + let comment_id = get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context, request_counter) .await? .id; @@ -177,13 +183,14 @@ pub(crate) async fn receive_like_comment( websocket_id: None, }); - announce_if_community_is_local(like, &user, context).await?; + announce_if_community_is_local(like, &user, context, request_counter).await?; Ok(HttpResponse::Ok().finish()) } pub(crate) async fn receive_dislike_comment( dislike: Dislike, context: &LemmyContext, + request_counter: &mut i32, ) -> Result { let note = Note::from_any_base( dislike @@ -193,11 +200,11 @@ pub(crate) async fn receive_dislike_comment( .context(location_info!())?, )? .context(location_info!())?; - let user = get_actor_as_user(&dislike, context).await?; + let user = get_actor_as_user(&dislike, context, request_counter).await?; - let comment = CommentForm::from_apub(¬e, context, None).await?; + let comment = CommentForm::from_apub(¬e, context, None, request_counter).await?; - let comment_id = get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context) + let comment_id = get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context, request_counter) .await? .id; @@ -234,7 +241,7 @@ pub(crate) async fn receive_dislike_comment( websocket_id: None, }); - announce_if_community_is_local(dislike, &user, context).await?; + announce_if_community_is_local(dislike, &user, context, request_counter).await?; Ok(HttpResponse::Ok().finish()) } @@ -242,6 +249,7 @@ pub(crate) async fn receive_delete_comment( context: &LemmyContext, delete: Delete, comment: Comment, + request_counter: &mut i32, ) -> Result { let deleted_comment = blocking(context.pool(), move |conn| { Comment::update_deleted(conn, comment.id, true) @@ -268,8 +276,8 @@ pub(crate) async fn receive_delete_comment( websocket_id: None, }); - let user = get_actor_as_user(&delete, context).await?; - announce_if_community_is_local(delete, &user, context).await?; + let user = get_actor_as_user(&delete, context, request_counter).await?; + announce_if_community_is_local(delete, &user, context, request_counter).await?; Ok(HttpResponse::Ok().finish()) } diff --git a/lemmy_apub/src/activities/receive/comment_undo.rs b/lemmy_apub/src/activities/receive/comment_undo.rs index 4c046b6f..4d6ca29b 100644 --- a/lemmy_apub/src/activities/receive/comment_undo.rs +++ b/lemmy_apub/src/activities/receive/comment_undo.rs @@ -19,14 +19,15 @@ pub(crate) async fn receive_undo_like_comment( undo: Undo, like: &Like, context: &LemmyContext, + request_counter: &mut i32, ) -> Result { - let user = get_actor_as_user(like, context).await?; + let user = get_actor_as_user(like, context, request_counter).await?; let note = Note::from_any_base(like.object().to_owned().one().context(location_info!())?)? .context(location_info!())?; - let comment = CommentForm::from_apub(¬e, context, None).await?; + let comment = CommentForm::from_apub(¬e, context, None, request_counter).await?; - let comment_id = get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context) + let comment_id = get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context, request_counter) .await? .id; @@ -56,7 +57,7 @@ pub(crate) async fn receive_undo_like_comment( websocket_id: None, }); - announce_if_community_is_local(undo, &user, context).await?; + announce_if_community_is_local(undo, &user, context, request_counter).await?; Ok(HttpResponse::Ok().finish()) } @@ -64,8 +65,9 @@ pub(crate) async fn receive_undo_dislike_comment( undo: Undo, dislike: &Dislike, context: &LemmyContext, + request_counter: &mut i32, ) -> Result { - let user = get_actor_as_user(dislike, context).await?; + let user = get_actor_as_user(dislike, context, request_counter).await?; let note = Note::from_any_base( dislike .object() @@ -75,9 +77,9 @@ pub(crate) async fn receive_undo_dislike_comment( )? .context(location_info!())?; - let comment = CommentForm::from_apub(¬e, context, None).await?; + let comment = CommentForm::from_apub(¬e, context, None, request_counter).await?; - let comment_id = get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context) + let comment_id = get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context, request_counter) .await? .id; @@ -107,7 +109,7 @@ pub(crate) async fn receive_undo_dislike_comment( websocket_id: None, }); - announce_if_community_is_local(undo, &user, context).await?; + announce_if_community_is_local(undo, &user, context, request_counter).await?; Ok(HttpResponse::Ok().finish()) } @@ -115,6 +117,7 @@ pub(crate) async fn receive_undo_delete_comment( context: &LemmyContext, undo: Undo, comment: Comment, + request_counter: &mut i32, ) -> Result { let deleted_comment = blocking(context.pool(), move |conn| { Comment::update_deleted(conn, comment.id, false) @@ -142,8 +145,8 @@ pub(crate) async fn receive_undo_delete_comment( websocket_id: None, }); - let user = get_actor_as_user(&undo, context).await?; - announce_if_community_is_local(undo, &user, context).await?; + let user = get_actor_as_user(&undo, context, request_counter).await?; + announce_if_community_is_local(undo, &user, context, request_counter).await?; Ok(HttpResponse::Ok().finish()) } @@ -151,6 +154,7 @@ pub(crate) async fn receive_undo_remove_comment( context: &LemmyContext, undo: Undo, comment: Comment, + request_counter: &mut i32, ) -> Result { let removed_comment = blocking(context.pool(), move |conn| { Comment::update_removed(conn, comment.id, false) @@ -178,7 +182,7 @@ pub(crate) async fn receive_undo_remove_comment( websocket_id: None, }); - let mod_ = get_actor_as_user(&undo, context).await?; - announce_if_community_is_local(undo, &mod_, context).await?; + let mod_ = get_actor_as_user(&undo, context, request_counter).await?; + announce_if_community_is_local(undo, &mod_, context, request_counter).await?; Ok(HttpResponse::Ok().finish()) } diff --git a/lemmy_apub/src/activities/receive/community.rs b/lemmy_apub/src/activities/receive/community.rs index 7ec9c2e7..a79a85ee 100644 --- a/lemmy_apub/src/activities/receive/community.rs +++ b/lemmy_apub/src/activities/receive/community.rs @@ -10,6 +10,7 @@ pub(crate) async fn receive_delete_community( context: &LemmyContext, delete: Delete, community: Community, + request_counter: &mut i32, ) -> Result { let deleted_community = blocking(context.pool(), move |conn| { Community::update_deleted(conn, community.id, true) @@ -32,8 +33,8 @@ pub(crate) async fn receive_delete_community( websocket_id: None, }); - let user = get_actor_as_user(&delete, context).await?; - announce_if_community_is_local(delete, &user, context).await?; + let user = get_actor_as_user(&delete, context, request_counter).await?; + announce_if_community_is_local(delete, &user, context, request_counter).await?; Ok(HttpResponse::Ok().finish()) } @@ -63,6 +64,7 @@ pub(crate) async fn receive_remove_community( websocket_id: None, }); + // TODO: this should probably also call announce_if_community_is_local() Ok(HttpResponse::Ok().finish()) } @@ -70,6 +72,7 @@ pub(crate) async fn receive_undo_delete_community( context: &LemmyContext, undo: Undo, community: Community, + request_counter: &mut i32, ) -> Result { let deleted_community = blocking(context.pool(), move |conn| { Community::update_deleted(conn, community.id, false) @@ -92,8 +95,8 @@ pub(crate) async fn receive_undo_delete_community( websocket_id: None, }); - let user = get_actor_as_user(&undo, context).await?; - announce_if_community_is_local(undo, &user, context).await?; + let user = get_actor_as_user(&undo, context, request_counter).await?; + announce_if_community_is_local(undo, &user, context, request_counter).await?; Ok(HttpResponse::Ok().finish()) } @@ -101,6 +104,7 @@ pub(crate) async fn receive_undo_remove_community( context: &LemmyContext, undo: Undo, community: Community, + request_counter: &mut i32, ) -> Result { let removed_community = blocking(context.pool(), move |conn| { Community::update_removed(conn, community.id, false) @@ -124,7 +128,7 @@ pub(crate) async fn receive_undo_remove_community( websocket_id: None, }); - let mod_ = get_actor_as_user(&undo, context).await?; - announce_if_community_is_local(undo, &mod_, context).await?; + let mod_ = get_actor_as_user(&undo, context, request_counter).await?; + announce_if_community_is_local(undo, &mod_, context, request_counter).await?; Ok(HttpResponse::Ok().finish()) } diff --git a/lemmy_apub/src/activities/receive/mod.rs b/lemmy_apub/src/activities/receive/mod.rs index 0003cab9..59c55519 100644 --- a/lemmy_apub/src/activities/receive/mod.rs +++ b/lemmy_apub/src/activities/receive/mod.rs @@ -41,6 +41,7 @@ async fn announce_if_community_is_local( activity: T, user: &User_, context: &LemmyContext, + request_counter: &mut i32, ) -> Result<(), LemmyError> where T: AsObject, @@ -55,7 +56,9 @@ where .context(location_info!())? .as_xsd_any_uri() .context(location_info!())?; - let community = get_or_fetch_and_upsert_community(&community_uri, context).await?; + // TODO: we could just read from the local db here (and ignore if the community is not found) + let community = + get_or_fetch_and_upsert_community(&community_uri, context, request_counter).await?; if community.local { community @@ -69,13 +72,14 @@ where pub(crate) async fn get_actor_as_user( activity: &T, context: &LemmyContext, + request_counter: &mut i32, ) -> Result where T: AsBase + ActorAndObjectRef, { let actor = activity.actor()?; let user_uri = actor.as_single_xsd_any_uri().context(location_info!())?; - get_or_fetch_and_upsert_user(&user_uri, context).await + get_or_fetch_and_upsert_user(&user_uri, context, request_counter).await } pub(crate) enum FindResults { diff --git a/lemmy_apub/src/activities/receive/post.rs b/lemmy_apub/src/activities/receive/post.rs index b82b7922..60fa8db1 100644 --- a/lemmy_apub/src/activities/receive/post.rs +++ b/lemmy_apub/src/activities/receive/post.rs @@ -24,12 +24,13 @@ use lemmy_websocket::{messages::SendPost, LemmyContext, UserOperation}; pub(crate) async fn receive_create_post( create: Create, context: &LemmyContext, + request_counter: &mut i32, ) -> Result { - let user = get_actor_as_user(&create, context).await?; + let user = get_actor_as_user(&create, context, request_counter).await?; let page = PageExt::from_any_base(create.object().to_owned().one().context(location_info!())?)? .context(location_info!())?; - let post = PostForm::from_apub(&page, context, Some(user.actor_id()?)).await?; + let post = PostForm::from_apub(&page, context, Some(user.actor_id()?), request_counter).await?; // Using an upsert, since likes (which fetch the post), sometimes come in before the create // resulting in double posts. @@ -50,21 +51,22 @@ pub(crate) async fn receive_create_post( websocket_id: None, }); - announce_if_community_is_local(create, &user, context).await?; + announce_if_community_is_local(create, &user, context, request_counter).await?; Ok(HttpResponse::Ok().finish()) } pub(crate) async fn receive_update_post( update: Update, context: &LemmyContext, + request_counter: &mut i32, ) -> Result { - let user = get_actor_as_user(&update, context).await?; + let user = get_actor_as_user(&update, context, request_counter).await?; let page = PageExt::from_any_base(update.object().to_owned().one().context(location_info!())?)? .context(location_info!())?; - let post = PostForm::from_apub(&page, context, Some(user.actor_id()?)).await?; + let post = PostForm::from_apub(&page, context, Some(user.actor_id()?), request_counter).await?; - let original_post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context) + let original_post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context, request_counter) .await? .id; @@ -87,21 +89,22 @@ pub(crate) async fn receive_update_post( websocket_id: None, }); - announce_if_community_is_local(update, &user, context).await?; + announce_if_community_is_local(update, &user, context, request_counter).await?; Ok(HttpResponse::Ok().finish()) } pub(crate) async fn receive_like_post( like: Like, context: &LemmyContext, + request_counter: &mut i32, ) -> Result { - let user = get_actor_as_user(&like, context).await?; + let user = get_actor_as_user(&like, context, request_counter).await?; let page = PageExt::from_any_base(like.object().to_owned().one().context(location_info!())?)? .context(location_info!())?; - let post = PostForm::from_apub(&page, context, None).await?; + let post = PostForm::from_apub(&page, context, None, request_counter).await?; - let post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context) + let post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context, request_counter) .await? .id; @@ -131,15 +134,16 @@ pub(crate) async fn receive_like_post( websocket_id: None, }); - announce_if_community_is_local(like, &user, context).await?; + announce_if_community_is_local(like, &user, context, request_counter).await?; Ok(HttpResponse::Ok().finish()) } pub(crate) async fn receive_dislike_post( dislike: Dislike, context: &LemmyContext, + request_counter: &mut i32, ) -> Result { - let user = get_actor_as_user(&dislike, context).await?; + let user = get_actor_as_user(&dislike, context, request_counter).await?; let page = PageExt::from_any_base( dislike .object() @@ -149,9 +153,9 @@ pub(crate) async fn receive_dislike_post( )? .context(location_info!())?; - let post = PostForm::from_apub(&page, context, None).await?; + let post = PostForm::from_apub(&page, context, None, request_counter).await?; - let post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context) + let post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context, request_counter) .await? .id; @@ -181,7 +185,7 @@ pub(crate) async fn receive_dislike_post( websocket_id: None, }); - announce_if_community_is_local(dislike, &user, context).await?; + announce_if_community_is_local(dislike, &user, context, request_counter).await?; Ok(HttpResponse::Ok().finish()) } @@ -189,6 +193,7 @@ pub(crate) async fn receive_delete_post( context: &LemmyContext, delete: Delete, post: Post, + request_counter: &mut i32, ) -> Result { let deleted_post = blocking(context.pool(), move |conn| { Post::update_deleted(conn, post.id, true) @@ -209,8 +214,8 @@ pub(crate) async fn receive_delete_post( websocket_id: None, }); - let user = get_actor_as_user(&delete, context).await?; - announce_if_community_is_local(delete, &user, context).await?; + let user = get_actor_as_user(&delete, context, request_counter).await?; + announce_if_community_is_local(delete, &user, context, request_counter).await?; Ok(HttpResponse::Ok().finish()) } diff --git a/lemmy_apub/src/activities/receive/post_undo.rs b/lemmy_apub/src/activities/receive/post_undo.rs index e1638bc4..dea1a621 100644 --- a/lemmy_apub/src/activities/receive/post_undo.rs +++ b/lemmy_apub/src/activities/receive/post_undo.rs @@ -20,14 +20,15 @@ pub(crate) async fn receive_undo_like_post( undo: Undo, like: &Like, context: &LemmyContext, + request_counter: &mut i32, ) -> Result { - let user = get_actor_as_user(like, context).await?; + let user = get_actor_as_user(like, context, request_counter).await?; let page = PageExt::from_any_base(like.object().to_owned().one().context(location_info!())?)? .context(location_info!())?; - let post = PostForm::from_apub(&page, context, None).await?; + let post = PostForm::from_apub(&page, context, None, request_counter).await?; - let post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context) + let post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context, request_counter) .await? .id; @@ -51,7 +52,7 @@ pub(crate) async fn receive_undo_like_post( websocket_id: None, }); - announce_if_community_is_local(undo, &user, context).await?; + announce_if_community_is_local(undo, &user, context, request_counter).await?; Ok(HttpResponse::Ok().finish()) } @@ -59,8 +60,9 @@ pub(crate) async fn receive_undo_dislike_post( undo: Undo, dislike: &Dislike, context: &LemmyContext, + request_counter: &mut i32, ) -> Result { - let user = get_actor_as_user(dislike, context).await?; + let user = get_actor_as_user(dislike, context, request_counter).await?; let page = PageExt::from_any_base( dislike .object() @@ -70,9 +72,9 @@ pub(crate) async fn receive_undo_dislike_post( )? .context(location_info!())?; - let post = PostForm::from_apub(&page, context, None).await?; + let post = PostForm::from_apub(&page, context, None, request_counter).await?; - let post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context) + let post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context, request_counter) .await? .id; @@ -96,7 +98,7 @@ pub(crate) async fn receive_undo_dislike_post( websocket_id: None, }); - announce_if_community_is_local(undo, &user, context).await?; + announce_if_community_is_local(undo, &user, context, request_counter).await?; Ok(HttpResponse::Ok().finish()) } @@ -104,6 +106,7 @@ pub(crate) async fn receive_undo_delete_post( context: &LemmyContext, undo: Undo, post: Post, + request_counter: &mut i32, ) -> Result { let deleted_post = blocking(context.pool(), move |conn| { Post::update_deleted(conn, post.id, false) @@ -124,8 +127,8 @@ pub(crate) async fn receive_undo_delete_post( websocket_id: None, }); - let user = get_actor_as_user(&undo, context).await?; - announce_if_community_is_local(undo, &user, context).await?; + let user = get_actor_as_user(&undo, context, request_counter).await?; + announce_if_community_is_local(undo, &user, context, request_counter).await?; Ok(HttpResponse::Ok().finish()) } @@ -133,6 +136,7 @@ pub(crate) async fn receive_undo_remove_post( context: &LemmyContext, undo: Undo, post: Post, + request_counter: &mut i32, ) -> Result { let removed_post = blocking(context.pool(), move |conn| { Post::update_removed(conn, post.id, false) @@ -154,7 +158,7 @@ pub(crate) async fn receive_undo_remove_post( websocket_id: None, }); - let mod_ = get_actor_as_user(&undo, context).await?; - announce_if_community_is_local(undo, &mod_, context).await?; + let mod_ = get_actor_as_user(&undo, context, request_counter).await?; + announce_if_community_is_local(undo, &mod_, context, request_counter).await?; Ok(HttpResponse::Ok().finish()) } diff --git a/lemmy_apub/src/activities/send/comment.rs b/lemmy_apub/src/activities/send/comment.rs index bcfd779c..185ceff2 100644 --- a/lemmy_apub/src/activities/send/comment.rs +++ b/lemmy_apub/src/activities/send/comment.rs @@ -341,7 +341,7 @@ async fn collect_non_local_mentions_and_addresses( debug!("mention actor_id: {}", actor_id); addressed_ccs.push(actor_id.to_owned().to_string().parse()?); - let mention_user = get_or_fetch_and_upsert_user(&actor_id, context).await?; + let mention_user = get_or_fetch_and_upsert_user(&actor_id, context, &mut 0).await?; let shared_inbox = mention_user.get_shared_inbox_url()?; mention_inboxes.push(shared_inbox); diff --git a/lemmy_apub/src/activities/send/community.rs b/lemmy_apub/src/activities/send/community.rs index 2f43f9c5..f8d03579 100644 --- a/lemmy_apub/src/activities/send/community.rs +++ b/lemmy_apub/src/activities/send/community.rs @@ -72,7 +72,7 @@ impl ActorType for Community { .actor()? .as_single_xsd_any_uri() .context(location_info!())?; - let user = get_or_fetch_and_upsert_user(actor_uri, context).await?; + let user = get_or_fetch_and_upsert_user(actor_uri, context, &mut 0).await?; let mut accept = Accept::new(self.actor_id.to_owned(), follow.into_any_base()?); accept diff --git a/lemmy_apub/src/fetcher.rs b/lemmy_apub/src/fetcher.rs index 908d1a5e..43466a0f 100644 --- a/lemmy_apub/src/fetcher.rs +++ b/lemmy_apub/src/fetcher.rs @@ -42,12 +42,26 @@ use url::Url; static ACTOR_REFETCH_INTERVAL_SECONDS: i64 = 24 * 60 * 60; static ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG: i64 = 10; +/// Maximum number of HTTP requests allowed to handle a single incoming activity (or a single object +/// fetch through the search). +/// +/// Tests are passing with a value of 5, so 10 should be safe for production. +static MAX_REQUEST_NUMBER: i32 = 10; + /// Fetch any type of ActivityPub object, handling things like HTTP headers, deserialisation, /// timeouts etc. -async fn fetch_remote_object(client: &Client, url: &Url) -> Result +async fn fetch_remote_object( + client: &Client, + url: &Url, + recursion_counter: &mut i32, +) -> Result where Response: for<'de> Deserialize<'de>, { + *recursion_counter += 1; + if *recursion_counter > MAX_REQUEST_NUMBER { + return Err(anyhow!("Maximum recursion depth reached").into()); + } check_is_apub_id_valid(&url)?; let timeout = Duration::from_secs(60); @@ -131,62 +145,70 @@ pub async fn search_by_apub_id( }; let domain = query_url.domain().context("url has no domain")?; - let response = - match fetch_remote_object::(context.client(), &query_url).await? { - SearchAcceptedObjects::Person(p) => { - let user_uri = p.inner.id(domain)?.context("person has no id")?; + let recursion_counter = &mut 0; + let response = match fetch_remote_object::( + context.client(), + &query_url, + recursion_counter, + ) + .await? + { + SearchAcceptedObjects::Person(p) => { + let user_uri = p.inner.id(domain)?.context("person has no id")?; - let user = get_or_fetch_and_upsert_user(&user_uri, context).await?; + let user = get_or_fetch_and_upsert_user(&user_uri, context, recursion_counter).await?; - response.users = vec![ - blocking(context.pool(), move |conn| { - UserView::get_user_secure(conn, user.id) - }) - .await??, - ]; - - response - } - SearchAcceptedObjects::Group(g) => { - let community_uri = g.inner.id(domain)?.context("group has no id")?; - - let community = get_or_fetch_and_upsert_community(community_uri, context).await?; - - response.communities = vec![ - blocking(context.pool(), move |conn| { - CommunityView::read(conn, community.id, None) - }) - .await??, - ]; - - response - } - SearchAcceptedObjects::Page(p) => { - let post_form = PostForm::from_apub(&p, context, Some(query_url)).await?; - - let p = blocking(context.pool(), move |conn| Post::upsert(conn, &post_form)).await??; - response.posts = - vec![blocking(context.pool(), move |conn| PostView::read(conn, p.id, None)).await??]; - - response - } - SearchAcceptedObjects::Comment(c) => { - let comment_form = CommentForm::from_apub(&c, context, Some(query_url)).await?; - - let c = blocking(context.pool(), move |conn| { - Comment::upsert(conn, &comment_form) + response.users = vec![ + blocking(context.pool(), move |conn| { + UserView::get_user_secure(conn, user.id) }) - .await??; - response.comments = vec![ - blocking(context.pool(), move |conn| { - CommentView::read(conn, c.id, None) - }) - .await??, - ]; + .await??, + ]; - response - } - }; + response + } + SearchAcceptedObjects::Group(g) => { + let community_uri = g.inner.id(domain)?.context("group has no id")?; + + let community = + get_or_fetch_and_upsert_community(community_uri, context, recursion_counter).await?; + + response.communities = vec![ + blocking(context.pool(), move |conn| { + CommunityView::read(conn, community.id, None) + }) + .await??, + ]; + + response + } + SearchAcceptedObjects::Page(p) => { + let post_form = PostForm::from_apub(&p, context, Some(query_url), recursion_counter).await?; + + let p = blocking(context.pool(), move |conn| Post::upsert(conn, &post_form)).await??; + response.posts = + vec![blocking(context.pool(), move |conn| PostView::read(conn, p.id, None)).await??]; + + response + } + SearchAcceptedObjects::Comment(c) => { + let comment_form = + CommentForm::from_apub(&c, context, Some(query_url), recursion_counter).await?; + + let c = blocking(context.pool(), move |conn| { + Comment::upsert(conn, &comment_form) + }) + .await??; + response.comments = vec![ + blocking(context.pool(), move |conn| { + CommentView::read(conn, c.id, None) + }) + .await??, + ]; + + response + } + }; Ok(response) } @@ -199,11 +221,12 @@ pub async fn search_by_apub_id( pub(crate) async fn get_or_fetch_and_upsert_actor( apub_id: &Url, context: &LemmyContext, + recursion_counter: &mut i32, ) -> Result, LemmyError> { - let community = get_or_fetch_and_upsert_community(apub_id, context).await; + let community = get_or_fetch_and_upsert_community(apub_id, context, recursion_counter).await; let actor: Box = match community { Ok(c) => Box::new(c), - Err(_) => Box::new(get_or_fetch_and_upsert_user(apub_id, context).await?), + Err(_) => Box::new(get_or_fetch_and_upsert_user(apub_id, context, recursion_counter).await?), }; Ok(actor) } @@ -215,6 +238,7 @@ pub(crate) async fn get_or_fetch_and_upsert_actor( pub(crate) async fn get_or_fetch_and_upsert_user( apub_id: &Url, context: &LemmyContext, + recursion_counter: &mut i32, ) -> Result { let apub_id_owned = apub_id.to_owned(); let user = blocking(context.pool(), move |conn| { @@ -226,9 +250,16 @@ pub(crate) async fn get_or_fetch_and_upsert_user( // If its older than a day, re-fetch it Ok(u) if !u.local && should_refetch_actor(u.last_refreshed_at) => { debug!("Fetching and updating from remote user: {}", apub_id); - let person = fetch_remote_object::(context.client(), apub_id).await?; + let person = + fetch_remote_object::(context.client(), apub_id, recursion_counter).await?; - let mut uf = UserForm::from_apub(&person, context, Some(apub_id.to_owned())).await?; + let mut uf = UserForm::from_apub( + &person, + context, + Some(apub_id.to_owned()), + recursion_counter, + ) + .await?; uf.last_refreshed_at = Some(naive_now()); let user = blocking(context.pool(), move |conn| User_::update(conn, u.id, &uf)).await??; @@ -237,9 +268,16 @@ pub(crate) async fn get_or_fetch_and_upsert_user( Ok(u) => Ok(u), Err(NotFound {}) => { debug!("Fetching and creating remote user: {}", apub_id); - let person = fetch_remote_object::(context.client(), apub_id).await?; + let person = + fetch_remote_object::(context.client(), apub_id, recursion_counter).await?; - let uf = UserForm::from_apub(&person, context, Some(apub_id.to_owned())).await?; + let uf = UserForm::from_apub( + &person, + context, + Some(apub_id.to_owned()), + recursion_counter, + ) + .await?; let user = blocking(context.pool(), move |conn| User_::upsert(conn, &uf)).await??; Ok(user) @@ -271,6 +309,7 @@ fn should_refetch_actor(last_refreshed: NaiveDateTime) -> bool { pub(crate) async fn get_or_fetch_and_upsert_community( apub_id: &Url, context: &LemmyContext, + recursion_counter: &mut i32, ) -> Result { let apub_id_owned = apub_id.to_owned(); let community = blocking(context.pool(), move |conn| { @@ -281,12 +320,12 @@ pub(crate) async fn get_or_fetch_and_upsert_community( match community { Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => { debug!("Fetching and updating from remote community: {}", apub_id); - fetch_remote_community(apub_id, context, Some(c.id)).await + fetch_remote_community(apub_id, context, Some(c.id), recursion_counter).await } Ok(c) => Ok(c), Err(NotFound {}) => { debug!("Fetching and creating remote community: {}", apub_id); - fetch_remote_community(apub_id, context, None).await + fetch_remote_community(apub_id, context, None, recursion_counter).await } Err(e) => Err(e.into()), } @@ -299,10 +338,12 @@ async fn fetch_remote_community( apub_id: &Url, context: &LemmyContext, community_id: Option, + recursion_counter: &mut i32, ) -> Result { - let group = fetch_remote_object::(context.client(), apub_id).await?; + let group = fetch_remote_object::(context.client(), apub_id, recursion_counter).await?; - let cf = CommunityForm::from_apub(&group, context, Some(apub_id.to_owned())).await?; + let cf = + CommunityForm::from_apub(&group, context, Some(apub_id.to_owned()), recursion_counter).await?; let community = blocking(context.pool(), move |conn| Community::upsert(conn, &cf)).await??; // Also add the community moderators too @@ -317,7 +358,7 @@ async fn fetch_remote_community( let mut creator_and_moderators = Vec::new(); for uri in creator_and_moderator_uris { - let c_or_m = get_or_fetch_and_upsert_user(uri, context).await?; + let c_or_m = get_or_fetch_and_upsert_user(uri, context, recursion_counter).await?; creator_and_moderators.push(c_or_m); } @@ -340,9 +381,12 @@ async fn fetch_remote_community( } // fetch outbox (maybe make this conditional) - let outbox = - fetch_remote_object::(context.client(), &community.get_outbox_url()?) - .await?; + let outbox = fetch_remote_object::( + context.client(), + &community.get_outbox_url()?, + recursion_counter, + ) + .await?; let outbox_items = outbox.items().context(location_info!())?.clone(); let mut outbox_items = outbox_items.many().context(location_info!())?; if outbox_items.len() > 20 { @@ -353,7 +397,7 @@ async fn fetch_remote_community( // The post creator may be from a blocked instance, // if it errors, then continue - let post = match PostForm::from_apub(&page, context, None).await { + let post = match PostForm::from_apub(&page, context, None, recursion_counter).await { Ok(post) => post, Err(_) => continue, }; @@ -380,6 +424,7 @@ async fn fetch_remote_community( pub(crate) async fn get_or_fetch_and_insert_post( post_ap_id: &Url, context: &LemmyContext, + recursion_counter: &mut i32, ) -> Result { let post_ap_id_owned = post_ap_id.to_owned(); let post = blocking(context.pool(), move |conn| { @@ -391,8 +436,15 @@ pub(crate) async fn get_or_fetch_and_insert_post( Ok(p) => Ok(p), Err(NotFound {}) => { debug!("Fetching and creating remote post: {}", post_ap_id); - let post = fetch_remote_object::(context.client(), post_ap_id).await?; - let post_form = PostForm::from_apub(&post, context, Some(post_ap_id.to_owned())).await?; + let post = + fetch_remote_object::(context.client(), post_ap_id, recursion_counter).await?; + let post_form = PostForm::from_apub( + &post, + context, + Some(post_ap_id.to_owned()), + recursion_counter, + ) + .await?; let post = blocking(context.pool(), move |conn| Post::upsert(conn, &post_form)).await??; @@ -409,6 +461,7 @@ pub(crate) async fn get_or_fetch_and_insert_post( pub(crate) async fn get_or_fetch_and_insert_comment( comment_ap_id: &Url, context: &LemmyContext, + recursion_counter: &mut i32, ) -> Result { let comment_ap_id_owned = comment_ap_id.to_owned(); let comment = blocking(context.pool(), move |conn| { @@ -423,9 +476,15 @@ pub(crate) async fn get_or_fetch_and_insert_comment( "Fetching and creating remote comment and its parents: {}", comment_ap_id ); - let comment = fetch_remote_object::(context.client(), comment_ap_id).await?; - let comment_form = - CommentForm::from_apub(&comment, context, Some(comment_ap_id.to_owned())).await?; + let comment = + fetch_remote_object::(context.client(), comment_ap_id, recursion_counter).await?; + let comment_form = CommentForm::from_apub( + &comment, + context, + Some(comment_ap_id.to_owned()), + recursion_counter, + ) + .await?; let comment = blocking(context.pool(), move |conn| { Comment::upsert(conn, &comment_form) diff --git a/lemmy_apub/src/inbox/community_inbox.rs b/lemmy_apub/src/inbox/community_inbox.rs index 11d09972..6cef1d03 100644 --- a/lemmy_apub/src/inbox/community_inbox.rs +++ b/lemmy_apub/src/inbox/community_inbox.rs @@ -75,7 +75,8 @@ pub async fn community_inbox( ); check_is_apub_id_valid(user_uri)?; - let user = get_or_fetch_and_upsert_user(&user_uri, &context).await?; + let request_counter = &mut 0; + let user = get_or_fetch_and_upsert_user(&user_uri, &context, request_counter).await?; verify_signature(&request, &user)?; diff --git a/lemmy_apub/src/inbox/shared_inbox.rs b/lemmy_apub/src/inbox/shared_inbox.rs index 5184197e..da26d748 100644 --- a/lemmy_apub/src/inbox/shared_inbox.rs +++ b/lemmy_apub/src/inbox/shared_inbox.rs @@ -100,20 +100,23 @@ pub async fn shared_inbox( check_is_apub_id_valid(&actor_id)?; - let actor = get_or_fetch_and_upsert_actor(&actor_id, &context).await?; + let request_counter = &mut 0; + let actor = get_or_fetch_and_upsert_actor(&actor_id, &context, request_counter).await?; verify_signature(&request, actor.as_ref())?; let any_base = activity.clone().into_any_base()?; let kind = activity.kind().context(location_info!())?; let res = match kind { - ValidTypes::Announce => receive_announce(&context, any_base, actor.as_ref()).await, - ValidTypes::Create => receive_create(&context, any_base, actor_id).await, - ValidTypes::Update => receive_update(&context, any_base, actor_id).await, - ValidTypes::Like => receive_like(&context, any_base, actor_id).await, - ValidTypes::Dislike => receive_dislike(&context, any_base, actor_id).await, + ValidTypes::Announce => { + receive_announce(&context, any_base, actor.as_ref(), request_counter).await + } + ValidTypes::Create => receive_create(&context, any_base, actor_id, request_counter).await, + ValidTypes::Update => receive_update(&context, any_base, actor_id, request_counter).await, + ValidTypes::Like => receive_like(&context, any_base, actor_id, request_counter).await, + ValidTypes::Dislike => receive_dislike(&context, any_base, actor_id, request_counter).await, ValidTypes::Remove => receive_remove(&context, any_base, actor_id).await, - ValidTypes::Delete => receive_delete(&context, any_base, actor_id).await, - ValidTypes::Undo => receive_undo(&context, any_base, actor_id).await, + ValidTypes::Delete => receive_delete(&context, any_base, actor_id, request_counter).await, + ValidTypes::Undo => receive_undo(&context, any_base, actor_id, request_counter).await, }; insert_activity(actor.user_id(), activity.clone(), false, context.pool()).await?; @@ -125,6 +128,7 @@ async fn receive_announce( context: &LemmyContext, activity: AnyBase, actor: &dyn ActorType, + request_counter: &mut i32, ) -> Result { let announce = Announce::from_any_base(activity)?.context(location_info!())?; verify_activity_domains_valid(&announce, actor.actor_id()?, false)?; @@ -140,13 +144,13 @@ async fn receive_announce( check_is_apub_id_valid(&inner_id)?; match kind { - Some("Create") => receive_create(context, object, inner_id).await, - Some("Update") => receive_update(context, object, inner_id).await, - Some("Like") => receive_like(context, object, inner_id).await, - Some("Dislike") => receive_dislike(context, object, inner_id).await, - Some("Delete") => receive_delete(context, object, inner_id).await, + Some("Create") => receive_create(context, object, inner_id, request_counter).await, + Some("Update") => receive_update(context, object, inner_id, request_counter).await, + Some("Like") => receive_like(context, object, inner_id, request_counter).await, + Some("Dislike") => receive_dislike(context, object, inner_id, request_counter).await, + Some("Delete") => receive_delete(context, object, inner_id, request_counter).await, Some("Remove") => receive_remove(context, object, inner_id).await, - Some("Undo") => receive_undo(context, object, inner_id).await, + Some("Undo") => receive_undo(context, object, inner_id, request_counter).await, _ => receive_unhandled_activity(announce), } } @@ -155,13 +159,14 @@ async fn receive_create( context: &LemmyContext, activity: AnyBase, expected_domain: Url, + request_counter: &mut i32, ) -> Result { let create = Create::from_any_base(activity)?.context(location_info!())?; verify_activity_domains_valid(&create, expected_domain, true)?; match create.object().as_single_kind_str() { - Some("Page") => receive_create_post(create, context).await, - Some("Note") => receive_create_comment(create, context).await, + Some("Page") => receive_create_post(create, context, request_counter).await, + Some("Note") => receive_create_comment(create, context, request_counter).await, _ => receive_unhandled_activity(create), } } @@ -170,13 +175,14 @@ async fn receive_update( context: &LemmyContext, activity: AnyBase, expected_domain: Url, + request_counter: &mut i32, ) -> Result { let update = Update::from_any_base(activity)?.context(location_info!())?; verify_activity_domains_valid(&update, expected_domain, true)?; match update.object().as_single_kind_str() { - Some("Page") => receive_update_post(update, context).await, - Some("Note") => receive_update_comment(update, context).await, + Some("Page") => receive_update_post(update, context, request_counter).await, + Some("Note") => receive_update_comment(update, context, request_counter).await, _ => receive_unhandled_activity(update), } } @@ -185,13 +191,14 @@ async fn receive_like( context: &LemmyContext, activity: AnyBase, expected_domain: Url, + request_counter: &mut i32, ) -> Result { let like = Like::from_any_base(activity)?.context(location_info!())?; verify_activity_domains_valid(&like, expected_domain, false)?; match like.object().as_single_kind_str() { - Some("Page") => receive_like_post(like, context).await, - Some("Note") => receive_like_comment(like, context).await, + Some("Page") => receive_like_post(like, context, request_counter).await, + Some("Note") => receive_like_comment(like, context, request_counter).await, _ => receive_unhandled_activity(like), } } @@ -200,6 +207,7 @@ async fn receive_dislike( context: &LemmyContext, activity: AnyBase, expected_domain: Url, + request_counter: &mut i32, ) -> Result { let enable_downvotes = blocking(context.pool(), move |conn| { Site::read(conn, 1).map(|s| s.enable_downvotes) @@ -213,8 +221,8 @@ async fn receive_dislike( verify_activity_domains_valid(&dislike, expected_domain, false)?; match dislike.object().as_single_kind_str() { - Some("Page") => receive_dislike_post(dislike, context).await, - Some("Note") => receive_dislike_comment(dislike, context).await, + Some("Page") => receive_dislike_post(dislike, context, request_counter).await, + Some("Note") => receive_dislike_comment(dislike, context, request_counter).await, _ => receive_unhandled_activity(dislike), } } @@ -223,6 +231,7 @@ pub async fn receive_delete( context: &LemmyContext, activity: AnyBase, expected_domain: Url, + request_counter: &mut i32, ) -> Result { let delete = Delete::from_any_base(activity)?.context(location_info!())?; verify_activity_domains_valid(&delete, expected_domain, true)?; @@ -234,9 +243,13 @@ pub async fn receive_delete( .context(location_info!())?; match find_by_id(context, object).await { - Ok(FindResults::Post(p)) => receive_delete_post(context, delete, p).await, - Ok(FindResults::Comment(c)) => receive_delete_comment(context, delete, c).await, - Ok(FindResults::Community(c)) => receive_delete_community(context, delete, c).await, + Ok(FindResults::Post(p)) => receive_delete_post(context, delete, p, request_counter).await, + Ok(FindResults::Comment(c)) => { + receive_delete_comment(context, delete, c, request_counter).await + } + Ok(FindResults::Community(c)) => { + receive_delete_community(context, delete, c, request_counter).await + } // if we dont have the object, no need to do anything Err(_) => Ok(HttpResponse::Ok().finish()), } @@ -283,15 +296,16 @@ async fn receive_undo( context: &LemmyContext, activity: AnyBase, expected_domain: Url, + request_counter: &mut i32, ) -> Result { let undo = Undo::from_any_base(activity)?.context(location_info!())?; verify_activity_domains_valid(&undo, expected_domain.to_owned(), true)?; match undo.object().as_single_kind_str() { - Some("Delete") => receive_undo_delete(context, undo, expected_domain).await, - Some("Remove") => receive_undo_remove(context, undo, expected_domain).await, - Some("Like") => receive_undo_like(context, undo, expected_domain).await, - Some("Dislike") => receive_undo_dislike(context, undo, expected_domain).await, + Some("Delete") => receive_undo_delete(context, undo, expected_domain, request_counter).await, + Some("Remove") => receive_undo_remove(context, undo, expected_domain, request_counter).await, + Some("Like") => receive_undo_like(context, undo, expected_domain, request_counter).await, + Some("Dislike") => receive_undo_dislike(context, undo, expected_domain, request_counter).await, _ => receive_unhandled_activity(undo), } } @@ -300,6 +314,7 @@ async fn receive_undo_delete( context: &LemmyContext, undo: Undo, expected_domain: Url, + request_counter: &mut i32, ) -> Result { let delete = Delete::from_any_base(undo.object().to_owned().one().context(location_info!())?)? .context(location_info!())?; @@ -311,9 +326,13 @@ async fn receive_undo_delete( .single_xsd_any_uri() .context(location_info!())?; match find_by_id(context, object).await { - Ok(FindResults::Post(p)) => receive_undo_delete_post(context, undo, p).await, - Ok(FindResults::Comment(c)) => receive_undo_delete_comment(context, undo, c).await, - Ok(FindResults::Community(c)) => receive_undo_delete_community(context, undo, c).await, + Ok(FindResults::Post(p)) => receive_undo_delete_post(context, undo, p, request_counter).await, + Ok(FindResults::Comment(c)) => { + receive_undo_delete_comment(context, undo, c, request_counter).await + } + Ok(FindResults::Community(c)) => { + receive_undo_delete_community(context, undo, c, request_counter).await + } // if we dont have the object, no need to do anything Err(_) => Ok(HttpResponse::Ok().finish()), } @@ -323,6 +342,7 @@ async fn receive_undo_remove( context: &LemmyContext, undo: Undo, expected_domain: Url, + request_counter: &mut i32, ) -> Result { let remove = Remove::from_any_base(undo.object().to_owned().one().context(location_info!())?)? .context(location_info!())?; @@ -334,9 +354,13 @@ async fn receive_undo_remove( .single_xsd_any_uri() .context(location_info!())?; match find_by_id(context, object).await { - Ok(FindResults::Post(p)) => receive_undo_remove_post(context, undo, p).await, - Ok(FindResults::Comment(c)) => receive_undo_remove_comment(context, undo, c).await, - Ok(FindResults::Community(c)) => receive_undo_remove_community(context, undo, c).await, + Ok(FindResults::Post(p)) => receive_undo_remove_post(context, undo, p, request_counter).await, + Ok(FindResults::Comment(c)) => { + receive_undo_remove_comment(context, undo, c, request_counter).await + } + Ok(FindResults::Community(c)) => { + receive_undo_remove_community(context, undo, c, request_counter).await + } // if we dont have the object, no need to do anything Err(_) => Ok(HttpResponse::Ok().finish()), } @@ -346,6 +370,7 @@ async fn receive_undo_like( context: &LemmyContext, undo: Undo, expected_domain: Url, + request_counter: &mut i32, ) -> Result { let like = Like::from_any_base(undo.object().to_owned().one().context(location_info!())?)? .context(location_info!())?; @@ -356,8 +381,8 @@ async fn receive_undo_like( .as_single_kind_str() .context(location_info!())?; match type_ { - "Note" => receive_undo_like_comment(undo, &like, context).await, - "Page" => receive_undo_like_post(undo, &like, context).await, + "Note" => receive_undo_like_comment(undo, &like, context, request_counter).await, + "Page" => receive_undo_like_post(undo, &like, context, request_counter).await, d => Err(anyhow!("Undo Delete type {} not supported", d).into()), } } @@ -366,6 +391,7 @@ async fn receive_undo_dislike( context: &LemmyContext, undo: Undo, expected_domain: Url, + request_counter: &mut i32, ) -> Result { let dislike = Dislike::from_any_base(undo.object().to_owned().one().context(location_info!())?)? .context(location_info!())?; @@ -376,8 +402,8 @@ async fn receive_undo_dislike( .as_single_kind_str() .context(location_info!())?; match type_ { - "Note" => receive_undo_dislike_comment(undo, &dislike, context).await, - "Page" => receive_undo_dislike_post(undo, &dislike, context).await, + "Note" => receive_undo_dislike_comment(undo, &dislike, context, request_counter).await, + "Page" => receive_undo_dislike_post(undo, &dislike, context, request_counter).await, d => Err(anyhow!("Undo Delete type {} not supported", d).into()), } } diff --git a/lemmy_apub/src/inbox/user_inbox.rs b/lemmy_apub/src/inbox/user_inbox.rs index ebb11a56..e89aba14 100644 --- a/lemmy_apub/src/inbox/user_inbox.rs +++ b/lemmy_apub/src/inbox/user_inbox.rs @@ -79,15 +79,22 @@ pub async fn user_inbox( check_is_apub_id_valid(actor_uri)?; - let actor = get_or_fetch_and_upsert_actor(actor_uri, &context).await?; + let request_counter = &mut 0; + let actor = get_or_fetch_and_upsert_actor(actor_uri, &context, request_counter).await?; verify_signature(&request, actor.as_ref())?; let any_base = activity.clone().into_any_base()?; let kind = activity.kind().context(location_info!())?; let res = match kind { - ValidTypes::Accept => receive_accept(&context, any_base, actor.as_ref(), user).await, - ValidTypes::Create => receive_create_private_message(&context, any_base, actor.as_ref()).await, - ValidTypes::Update => receive_update_private_message(&context, any_base, actor.as_ref()).await, + ValidTypes::Accept => { + receive_accept(&context, any_base, actor.as_ref(), user, request_counter).await + } + ValidTypes::Create => { + receive_create_private_message(&context, any_base, actor.as_ref(), request_counter).await + } + ValidTypes::Update => { + receive_update_private_message(&context, any_base, actor.as_ref(), request_counter).await + } ValidTypes::Delete => receive_delete_private_message(&context, any_base, actor.as_ref()).await, ValidTypes::Undo => { receive_undo_delete_private_message(&context, any_base, actor.as_ref()).await @@ -104,6 +111,7 @@ async fn receive_accept( activity: AnyBase, actor: &dyn ActorType, user: User_, + request_counter: &mut i32, ) -> Result { let accept = Accept::from_any_base(activity)?.context(location_info!())?; verify_activity_domains_valid(&accept, actor.actor_id()?, false)?; @@ -120,7 +128,8 @@ async fn receive_accept( .single_xsd_any_uri() .context(location_info!())?; - let community = get_or_fetch_and_upsert_community(&community_uri, context).await?; + let community = + get_or_fetch_and_upsert_community(&community_uri, context, request_counter).await?; // Now you need to add this to the community follower let community_follower_form = CommunityFollowerForm { @@ -141,6 +150,7 @@ async fn receive_create_private_message( context: &LemmyContext, activity: AnyBase, actor: &dyn ActorType, + request_counter: &mut i32, ) -> Result { let create = Create::from_any_base(activity)?.context(location_info!())?; verify_activity_domains_valid(&create, actor.actor_id()?, true)?; @@ -155,7 +165,7 @@ async fn receive_create_private_message( .context(location_info!())?; let private_message = - PrivateMessageForm::from_apub(¬e, context, Some(actor.actor_id()?)).await?; + PrivateMessageForm::from_apub(¬e, context, Some(actor.actor_id()?), request_counter).await?; let inserted_private_message = blocking(&context.pool(), move |conn| { PrivateMessage::create(conn, &private_message) @@ -185,6 +195,7 @@ async fn receive_update_private_message( context: &LemmyContext, activity: AnyBase, actor: &dyn ActorType, + request_counter: &mut i32, ) -> Result { let update = Update::from_any_base(activity)?.context(location_info!())?; verify_activity_domains_valid(&update, actor.actor_id()?, true)?; @@ -197,7 +208,7 @@ async fn receive_update_private_message( let note = Note::from_any_base(object)?.context(location_info!())?; let private_message_form = - PrivateMessageForm::from_apub(¬e, context, Some(actor.actor_id()?)).await?; + PrivateMessageForm::from_apub(¬e, context, Some(actor.actor_id()?), request_counter).await?; let private_message_ap_id = private_message_form .ap_id diff --git a/lemmy_apub/src/lib.rs b/lemmy_apub/src/lib.rs index c93d6477..4a1413b3 100644 --- a/lemmy_apub/src/lib.rs +++ b/lemmy_apub/src/lib.rs @@ -122,6 +122,7 @@ pub trait FromApub { apub: &Self::ApubType, context: &LemmyContext, expected_domain: Option, + request_counter: &mut i32, ) -> Result where Self: Sized; diff --git a/lemmy_apub/src/objects/comment.rs b/lemmy_apub/src/objects/comment.rs index efd2064d..ca0b0e85 100644 --- a/lemmy_apub/src/objects/comment.rs +++ b/lemmy_apub/src/objects/comment.rs @@ -89,6 +89,7 @@ impl FromApub for CommentForm { note: &Note, context: &LemmyContext, expected_domain: Option, + request_counter: &mut i32, ) -> Result { let creator_actor_id = ¬e .attributed_to() @@ -96,7 +97,7 @@ impl FromApub for CommentForm { .as_single_xsd_any_uri() .context(location_info!())?; - let creator = get_or_fetch_and_upsert_user(creator_actor_id, context).await?; + let creator = get_or_fetch_and_upsert_user(creator_actor_id, context, request_counter).await?; let mut in_reply_tos = note .in_reply_to() @@ -109,7 +110,7 @@ impl FromApub for CommentForm { let post_ap_id = in_reply_tos.next().context(location_info!())??; // This post, or the parent comment might not yet exist on this server yet, fetch them. - let post = get_or_fetch_and_insert_post(&post_ap_id, context).await?; + let post = get_or_fetch_and_insert_post(&post_ap_id, context, request_counter).await?; // The 2nd item, if it exists, is the parent comment apub_id // For deeply nested comments, FromApub automatically gets called recursively @@ -117,7 +118,7 @@ impl FromApub for CommentForm { Some(parent_comment_uri) => { let parent_comment_ap_id = &parent_comment_uri?; let parent_comment = - get_or_fetch_and_insert_comment(&parent_comment_ap_id, context).await?; + get_or_fetch_and_insert_comment(&parent_comment_ap_id, context, request_counter).await?; Some(parent_comment.id) } diff --git a/lemmy_apub/src/objects/community.rs b/lemmy_apub/src/objects/community.rs index 0012e2bf..d697c70b 100644 --- a/lemmy_apub/src/objects/community.rs +++ b/lemmy_apub/src/objects/community.rs @@ -111,6 +111,7 @@ impl FromApub for CommunityForm { group: &GroupExt, context: &LemmyContext, expected_domain: Option, + request_counter: &mut i32, ) -> Result { let creator_and_moderator_uris = group.inner.attributed_to().context(location_info!())?; let creator_uri = creator_and_moderator_uris @@ -122,7 +123,7 @@ impl FromApub for CommunityForm { .as_xsd_any_uri() .context(location_info!())?; - let creator = get_or_fetch_and_upsert_user(creator_uri, context).await?; + let creator = get_or_fetch_and_upsert_user(creator_uri, context, request_counter).await?; let name = group .inner .preferred_username() diff --git a/lemmy_apub/src/objects/post.rs b/lemmy_apub/src/objects/post.rs index 8796146d..6b42e690 100644 --- a/lemmy_apub/src/objects/post.rs +++ b/lemmy_apub/src/objects/post.rs @@ -101,6 +101,7 @@ impl FromApub for PostForm { page: &PageExt, context: &LemmyContext, expected_domain: Option, + request_counter: &mut i32, ) -> Result { let ext = &page.ext_one; let creator_actor_id = page @@ -111,7 +112,7 @@ impl FromApub for PostForm { .as_single_xsd_any_uri() .context(location_info!())?; - let creator = get_or_fetch_and_upsert_user(creator_actor_id, context).await?; + let creator = get_or_fetch_and_upsert_user(creator_actor_id, context, request_counter).await?; let community_actor_id = page .inner @@ -121,7 +122,8 @@ impl FromApub for PostForm { .as_single_xsd_any_uri() .context(location_info!())?; - let community = get_or_fetch_and_upsert_community(community_actor_id, context).await?; + let community = + get_or_fetch_and_upsert_community(community_actor_id, context, request_counter).await?; let thumbnail_url = match &page.inner.image() { Some(any_image) => Image::from_any_base( diff --git a/lemmy_apub/src/objects/private_message.rs b/lemmy_apub/src/objects/private_message.rs index 119dfb56..64047963 100644 --- a/lemmy_apub/src/objects/private_message.rs +++ b/lemmy_apub/src/objects/private_message.rs @@ -62,6 +62,7 @@ impl FromApub for PrivateMessageForm { note: &Note, context: &LemmyContext, expected_domain: Option, + request_counter: &mut i32, ) -> Result { let creator_actor_id = note .attributed_to() @@ -70,14 +71,15 @@ impl FromApub for PrivateMessageForm { .single_xsd_any_uri() .context(location_info!())?; - let creator = get_or_fetch_and_upsert_user(&creator_actor_id, context).await?; + let creator = get_or_fetch_and_upsert_user(&creator_actor_id, context, request_counter).await?; let recipient_actor_id = note .to() .context(location_info!())? .clone() .single_xsd_any_uri() .context(location_info!())?; - let recipient = get_or_fetch_and_upsert_user(&recipient_actor_id, context).await?; + let recipient = + get_or_fetch_and_upsert_user(&recipient_actor_id, context, request_counter).await?; let ap_id = note.id_unchecked().context(location_info!())?.to_string(); check_is_apub_id_valid(&Url::parse(&ap_id)?)?; diff --git a/lemmy_apub/src/objects/user.rs b/lemmy_apub/src/objects/user.rs index fb8213e8..5ef1ec8e 100644 --- a/lemmy_apub/src/objects/user.rs +++ b/lemmy_apub/src/objects/user.rs @@ -76,6 +76,7 @@ impl FromApub for UserForm { person: &PersonExt, _context: &LemmyContext, expected_domain: Option, + _request_counter: &mut i32, ) -> Result { let avatar = match person.icon() { Some(any_image) => Some(