Merge pull request 'Limit amount of HTTP requests to handle activities (fixes #1221)' (#117) from request-limit into main

Reviewed-on: https://yerbamate.dev/LemmyNet/lemmy/pulls/117
This commit is contained in:
dessalines 2020-10-26 15:35:39 +00:00
commit 53c9094d46
18 changed files with 332 additions and 198 deletions

View file

@ -25,12 +25,14 @@ use lemmy_websocket::{messages::SendComment, LemmyContext, UserOperation};
pub(crate) async fn receive_create_comment( pub(crate) async fn receive_create_comment(
create: Create, create: Create,
context: &LemmyContext, context: &LemmyContext,
request_counter: &mut i32,
) -> Result<HttpResponse, LemmyError> { ) -> Result<HttpResponse, LemmyError> {
let user = get_actor_as_user(&create, context).await?; let user = get_actor_as_user(&create, context, request_counter).await?;
let note = Note::from_any_base(create.object().to_owned().one().context(location_info!())?)? let note = Note::from_any_base(create.object().to_owned().one().context(location_info!())?)?
.context(location_info!())?; .context(location_info!())?;
let comment = CommentForm::from_apub(&note, context, Some(user.actor_id()?)).await?; let comment =
CommentForm::from_apub(&note, context, Some(user.actor_id()?), request_counter).await?;
let inserted_comment = let inserted_comment =
blocking(context.pool(), move |conn| Comment::upsert(conn, &comment)).await??; blocking(context.pool(), move |conn| Comment::upsert(conn, &comment)).await??;
@ -71,21 +73,24 @@ pub(crate) async fn receive_create_comment(
websocket_id: None, websocket_id: None,
}); });
announce_if_community_is_local(create, &user, context).await?; announce_if_community_is_local(create, &user, context, request_counter).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }
pub(crate) async fn receive_update_comment( pub(crate) async fn receive_update_comment(
update: Update, update: Update,
context: &LemmyContext, context: &LemmyContext,
request_counter: &mut i32,
) -> Result<HttpResponse, LemmyError> { ) -> Result<HttpResponse, LemmyError> {
let note = Note::from_any_base(update.object().to_owned().one().context(location_info!())?)? let note = Note::from_any_base(update.object().to_owned().one().context(location_info!())?)?
.context(location_info!())?; .context(location_info!())?;
let user = get_actor_as_user(&update, context).await?; let user = get_actor_as_user(&update, context, request_counter).await?;
let comment = CommentForm::from_apub(&note, context, Some(user.actor_id()?)).await?; let comment =
CommentForm::from_apub(&note, context, Some(user.actor_id()?), request_counter).await?;
let original_comment_id = get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context) let original_comment_id =
get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context, request_counter)
.await? .await?
.id; .id;
@ -126,21 +131,22 @@ pub(crate) async fn receive_update_comment(
websocket_id: None, websocket_id: None,
}); });
announce_if_community_is_local(update, &user, context).await?; announce_if_community_is_local(update, &user, context, request_counter).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }
pub(crate) async fn receive_like_comment( pub(crate) async fn receive_like_comment(
like: Like, like: Like,
context: &LemmyContext, context: &LemmyContext,
request_counter: &mut i32,
) -> Result<HttpResponse, LemmyError> { ) -> Result<HttpResponse, LemmyError> {
let note = Note::from_any_base(like.object().to_owned().one().context(location_info!())?)? let note = Note::from_any_base(like.object().to_owned().one().context(location_info!())?)?
.context(location_info!())?; .context(location_info!())?;
let user = get_actor_as_user(&like, context).await?; let user = get_actor_as_user(&like, context, request_counter).await?;
let comment = CommentForm::from_apub(&note, context, None).await?; let comment = CommentForm::from_apub(&note, context, None, request_counter).await?;
let comment_id = get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context) let comment_id = get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context, request_counter)
.await? .await?
.id; .id;
@ -177,13 +183,14 @@ pub(crate) async fn receive_like_comment(
websocket_id: None, websocket_id: None,
}); });
announce_if_community_is_local(like, &user, context).await?; announce_if_community_is_local(like, &user, context, request_counter).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }
pub(crate) async fn receive_dislike_comment( pub(crate) async fn receive_dislike_comment(
dislike: Dislike, dislike: Dislike,
context: &LemmyContext, context: &LemmyContext,
request_counter: &mut i32,
) -> Result<HttpResponse, LemmyError> { ) -> Result<HttpResponse, LemmyError> {
let note = Note::from_any_base( let note = Note::from_any_base(
dislike dislike
@ -193,11 +200,11 @@ pub(crate) async fn receive_dislike_comment(
.context(location_info!())?, .context(location_info!())?,
)? )?
.context(location_info!())?; .context(location_info!())?;
let user = get_actor_as_user(&dislike, context).await?; let user = get_actor_as_user(&dislike, context, request_counter).await?;
let comment = CommentForm::from_apub(&note, context, None).await?; let comment = CommentForm::from_apub(&note, context, None, request_counter).await?;
let comment_id = get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context) let comment_id = get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context, request_counter)
.await? .await?
.id; .id;
@ -234,7 +241,7 @@ pub(crate) async fn receive_dislike_comment(
websocket_id: None, websocket_id: None,
}); });
announce_if_community_is_local(dislike, &user, context).await?; announce_if_community_is_local(dislike, &user, context, request_counter).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }
@ -242,6 +249,7 @@ pub(crate) async fn receive_delete_comment(
context: &LemmyContext, context: &LemmyContext,
delete: Delete, delete: Delete,
comment: Comment, comment: Comment,
request_counter: &mut i32,
) -> Result<HttpResponse, LemmyError> { ) -> Result<HttpResponse, LemmyError> {
let deleted_comment = blocking(context.pool(), move |conn| { let deleted_comment = blocking(context.pool(), move |conn| {
Comment::update_deleted(conn, comment.id, true) Comment::update_deleted(conn, comment.id, true)
@ -268,8 +276,8 @@ pub(crate) async fn receive_delete_comment(
websocket_id: None, websocket_id: None,
}); });
let user = get_actor_as_user(&delete, context).await?; let user = get_actor_as_user(&delete, context, request_counter).await?;
announce_if_community_is_local(delete, &user, context).await?; announce_if_community_is_local(delete, &user, context, request_counter).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }

View file

@ -19,14 +19,15 @@ pub(crate) async fn receive_undo_like_comment(
undo: Undo, undo: Undo,
like: &Like, like: &Like,
context: &LemmyContext, context: &LemmyContext,
request_counter: &mut i32,
) -> Result<HttpResponse, LemmyError> { ) -> Result<HttpResponse, LemmyError> {
let user = get_actor_as_user(like, context).await?; let user = get_actor_as_user(like, context, request_counter).await?;
let note = Note::from_any_base(like.object().to_owned().one().context(location_info!())?)? let note = Note::from_any_base(like.object().to_owned().one().context(location_info!())?)?
.context(location_info!())?; .context(location_info!())?;
let comment = CommentForm::from_apub(&note, context, None).await?; let comment = CommentForm::from_apub(&note, context, None, request_counter).await?;
let comment_id = get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context) let comment_id = get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context, request_counter)
.await? .await?
.id; .id;
@ -56,7 +57,7 @@ pub(crate) async fn receive_undo_like_comment(
websocket_id: None, websocket_id: None,
}); });
announce_if_community_is_local(undo, &user, context).await?; announce_if_community_is_local(undo, &user, context, request_counter).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }
@ -64,8 +65,9 @@ pub(crate) async fn receive_undo_dislike_comment(
undo: Undo, undo: Undo,
dislike: &Dislike, dislike: &Dislike,
context: &LemmyContext, context: &LemmyContext,
request_counter: &mut i32,
) -> Result<HttpResponse, LemmyError> { ) -> Result<HttpResponse, LemmyError> {
let user = get_actor_as_user(dislike, context).await?; let user = get_actor_as_user(dislike, context, request_counter).await?;
let note = Note::from_any_base( let note = Note::from_any_base(
dislike dislike
.object() .object()
@ -75,9 +77,9 @@ pub(crate) async fn receive_undo_dislike_comment(
)? )?
.context(location_info!())?; .context(location_info!())?;
let comment = CommentForm::from_apub(&note, context, None).await?; let comment = CommentForm::from_apub(&note, context, None, request_counter).await?;
let comment_id = get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context) let comment_id = get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context, request_counter)
.await? .await?
.id; .id;
@ -107,7 +109,7 @@ pub(crate) async fn receive_undo_dislike_comment(
websocket_id: None, websocket_id: None,
}); });
announce_if_community_is_local(undo, &user, context).await?; announce_if_community_is_local(undo, &user, context, request_counter).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }
@ -115,6 +117,7 @@ pub(crate) async fn receive_undo_delete_comment(
context: &LemmyContext, context: &LemmyContext,
undo: Undo, undo: Undo,
comment: Comment, comment: Comment,
request_counter: &mut i32,
) -> Result<HttpResponse, LemmyError> { ) -> Result<HttpResponse, LemmyError> {
let deleted_comment = blocking(context.pool(), move |conn| { let deleted_comment = blocking(context.pool(), move |conn| {
Comment::update_deleted(conn, comment.id, false) Comment::update_deleted(conn, comment.id, false)
@ -142,8 +145,8 @@ pub(crate) async fn receive_undo_delete_comment(
websocket_id: None, websocket_id: None,
}); });
let user = get_actor_as_user(&undo, context).await?; let user = get_actor_as_user(&undo, context, request_counter).await?;
announce_if_community_is_local(undo, &user, context).await?; announce_if_community_is_local(undo, &user, context, request_counter).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }
@ -151,6 +154,7 @@ pub(crate) async fn receive_undo_remove_comment(
context: &LemmyContext, context: &LemmyContext,
undo: Undo, undo: Undo,
comment: Comment, comment: Comment,
request_counter: &mut i32,
) -> Result<HttpResponse, LemmyError> { ) -> Result<HttpResponse, LemmyError> {
let removed_comment = blocking(context.pool(), move |conn| { let removed_comment = blocking(context.pool(), move |conn| {
Comment::update_removed(conn, comment.id, false) Comment::update_removed(conn, comment.id, false)
@ -178,7 +182,7 @@ pub(crate) async fn receive_undo_remove_comment(
websocket_id: None, websocket_id: None,
}); });
let mod_ = get_actor_as_user(&undo, context).await?; let mod_ = get_actor_as_user(&undo, context, request_counter).await?;
announce_if_community_is_local(undo, &mod_, context).await?; announce_if_community_is_local(undo, &mod_, context, request_counter).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }

View file

@ -10,6 +10,7 @@ pub(crate) async fn receive_delete_community(
context: &LemmyContext, context: &LemmyContext,
delete: Delete, delete: Delete,
community: Community, community: Community,
request_counter: &mut i32,
) -> Result<HttpResponse, LemmyError> { ) -> Result<HttpResponse, LemmyError> {
let deleted_community = blocking(context.pool(), move |conn| { let deleted_community = blocking(context.pool(), move |conn| {
Community::update_deleted(conn, community.id, true) Community::update_deleted(conn, community.id, true)
@ -32,8 +33,8 @@ pub(crate) async fn receive_delete_community(
websocket_id: None, websocket_id: None,
}); });
let user = get_actor_as_user(&delete, context).await?; let user = get_actor_as_user(&delete, context, request_counter).await?;
announce_if_community_is_local(delete, &user, context).await?; announce_if_community_is_local(delete, &user, context, request_counter).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }
@ -63,6 +64,7 @@ pub(crate) async fn receive_remove_community(
websocket_id: None, websocket_id: None,
}); });
// TODO: this should probably also call announce_if_community_is_local()
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }
@ -70,6 +72,7 @@ pub(crate) async fn receive_undo_delete_community(
context: &LemmyContext, context: &LemmyContext,
undo: Undo, undo: Undo,
community: Community, community: Community,
request_counter: &mut i32,
) -> Result<HttpResponse, LemmyError> { ) -> Result<HttpResponse, LemmyError> {
let deleted_community = blocking(context.pool(), move |conn| { let deleted_community = blocking(context.pool(), move |conn| {
Community::update_deleted(conn, community.id, false) Community::update_deleted(conn, community.id, false)
@ -92,8 +95,8 @@ pub(crate) async fn receive_undo_delete_community(
websocket_id: None, websocket_id: None,
}); });
let user = get_actor_as_user(&undo, context).await?; let user = get_actor_as_user(&undo, context, request_counter).await?;
announce_if_community_is_local(undo, &user, context).await?; announce_if_community_is_local(undo, &user, context, request_counter).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }
@ -101,6 +104,7 @@ pub(crate) async fn receive_undo_remove_community(
context: &LemmyContext, context: &LemmyContext,
undo: Undo, undo: Undo,
community: Community, community: Community,
request_counter: &mut i32,
) -> Result<HttpResponse, LemmyError> { ) -> Result<HttpResponse, LemmyError> {
let removed_community = blocking(context.pool(), move |conn| { let removed_community = blocking(context.pool(), move |conn| {
Community::update_removed(conn, community.id, false) Community::update_removed(conn, community.id, false)
@ -124,7 +128,7 @@ pub(crate) async fn receive_undo_remove_community(
websocket_id: None, websocket_id: None,
}); });
let mod_ = get_actor_as_user(&undo, context).await?; let mod_ = get_actor_as_user(&undo, context, request_counter).await?;
announce_if_community_is_local(undo, &mod_, context).await?; announce_if_community_is_local(undo, &mod_, context, request_counter).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }

View file

@ -41,6 +41,7 @@ async fn announce_if_community_is_local<T, Kind>(
activity: T, activity: T,
user: &User_, user: &User_,
context: &LemmyContext, context: &LemmyContext,
request_counter: &mut i32,
) -> Result<(), LemmyError> ) -> Result<(), LemmyError>
where where
T: AsObject<Kind>, T: AsObject<Kind>,
@ -55,7 +56,9 @@ where
.context(location_info!())? .context(location_info!())?
.as_xsd_any_uri() .as_xsd_any_uri()
.context(location_info!())?; .context(location_info!())?;
let community = get_or_fetch_and_upsert_community(&community_uri, context).await?; // TODO: we could just read from the local db here (and ignore if the community is not found)
let community =
get_or_fetch_and_upsert_community(&community_uri, context, request_counter).await?;
if community.local { if community.local {
community community
@ -69,13 +72,14 @@ where
pub(crate) async fn get_actor_as_user<T, A>( pub(crate) async fn get_actor_as_user<T, A>(
activity: &T, activity: &T,
context: &LemmyContext, context: &LemmyContext,
request_counter: &mut i32,
) -> Result<User_, LemmyError> ) -> Result<User_, LemmyError>
where where
T: AsBase<A> + ActorAndObjectRef, T: AsBase<A> + ActorAndObjectRef,
{ {
let actor = activity.actor()?; let actor = activity.actor()?;
let user_uri = actor.as_single_xsd_any_uri().context(location_info!())?; let user_uri = actor.as_single_xsd_any_uri().context(location_info!())?;
get_or_fetch_and_upsert_user(&user_uri, context).await get_or_fetch_and_upsert_user(&user_uri, context, request_counter).await
} }
pub(crate) enum FindResults { pub(crate) enum FindResults {

View file

@ -24,12 +24,13 @@ use lemmy_websocket::{messages::SendPost, LemmyContext, UserOperation};
pub(crate) async fn receive_create_post( pub(crate) async fn receive_create_post(
create: Create, create: Create,
context: &LemmyContext, context: &LemmyContext,
request_counter: &mut i32,
) -> Result<HttpResponse, LemmyError> { ) -> Result<HttpResponse, LemmyError> {
let user = get_actor_as_user(&create, context).await?; let user = get_actor_as_user(&create, context, request_counter).await?;
let page = PageExt::from_any_base(create.object().to_owned().one().context(location_info!())?)? let page = PageExt::from_any_base(create.object().to_owned().one().context(location_info!())?)?
.context(location_info!())?; .context(location_info!())?;
let post = PostForm::from_apub(&page, context, Some(user.actor_id()?)).await?; let post = PostForm::from_apub(&page, context, Some(user.actor_id()?), request_counter).await?;
// Using an upsert, since likes (which fetch the post), sometimes come in before the create // Using an upsert, since likes (which fetch the post), sometimes come in before the create
// resulting in double posts. // resulting in double posts.
@ -50,21 +51,22 @@ pub(crate) async fn receive_create_post(
websocket_id: None, websocket_id: None,
}); });
announce_if_community_is_local(create, &user, context).await?; announce_if_community_is_local(create, &user, context, request_counter).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }
pub(crate) async fn receive_update_post( pub(crate) async fn receive_update_post(
update: Update, update: Update,
context: &LemmyContext, context: &LemmyContext,
request_counter: &mut i32,
) -> Result<HttpResponse, LemmyError> { ) -> Result<HttpResponse, LemmyError> {
let user = get_actor_as_user(&update, context).await?; let user = get_actor_as_user(&update, context, request_counter).await?;
let page = PageExt::from_any_base(update.object().to_owned().one().context(location_info!())?)? let page = PageExt::from_any_base(update.object().to_owned().one().context(location_info!())?)?
.context(location_info!())?; .context(location_info!())?;
let post = PostForm::from_apub(&page, context, Some(user.actor_id()?)).await?; let post = PostForm::from_apub(&page, context, Some(user.actor_id()?), request_counter).await?;
let original_post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context) let original_post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context, request_counter)
.await? .await?
.id; .id;
@ -87,21 +89,22 @@ pub(crate) async fn receive_update_post(
websocket_id: None, websocket_id: None,
}); });
announce_if_community_is_local(update, &user, context).await?; announce_if_community_is_local(update, &user, context, request_counter).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }
pub(crate) async fn receive_like_post( pub(crate) async fn receive_like_post(
like: Like, like: Like,
context: &LemmyContext, context: &LemmyContext,
request_counter: &mut i32,
) -> Result<HttpResponse, LemmyError> { ) -> Result<HttpResponse, LemmyError> {
let user = get_actor_as_user(&like, context).await?; let user = get_actor_as_user(&like, context, request_counter).await?;
let page = PageExt::from_any_base(like.object().to_owned().one().context(location_info!())?)? let page = PageExt::from_any_base(like.object().to_owned().one().context(location_info!())?)?
.context(location_info!())?; .context(location_info!())?;
let post = PostForm::from_apub(&page, context, None).await?; let post = PostForm::from_apub(&page, context, None, request_counter).await?;
let post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context) let post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context, request_counter)
.await? .await?
.id; .id;
@ -131,15 +134,16 @@ pub(crate) async fn receive_like_post(
websocket_id: None, websocket_id: None,
}); });
announce_if_community_is_local(like, &user, context).await?; announce_if_community_is_local(like, &user, context, request_counter).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }
pub(crate) async fn receive_dislike_post( pub(crate) async fn receive_dislike_post(
dislike: Dislike, dislike: Dislike,
context: &LemmyContext, context: &LemmyContext,
request_counter: &mut i32,
) -> Result<HttpResponse, LemmyError> { ) -> Result<HttpResponse, LemmyError> {
let user = get_actor_as_user(&dislike, context).await?; let user = get_actor_as_user(&dislike, context, request_counter).await?;
let page = PageExt::from_any_base( let page = PageExt::from_any_base(
dislike dislike
.object() .object()
@ -149,9 +153,9 @@ pub(crate) async fn receive_dislike_post(
)? )?
.context(location_info!())?; .context(location_info!())?;
let post = PostForm::from_apub(&page, context, None).await?; let post = PostForm::from_apub(&page, context, None, request_counter).await?;
let post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context) let post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context, request_counter)
.await? .await?
.id; .id;
@ -181,7 +185,7 @@ pub(crate) async fn receive_dislike_post(
websocket_id: None, websocket_id: None,
}); });
announce_if_community_is_local(dislike, &user, context).await?; announce_if_community_is_local(dislike, &user, context, request_counter).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }
@ -189,6 +193,7 @@ pub(crate) async fn receive_delete_post(
context: &LemmyContext, context: &LemmyContext,
delete: Delete, delete: Delete,
post: Post, post: Post,
request_counter: &mut i32,
) -> Result<HttpResponse, LemmyError> { ) -> Result<HttpResponse, LemmyError> {
let deleted_post = blocking(context.pool(), move |conn| { let deleted_post = blocking(context.pool(), move |conn| {
Post::update_deleted(conn, post.id, true) Post::update_deleted(conn, post.id, true)
@ -209,8 +214,8 @@ pub(crate) async fn receive_delete_post(
websocket_id: None, websocket_id: None,
}); });
let user = get_actor_as_user(&delete, context).await?; let user = get_actor_as_user(&delete, context, request_counter).await?;
announce_if_community_is_local(delete, &user, context).await?; announce_if_community_is_local(delete, &user, context, request_counter).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }

View file

@ -20,14 +20,15 @@ pub(crate) async fn receive_undo_like_post(
undo: Undo, undo: Undo,
like: &Like, like: &Like,
context: &LemmyContext, context: &LemmyContext,
request_counter: &mut i32,
) -> Result<HttpResponse, LemmyError> { ) -> Result<HttpResponse, LemmyError> {
let user = get_actor_as_user(like, context).await?; let user = get_actor_as_user(like, context, request_counter).await?;
let page = PageExt::from_any_base(like.object().to_owned().one().context(location_info!())?)? let page = PageExt::from_any_base(like.object().to_owned().one().context(location_info!())?)?
.context(location_info!())?; .context(location_info!())?;
let post = PostForm::from_apub(&page, context, None).await?; let post = PostForm::from_apub(&page, context, None, request_counter).await?;
let post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context) let post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context, request_counter)
.await? .await?
.id; .id;
@ -51,7 +52,7 @@ pub(crate) async fn receive_undo_like_post(
websocket_id: None, websocket_id: None,
}); });
announce_if_community_is_local(undo, &user, context).await?; announce_if_community_is_local(undo, &user, context, request_counter).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }
@ -59,8 +60,9 @@ pub(crate) async fn receive_undo_dislike_post(
undo: Undo, undo: Undo,
dislike: &Dislike, dislike: &Dislike,
context: &LemmyContext, context: &LemmyContext,
request_counter: &mut i32,
) -> Result<HttpResponse, LemmyError> { ) -> Result<HttpResponse, LemmyError> {
let user = get_actor_as_user(dislike, context).await?; let user = get_actor_as_user(dislike, context, request_counter).await?;
let page = PageExt::from_any_base( let page = PageExt::from_any_base(
dislike dislike
.object() .object()
@ -70,9 +72,9 @@ pub(crate) async fn receive_undo_dislike_post(
)? )?
.context(location_info!())?; .context(location_info!())?;
let post = PostForm::from_apub(&page, context, None).await?; let post = PostForm::from_apub(&page, context, None, request_counter).await?;
let post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context) let post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context, request_counter)
.await? .await?
.id; .id;
@ -96,7 +98,7 @@ pub(crate) async fn receive_undo_dislike_post(
websocket_id: None, websocket_id: None,
}); });
announce_if_community_is_local(undo, &user, context).await?; announce_if_community_is_local(undo, &user, context, request_counter).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }
@ -104,6 +106,7 @@ pub(crate) async fn receive_undo_delete_post(
context: &LemmyContext, context: &LemmyContext,
undo: Undo, undo: Undo,
post: Post, post: Post,
request_counter: &mut i32,
) -> Result<HttpResponse, LemmyError> { ) -> Result<HttpResponse, LemmyError> {
let deleted_post = blocking(context.pool(), move |conn| { let deleted_post = blocking(context.pool(), move |conn| {
Post::update_deleted(conn, post.id, false) Post::update_deleted(conn, post.id, false)
@ -124,8 +127,8 @@ pub(crate) async fn receive_undo_delete_post(
websocket_id: None, websocket_id: None,
}); });
let user = get_actor_as_user(&undo, context).await?; let user = get_actor_as_user(&undo, context, request_counter).await?;
announce_if_community_is_local(undo, &user, context).await?; announce_if_community_is_local(undo, &user, context, request_counter).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }
@ -133,6 +136,7 @@ pub(crate) async fn receive_undo_remove_post(
context: &LemmyContext, context: &LemmyContext,
undo: Undo, undo: Undo,
post: Post, post: Post,
request_counter: &mut i32,
) -> Result<HttpResponse, LemmyError> { ) -> Result<HttpResponse, LemmyError> {
let removed_post = blocking(context.pool(), move |conn| { let removed_post = blocking(context.pool(), move |conn| {
Post::update_removed(conn, post.id, false) Post::update_removed(conn, post.id, false)
@ -154,7 +158,7 @@ pub(crate) async fn receive_undo_remove_post(
websocket_id: None, websocket_id: None,
}); });
let mod_ = get_actor_as_user(&undo, context).await?; let mod_ = get_actor_as_user(&undo, context, request_counter).await?;
announce_if_community_is_local(undo, &mod_, context).await?; announce_if_community_is_local(undo, &mod_, context, request_counter).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }

View file

@ -341,7 +341,7 @@ async fn collect_non_local_mentions_and_addresses(
debug!("mention actor_id: {}", actor_id); debug!("mention actor_id: {}", actor_id);
addressed_ccs.push(actor_id.to_owned().to_string().parse()?); addressed_ccs.push(actor_id.to_owned().to_string().parse()?);
let mention_user = get_or_fetch_and_upsert_user(&actor_id, context).await?; let mention_user = get_or_fetch_and_upsert_user(&actor_id, context, &mut 0).await?;
let shared_inbox = mention_user.get_shared_inbox_url()?; let shared_inbox = mention_user.get_shared_inbox_url()?;
mention_inboxes.push(shared_inbox); mention_inboxes.push(shared_inbox);

View file

@ -72,7 +72,7 @@ impl ActorType for Community {
.actor()? .actor()?
.as_single_xsd_any_uri() .as_single_xsd_any_uri()
.context(location_info!())?; .context(location_info!())?;
let user = get_or_fetch_and_upsert_user(actor_uri, context).await?; let user = get_or_fetch_and_upsert_user(actor_uri, context, &mut 0).await?;
let mut accept = Accept::new(self.actor_id.to_owned(), follow.into_any_base()?); let mut accept = Accept::new(self.actor_id.to_owned(), follow.into_any_base()?);
accept accept

View file

@ -42,12 +42,26 @@ use url::Url;
static ACTOR_REFETCH_INTERVAL_SECONDS: i64 = 24 * 60 * 60; static ACTOR_REFETCH_INTERVAL_SECONDS: i64 = 24 * 60 * 60;
static ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG: i64 = 10; static ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG: i64 = 10;
/// Maximum number of HTTP requests allowed to handle a single incoming activity (or a single object
/// fetch through the search).
///
/// Tests are passing with a value of 5, so 10 should be safe for production.
static MAX_REQUEST_NUMBER: i32 = 10;
/// Fetch any type of ActivityPub object, handling things like HTTP headers, deserialisation, /// Fetch any type of ActivityPub object, handling things like HTTP headers, deserialisation,
/// timeouts etc. /// timeouts etc.
async fn fetch_remote_object<Response>(client: &Client, url: &Url) -> Result<Response, LemmyError> async fn fetch_remote_object<Response>(
client: &Client,
url: &Url,
recursion_counter: &mut i32,
) -> Result<Response, LemmyError>
where where
Response: for<'de> Deserialize<'de>, Response: for<'de> Deserialize<'de>,
{ {
*recursion_counter += 1;
if *recursion_counter > MAX_REQUEST_NUMBER {
return Err(anyhow!("Maximum recursion depth reached").into());
}
check_is_apub_id_valid(&url)?; check_is_apub_id_valid(&url)?;
let timeout = Duration::from_secs(60); let timeout = Duration::from_secs(60);
@ -131,12 +145,18 @@ pub async fn search_by_apub_id(
}; };
let domain = query_url.domain().context("url has no domain")?; let domain = query_url.domain().context("url has no domain")?;
let response = let recursion_counter = &mut 0;
match fetch_remote_object::<SearchAcceptedObjects>(context.client(), &query_url).await? { let response = match fetch_remote_object::<SearchAcceptedObjects>(
context.client(),
&query_url,
recursion_counter,
)
.await?
{
SearchAcceptedObjects::Person(p) => { SearchAcceptedObjects::Person(p) => {
let user_uri = p.inner.id(domain)?.context("person has no id")?; let user_uri = p.inner.id(domain)?.context("person has no id")?;
let user = get_or_fetch_and_upsert_user(&user_uri, context).await?; let user = get_or_fetch_and_upsert_user(&user_uri, context, recursion_counter).await?;
response.users = vec![ response.users = vec![
blocking(context.pool(), move |conn| { blocking(context.pool(), move |conn| {
@ -150,7 +170,8 @@ pub async fn search_by_apub_id(
SearchAcceptedObjects::Group(g) => { SearchAcceptedObjects::Group(g) => {
let community_uri = g.inner.id(domain)?.context("group has no id")?; let community_uri = g.inner.id(domain)?.context("group has no id")?;
let community = get_or_fetch_and_upsert_community(community_uri, context).await?; let community =
get_or_fetch_and_upsert_community(community_uri, context, recursion_counter).await?;
response.communities = vec![ response.communities = vec![
blocking(context.pool(), move |conn| { blocking(context.pool(), move |conn| {
@ -162,7 +183,7 @@ pub async fn search_by_apub_id(
response response
} }
SearchAcceptedObjects::Page(p) => { SearchAcceptedObjects::Page(p) => {
let post_form = PostForm::from_apub(&p, context, Some(query_url)).await?; let post_form = PostForm::from_apub(&p, context, Some(query_url), recursion_counter).await?;
let p = blocking(context.pool(), move |conn| Post::upsert(conn, &post_form)).await??; let p = blocking(context.pool(), move |conn| Post::upsert(conn, &post_form)).await??;
response.posts = response.posts =
@ -171,7 +192,8 @@ pub async fn search_by_apub_id(
response response
} }
SearchAcceptedObjects::Comment(c) => { SearchAcceptedObjects::Comment(c) => {
let comment_form = CommentForm::from_apub(&c, context, Some(query_url)).await?; let comment_form =
CommentForm::from_apub(&c, context, Some(query_url), recursion_counter).await?;
let c = blocking(context.pool(), move |conn| { let c = blocking(context.pool(), move |conn| {
Comment::upsert(conn, &comment_form) Comment::upsert(conn, &comment_form)
@ -199,11 +221,12 @@ pub async fn search_by_apub_id(
pub(crate) async fn get_or_fetch_and_upsert_actor( pub(crate) async fn get_or_fetch_and_upsert_actor(
apub_id: &Url, apub_id: &Url,
context: &LemmyContext, context: &LemmyContext,
recursion_counter: &mut i32,
) -> Result<Box<dyn ActorType>, LemmyError> { ) -> Result<Box<dyn ActorType>, LemmyError> {
let community = get_or_fetch_and_upsert_community(apub_id, context).await; let community = get_or_fetch_and_upsert_community(apub_id, context, recursion_counter).await;
let actor: Box<dyn ActorType> = match community { let actor: Box<dyn ActorType> = match community {
Ok(c) => Box::new(c), Ok(c) => Box::new(c),
Err(_) => Box::new(get_or_fetch_and_upsert_user(apub_id, context).await?), Err(_) => Box::new(get_or_fetch_and_upsert_user(apub_id, context, recursion_counter).await?),
}; };
Ok(actor) Ok(actor)
} }
@ -215,6 +238,7 @@ pub(crate) async fn get_or_fetch_and_upsert_actor(
pub(crate) async fn get_or_fetch_and_upsert_user( pub(crate) async fn get_or_fetch_and_upsert_user(
apub_id: &Url, apub_id: &Url,
context: &LemmyContext, context: &LemmyContext,
recursion_counter: &mut i32,
) -> Result<User_, LemmyError> { ) -> Result<User_, LemmyError> {
let apub_id_owned = apub_id.to_owned(); let apub_id_owned = apub_id.to_owned();
let user = blocking(context.pool(), move |conn| { let user = blocking(context.pool(), move |conn| {
@ -226,9 +250,16 @@ pub(crate) async fn get_or_fetch_and_upsert_user(
// If its older than a day, re-fetch it // If its older than a day, re-fetch it
Ok(u) if !u.local && should_refetch_actor(u.last_refreshed_at) => { Ok(u) if !u.local && should_refetch_actor(u.last_refreshed_at) => {
debug!("Fetching and updating from remote user: {}", apub_id); debug!("Fetching and updating from remote user: {}", apub_id);
let person = fetch_remote_object::<PersonExt>(context.client(), apub_id).await?; let person =
fetch_remote_object::<PersonExt>(context.client(), apub_id, recursion_counter).await?;
let mut uf = UserForm::from_apub(&person, context, Some(apub_id.to_owned())).await?; let mut uf = UserForm::from_apub(
&person,
context,
Some(apub_id.to_owned()),
recursion_counter,
)
.await?;
uf.last_refreshed_at = Some(naive_now()); uf.last_refreshed_at = Some(naive_now());
let user = blocking(context.pool(), move |conn| User_::update(conn, u.id, &uf)).await??; let user = blocking(context.pool(), move |conn| User_::update(conn, u.id, &uf)).await??;
@ -237,9 +268,16 @@ pub(crate) async fn get_or_fetch_and_upsert_user(
Ok(u) => Ok(u), Ok(u) => Ok(u),
Err(NotFound {}) => { Err(NotFound {}) => {
debug!("Fetching and creating remote user: {}", apub_id); debug!("Fetching and creating remote user: {}", apub_id);
let person = fetch_remote_object::<PersonExt>(context.client(), apub_id).await?; let person =
fetch_remote_object::<PersonExt>(context.client(), apub_id, recursion_counter).await?;
let uf = UserForm::from_apub(&person, context, Some(apub_id.to_owned())).await?; let uf = UserForm::from_apub(
&person,
context,
Some(apub_id.to_owned()),
recursion_counter,
)
.await?;
let user = blocking(context.pool(), move |conn| User_::upsert(conn, &uf)).await??; let user = blocking(context.pool(), move |conn| User_::upsert(conn, &uf)).await??;
Ok(user) Ok(user)
@ -271,6 +309,7 @@ fn should_refetch_actor(last_refreshed: NaiveDateTime) -> bool {
pub(crate) async fn get_or_fetch_and_upsert_community( pub(crate) async fn get_or_fetch_and_upsert_community(
apub_id: &Url, apub_id: &Url,
context: &LemmyContext, context: &LemmyContext,
recursion_counter: &mut i32,
) -> Result<Community, LemmyError> { ) -> Result<Community, LemmyError> {
let apub_id_owned = apub_id.to_owned(); let apub_id_owned = apub_id.to_owned();
let community = blocking(context.pool(), move |conn| { let community = blocking(context.pool(), move |conn| {
@ -281,12 +320,12 @@ pub(crate) async fn get_or_fetch_and_upsert_community(
match community { match community {
Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => { Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => {
debug!("Fetching and updating from remote community: {}", apub_id); debug!("Fetching and updating from remote community: {}", apub_id);
fetch_remote_community(apub_id, context, Some(c.id)).await fetch_remote_community(apub_id, context, Some(c.id), recursion_counter).await
} }
Ok(c) => Ok(c), Ok(c) => Ok(c),
Err(NotFound {}) => { Err(NotFound {}) => {
debug!("Fetching and creating remote community: {}", apub_id); debug!("Fetching and creating remote community: {}", apub_id);
fetch_remote_community(apub_id, context, None).await fetch_remote_community(apub_id, context, None, recursion_counter).await
} }
Err(e) => Err(e.into()), Err(e) => Err(e.into()),
} }
@ -299,10 +338,12 @@ async fn fetch_remote_community(
apub_id: &Url, apub_id: &Url,
context: &LemmyContext, context: &LemmyContext,
community_id: Option<i32>, community_id: Option<i32>,
recursion_counter: &mut i32,
) -> Result<Community, LemmyError> { ) -> Result<Community, LemmyError> {
let group = fetch_remote_object::<GroupExt>(context.client(), apub_id).await?; let group = fetch_remote_object::<GroupExt>(context.client(), apub_id, recursion_counter).await?;
let cf = CommunityForm::from_apub(&group, context, Some(apub_id.to_owned())).await?; let cf =
CommunityForm::from_apub(&group, context, Some(apub_id.to_owned()), recursion_counter).await?;
let community = blocking(context.pool(), move |conn| Community::upsert(conn, &cf)).await??; let community = blocking(context.pool(), move |conn| Community::upsert(conn, &cf)).await??;
// Also add the community moderators too // Also add the community moderators too
@ -317,7 +358,7 @@ async fn fetch_remote_community(
let mut creator_and_moderators = Vec::new(); let mut creator_and_moderators = Vec::new();
for uri in creator_and_moderator_uris { for uri in creator_and_moderator_uris {
let c_or_m = get_or_fetch_and_upsert_user(uri, context).await?; let c_or_m = get_or_fetch_and_upsert_user(uri, context, recursion_counter).await?;
creator_and_moderators.push(c_or_m); creator_and_moderators.push(c_or_m);
} }
@ -340,8 +381,11 @@ async fn fetch_remote_community(
} }
// fetch outbox (maybe make this conditional) // fetch outbox (maybe make this conditional)
let outbox = let outbox = fetch_remote_object::<OrderedCollection>(
fetch_remote_object::<OrderedCollection>(context.client(), &community.get_outbox_url()?) context.client(),
&community.get_outbox_url()?,
recursion_counter,
)
.await?; .await?;
let outbox_items = outbox.items().context(location_info!())?.clone(); let outbox_items = outbox.items().context(location_info!())?.clone();
let mut outbox_items = outbox_items.many().context(location_info!())?; let mut outbox_items = outbox_items.many().context(location_info!())?;
@ -353,7 +397,7 @@ async fn fetch_remote_community(
// The post creator may be from a blocked instance, // The post creator may be from a blocked instance,
// if it errors, then continue // if it errors, then continue
let post = match PostForm::from_apub(&page, context, None).await { let post = match PostForm::from_apub(&page, context, None, recursion_counter).await {
Ok(post) => post, Ok(post) => post,
Err(_) => continue, Err(_) => continue,
}; };
@ -380,6 +424,7 @@ async fn fetch_remote_community(
pub(crate) async fn get_or_fetch_and_insert_post( pub(crate) async fn get_or_fetch_and_insert_post(
post_ap_id: &Url, post_ap_id: &Url,
context: &LemmyContext, context: &LemmyContext,
recursion_counter: &mut i32,
) -> Result<Post, LemmyError> { ) -> Result<Post, LemmyError> {
let post_ap_id_owned = post_ap_id.to_owned(); let post_ap_id_owned = post_ap_id.to_owned();
let post = blocking(context.pool(), move |conn| { let post = blocking(context.pool(), move |conn| {
@ -391,8 +436,15 @@ pub(crate) async fn get_or_fetch_and_insert_post(
Ok(p) => Ok(p), Ok(p) => Ok(p),
Err(NotFound {}) => { Err(NotFound {}) => {
debug!("Fetching and creating remote post: {}", post_ap_id); debug!("Fetching and creating remote post: {}", post_ap_id);
let post = fetch_remote_object::<PageExt>(context.client(), post_ap_id).await?; let post =
let post_form = PostForm::from_apub(&post, context, Some(post_ap_id.to_owned())).await?; fetch_remote_object::<PageExt>(context.client(), post_ap_id, recursion_counter).await?;
let post_form = PostForm::from_apub(
&post,
context,
Some(post_ap_id.to_owned()),
recursion_counter,
)
.await?;
let post = blocking(context.pool(), move |conn| Post::upsert(conn, &post_form)).await??; let post = blocking(context.pool(), move |conn| Post::upsert(conn, &post_form)).await??;
@ -409,6 +461,7 @@ pub(crate) async fn get_or_fetch_and_insert_post(
pub(crate) async fn get_or_fetch_and_insert_comment( pub(crate) async fn get_or_fetch_and_insert_comment(
comment_ap_id: &Url, comment_ap_id: &Url,
context: &LemmyContext, context: &LemmyContext,
recursion_counter: &mut i32,
) -> Result<Comment, LemmyError> { ) -> Result<Comment, LemmyError> {
let comment_ap_id_owned = comment_ap_id.to_owned(); let comment_ap_id_owned = comment_ap_id.to_owned();
let comment = blocking(context.pool(), move |conn| { let comment = blocking(context.pool(), move |conn| {
@ -423,9 +476,15 @@ pub(crate) async fn get_or_fetch_and_insert_comment(
"Fetching and creating remote comment and its parents: {}", "Fetching and creating remote comment and its parents: {}",
comment_ap_id comment_ap_id
); );
let comment = fetch_remote_object::<Note>(context.client(), comment_ap_id).await?; let comment =
let comment_form = fetch_remote_object::<Note>(context.client(), comment_ap_id, recursion_counter).await?;
CommentForm::from_apub(&comment, context, Some(comment_ap_id.to_owned())).await?; let comment_form = CommentForm::from_apub(
&comment,
context,
Some(comment_ap_id.to_owned()),
recursion_counter,
)
.await?;
let comment = blocking(context.pool(), move |conn| { let comment = blocking(context.pool(), move |conn| {
Comment::upsert(conn, &comment_form) Comment::upsert(conn, &comment_form)

View file

@ -75,7 +75,8 @@ pub async fn community_inbox(
); );
check_is_apub_id_valid(user_uri)?; check_is_apub_id_valid(user_uri)?;
let user = get_or_fetch_and_upsert_user(&user_uri, &context).await?; let request_counter = &mut 0;
let user = get_or_fetch_and_upsert_user(&user_uri, &context, request_counter).await?;
verify_signature(&request, &user)?; verify_signature(&request, &user)?;

View file

@ -100,20 +100,23 @@ pub async fn shared_inbox(
check_is_apub_id_valid(&actor_id)?; check_is_apub_id_valid(&actor_id)?;
let actor = get_or_fetch_and_upsert_actor(&actor_id, &context).await?; let request_counter = &mut 0;
let actor = get_or_fetch_and_upsert_actor(&actor_id, &context, request_counter).await?;
verify_signature(&request, actor.as_ref())?; verify_signature(&request, actor.as_ref())?;
let any_base = activity.clone().into_any_base()?; let any_base = activity.clone().into_any_base()?;
let kind = activity.kind().context(location_info!())?; let kind = activity.kind().context(location_info!())?;
let res = match kind { let res = match kind {
ValidTypes::Announce => receive_announce(&context, any_base, actor.as_ref()).await, ValidTypes::Announce => {
ValidTypes::Create => receive_create(&context, any_base, actor_id).await, receive_announce(&context, any_base, actor.as_ref(), request_counter).await
ValidTypes::Update => receive_update(&context, any_base, actor_id).await, }
ValidTypes::Like => receive_like(&context, any_base, actor_id).await, ValidTypes::Create => receive_create(&context, any_base, actor_id, request_counter).await,
ValidTypes::Dislike => receive_dislike(&context, any_base, actor_id).await, ValidTypes::Update => receive_update(&context, any_base, actor_id, request_counter).await,
ValidTypes::Like => receive_like(&context, any_base, actor_id, request_counter).await,
ValidTypes::Dislike => receive_dislike(&context, any_base, actor_id, request_counter).await,
ValidTypes::Remove => receive_remove(&context, any_base, actor_id).await, ValidTypes::Remove => receive_remove(&context, any_base, actor_id).await,
ValidTypes::Delete => receive_delete(&context, any_base, actor_id).await, ValidTypes::Delete => receive_delete(&context, any_base, actor_id, request_counter).await,
ValidTypes::Undo => receive_undo(&context, any_base, actor_id).await, ValidTypes::Undo => receive_undo(&context, any_base, actor_id, request_counter).await,
}; };
insert_activity(actor.user_id(), activity.clone(), false, context.pool()).await?; insert_activity(actor.user_id(), activity.clone(), false, context.pool()).await?;
@ -125,6 +128,7 @@ async fn receive_announce(
context: &LemmyContext, context: &LemmyContext,
activity: AnyBase, activity: AnyBase,
actor: &dyn ActorType, actor: &dyn ActorType,
request_counter: &mut i32,
) -> Result<HttpResponse, LemmyError> { ) -> Result<HttpResponse, LemmyError> {
let announce = Announce::from_any_base(activity)?.context(location_info!())?; let announce = Announce::from_any_base(activity)?.context(location_info!())?;
verify_activity_domains_valid(&announce, actor.actor_id()?, false)?; verify_activity_domains_valid(&announce, actor.actor_id()?, false)?;
@ -140,13 +144,13 @@ async fn receive_announce(
check_is_apub_id_valid(&inner_id)?; check_is_apub_id_valid(&inner_id)?;
match kind { match kind {
Some("Create") => receive_create(context, object, inner_id).await, Some("Create") => receive_create(context, object, inner_id, request_counter).await,
Some("Update") => receive_update(context, object, inner_id).await, Some("Update") => receive_update(context, object, inner_id, request_counter).await,
Some("Like") => receive_like(context, object, inner_id).await, Some("Like") => receive_like(context, object, inner_id, request_counter).await,
Some("Dislike") => receive_dislike(context, object, inner_id).await, Some("Dislike") => receive_dislike(context, object, inner_id, request_counter).await,
Some("Delete") => receive_delete(context, object, inner_id).await, Some("Delete") => receive_delete(context, object, inner_id, request_counter).await,
Some("Remove") => receive_remove(context, object, inner_id).await, Some("Remove") => receive_remove(context, object, inner_id).await,
Some("Undo") => receive_undo(context, object, inner_id).await, Some("Undo") => receive_undo(context, object, inner_id, request_counter).await,
_ => receive_unhandled_activity(announce), _ => receive_unhandled_activity(announce),
} }
} }
@ -155,13 +159,14 @@ async fn receive_create(
context: &LemmyContext, context: &LemmyContext,
activity: AnyBase, activity: AnyBase,
expected_domain: Url, expected_domain: Url,
request_counter: &mut i32,
) -> Result<HttpResponse, LemmyError> { ) -> Result<HttpResponse, LemmyError> {
let create = Create::from_any_base(activity)?.context(location_info!())?; let create = Create::from_any_base(activity)?.context(location_info!())?;
verify_activity_domains_valid(&create, expected_domain, true)?; verify_activity_domains_valid(&create, expected_domain, true)?;
match create.object().as_single_kind_str() { match create.object().as_single_kind_str() {
Some("Page") => receive_create_post(create, context).await, Some("Page") => receive_create_post(create, context, request_counter).await,
Some("Note") => receive_create_comment(create, context).await, Some("Note") => receive_create_comment(create, context, request_counter).await,
_ => receive_unhandled_activity(create), _ => receive_unhandled_activity(create),
} }
} }
@ -170,13 +175,14 @@ async fn receive_update(
context: &LemmyContext, context: &LemmyContext,
activity: AnyBase, activity: AnyBase,
expected_domain: Url, expected_domain: Url,
request_counter: &mut i32,
) -> Result<HttpResponse, LemmyError> { ) -> Result<HttpResponse, LemmyError> {
let update = Update::from_any_base(activity)?.context(location_info!())?; let update = Update::from_any_base(activity)?.context(location_info!())?;
verify_activity_domains_valid(&update, expected_domain, true)?; verify_activity_domains_valid(&update, expected_domain, true)?;
match update.object().as_single_kind_str() { match update.object().as_single_kind_str() {
Some("Page") => receive_update_post(update, context).await, Some("Page") => receive_update_post(update, context, request_counter).await,
Some("Note") => receive_update_comment(update, context).await, Some("Note") => receive_update_comment(update, context, request_counter).await,
_ => receive_unhandled_activity(update), _ => receive_unhandled_activity(update),
} }
} }
@ -185,13 +191,14 @@ async fn receive_like(
context: &LemmyContext, context: &LemmyContext,
activity: AnyBase, activity: AnyBase,
expected_domain: Url, expected_domain: Url,
request_counter: &mut i32,
) -> Result<HttpResponse, LemmyError> { ) -> Result<HttpResponse, LemmyError> {
let like = Like::from_any_base(activity)?.context(location_info!())?; let like = Like::from_any_base(activity)?.context(location_info!())?;
verify_activity_domains_valid(&like, expected_domain, false)?; verify_activity_domains_valid(&like, expected_domain, false)?;
match like.object().as_single_kind_str() { match like.object().as_single_kind_str() {
Some("Page") => receive_like_post(like, context).await, Some("Page") => receive_like_post(like, context, request_counter).await,
Some("Note") => receive_like_comment(like, context).await, Some("Note") => receive_like_comment(like, context, request_counter).await,
_ => receive_unhandled_activity(like), _ => receive_unhandled_activity(like),
} }
} }
@ -200,6 +207,7 @@ async fn receive_dislike(
context: &LemmyContext, context: &LemmyContext,
activity: AnyBase, activity: AnyBase,
expected_domain: Url, expected_domain: Url,
request_counter: &mut i32,
) -> Result<HttpResponse, LemmyError> { ) -> Result<HttpResponse, LemmyError> {
let enable_downvotes = blocking(context.pool(), move |conn| { let enable_downvotes = blocking(context.pool(), move |conn| {
Site::read(conn, 1).map(|s| s.enable_downvotes) Site::read(conn, 1).map(|s| s.enable_downvotes)
@ -213,8 +221,8 @@ async fn receive_dislike(
verify_activity_domains_valid(&dislike, expected_domain, false)?; verify_activity_domains_valid(&dislike, expected_domain, false)?;
match dislike.object().as_single_kind_str() { match dislike.object().as_single_kind_str() {
Some("Page") => receive_dislike_post(dislike, context).await, Some("Page") => receive_dislike_post(dislike, context, request_counter).await,
Some("Note") => receive_dislike_comment(dislike, context).await, Some("Note") => receive_dislike_comment(dislike, context, request_counter).await,
_ => receive_unhandled_activity(dislike), _ => receive_unhandled_activity(dislike),
} }
} }
@ -223,6 +231,7 @@ pub async fn receive_delete(
context: &LemmyContext, context: &LemmyContext,
activity: AnyBase, activity: AnyBase,
expected_domain: Url, expected_domain: Url,
request_counter: &mut i32,
) -> Result<HttpResponse, LemmyError> { ) -> Result<HttpResponse, LemmyError> {
let delete = Delete::from_any_base(activity)?.context(location_info!())?; let delete = Delete::from_any_base(activity)?.context(location_info!())?;
verify_activity_domains_valid(&delete, expected_domain, true)?; verify_activity_domains_valid(&delete, expected_domain, true)?;
@ -234,9 +243,13 @@ pub async fn receive_delete(
.context(location_info!())?; .context(location_info!())?;
match find_by_id(context, object).await { match find_by_id(context, object).await {
Ok(FindResults::Post(p)) => receive_delete_post(context, delete, p).await, Ok(FindResults::Post(p)) => receive_delete_post(context, delete, p, request_counter).await,
Ok(FindResults::Comment(c)) => receive_delete_comment(context, delete, c).await, Ok(FindResults::Comment(c)) => {
Ok(FindResults::Community(c)) => receive_delete_community(context, delete, c).await, receive_delete_comment(context, delete, c, request_counter).await
}
Ok(FindResults::Community(c)) => {
receive_delete_community(context, delete, c, request_counter).await
}
// if we dont have the object, no need to do anything // if we dont have the object, no need to do anything
Err(_) => Ok(HttpResponse::Ok().finish()), Err(_) => Ok(HttpResponse::Ok().finish()),
} }
@ -283,15 +296,16 @@ async fn receive_undo(
context: &LemmyContext, context: &LemmyContext,
activity: AnyBase, activity: AnyBase,
expected_domain: Url, expected_domain: Url,
request_counter: &mut i32,
) -> Result<HttpResponse, LemmyError> { ) -> Result<HttpResponse, LemmyError> {
let undo = Undo::from_any_base(activity)?.context(location_info!())?; let undo = Undo::from_any_base(activity)?.context(location_info!())?;
verify_activity_domains_valid(&undo, expected_domain.to_owned(), true)?; verify_activity_domains_valid(&undo, expected_domain.to_owned(), true)?;
match undo.object().as_single_kind_str() { match undo.object().as_single_kind_str() {
Some("Delete") => receive_undo_delete(context, undo, expected_domain).await, Some("Delete") => receive_undo_delete(context, undo, expected_domain, request_counter).await,
Some("Remove") => receive_undo_remove(context, undo, expected_domain).await, Some("Remove") => receive_undo_remove(context, undo, expected_domain, request_counter).await,
Some("Like") => receive_undo_like(context, undo, expected_domain).await, Some("Like") => receive_undo_like(context, undo, expected_domain, request_counter).await,
Some("Dislike") => receive_undo_dislike(context, undo, expected_domain).await, Some("Dislike") => receive_undo_dislike(context, undo, expected_domain, request_counter).await,
_ => receive_unhandled_activity(undo), _ => receive_unhandled_activity(undo),
} }
} }
@ -300,6 +314,7 @@ async fn receive_undo_delete(
context: &LemmyContext, context: &LemmyContext,
undo: Undo, undo: Undo,
expected_domain: Url, expected_domain: Url,
request_counter: &mut i32,
) -> Result<HttpResponse, LemmyError> { ) -> Result<HttpResponse, LemmyError> {
let delete = Delete::from_any_base(undo.object().to_owned().one().context(location_info!())?)? let delete = Delete::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
.context(location_info!())?; .context(location_info!())?;
@ -311,9 +326,13 @@ async fn receive_undo_delete(
.single_xsd_any_uri() .single_xsd_any_uri()
.context(location_info!())?; .context(location_info!())?;
match find_by_id(context, object).await { match find_by_id(context, object).await {
Ok(FindResults::Post(p)) => receive_undo_delete_post(context, undo, p).await, Ok(FindResults::Post(p)) => receive_undo_delete_post(context, undo, p, request_counter).await,
Ok(FindResults::Comment(c)) => receive_undo_delete_comment(context, undo, c).await, Ok(FindResults::Comment(c)) => {
Ok(FindResults::Community(c)) => receive_undo_delete_community(context, undo, c).await, receive_undo_delete_comment(context, undo, c, request_counter).await
}
Ok(FindResults::Community(c)) => {
receive_undo_delete_community(context, undo, c, request_counter).await
}
// if we dont have the object, no need to do anything // if we dont have the object, no need to do anything
Err(_) => Ok(HttpResponse::Ok().finish()), Err(_) => Ok(HttpResponse::Ok().finish()),
} }
@ -323,6 +342,7 @@ async fn receive_undo_remove(
context: &LemmyContext, context: &LemmyContext,
undo: Undo, undo: Undo,
expected_domain: Url, expected_domain: Url,
request_counter: &mut i32,
) -> Result<HttpResponse, LemmyError> { ) -> Result<HttpResponse, LemmyError> {
let remove = Remove::from_any_base(undo.object().to_owned().one().context(location_info!())?)? let remove = Remove::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
.context(location_info!())?; .context(location_info!())?;
@ -334,9 +354,13 @@ async fn receive_undo_remove(
.single_xsd_any_uri() .single_xsd_any_uri()
.context(location_info!())?; .context(location_info!())?;
match find_by_id(context, object).await { match find_by_id(context, object).await {
Ok(FindResults::Post(p)) => receive_undo_remove_post(context, undo, p).await, Ok(FindResults::Post(p)) => receive_undo_remove_post(context, undo, p, request_counter).await,
Ok(FindResults::Comment(c)) => receive_undo_remove_comment(context, undo, c).await, Ok(FindResults::Comment(c)) => {
Ok(FindResults::Community(c)) => receive_undo_remove_community(context, undo, c).await, receive_undo_remove_comment(context, undo, c, request_counter).await
}
Ok(FindResults::Community(c)) => {
receive_undo_remove_community(context, undo, c, request_counter).await
}
// if we dont have the object, no need to do anything // if we dont have the object, no need to do anything
Err(_) => Ok(HttpResponse::Ok().finish()), Err(_) => Ok(HttpResponse::Ok().finish()),
} }
@ -346,6 +370,7 @@ async fn receive_undo_like(
context: &LemmyContext, context: &LemmyContext,
undo: Undo, undo: Undo,
expected_domain: Url, expected_domain: Url,
request_counter: &mut i32,
) -> Result<HttpResponse, LemmyError> { ) -> Result<HttpResponse, LemmyError> {
let like = Like::from_any_base(undo.object().to_owned().one().context(location_info!())?)? let like = Like::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
.context(location_info!())?; .context(location_info!())?;
@ -356,8 +381,8 @@ async fn receive_undo_like(
.as_single_kind_str() .as_single_kind_str()
.context(location_info!())?; .context(location_info!())?;
match type_ { match type_ {
"Note" => receive_undo_like_comment(undo, &like, context).await, "Note" => receive_undo_like_comment(undo, &like, context, request_counter).await,
"Page" => receive_undo_like_post(undo, &like, context).await, "Page" => receive_undo_like_post(undo, &like, context, request_counter).await,
d => Err(anyhow!("Undo Delete type {} not supported", d).into()), d => Err(anyhow!("Undo Delete type {} not supported", d).into()),
} }
} }
@ -366,6 +391,7 @@ async fn receive_undo_dislike(
context: &LemmyContext, context: &LemmyContext,
undo: Undo, undo: Undo,
expected_domain: Url, expected_domain: Url,
request_counter: &mut i32,
) -> Result<HttpResponse, LemmyError> { ) -> Result<HttpResponse, LemmyError> {
let dislike = Dislike::from_any_base(undo.object().to_owned().one().context(location_info!())?)? let dislike = Dislike::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
.context(location_info!())?; .context(location_info!())?;
@ -376,8 +402,8 @@ async fn receive_undo_dislike(
.as_single_kind_str() .as_single_kind_str()
.context(location_info!())?; .context(location_info!())?;
match type_ { match type_ {
"Note" => receive_undo_dislike_comment(undo, &dislike, context).await, "Note" => receive_undo_dislike_comment(undo, &dislike, context, request_counter).await,
"Page" => receive_undo_dislike_post(undo, &dislike, context).await, "Page" => receive_undo_dislike_post(undo, &dislike, context, request_counter).await,
d => Err(anyhow!("Undo Delete type {} not supported", d).into()), d => Err(anyhow!("Undo Delete type {} not supported", d).into()),
} }
} }

View file

@ -79,15 +79,22 @@ pub async fn user_inbox(
check_is_apub_id_valid(actor_uri)?; check_is_apub_id_valid(actor_uri)?;
let actor = get_or_fetch_and_upsert_actor(actor_uri, &context).await?; let request_counter = &mut 0;
let actor = get_or_fetch_and_upsert_actor(actor_uri, &context, request_counter).await?;
verify_signature(&request, actor.as_ref())?; verify_signature(&request, actor.as_ref())?;
let any_base = activity.clone().into_any_base()?; let any_base = activity.clone().into_any_base()?;
let kind = activity.kind().context(location_info!())?; let kind = activity.kind().context(location_info!())?;
let res = match kind { let res = match kind {
ValidTypes::Accept => receive_accept(&context, any_base, actor.as_ref(), user).await, ValidTypes::Accept => {
ValidTypes::Create => receive_create_private_message(&context, any_base, actor.as_ref()).await, receive_accept(&context, any_base, actor.as_ref(), user, request_counter).await
ValidTypes::Update => receive_update_private_message(&context, any_base, actor.as_ref()).await, }
ValidTypes::Create => {
receive_create_private_message(&context, any_base, actor.as_ref(), request_counter).await
}
ValidTypes::Update => {
receive_update_private_message(&context, any_base, actor.as_ref(), request_counter).await
}
ValidTypes::Delete => receive_delete_private_message(&context, any_base, actor.as_ref()).await, ValidTypes::Delete => receive_delete_private_message(&context, any_base, actor.as_ref()).await,
ValidTypes::Undo => { ValidTypes::Undo => {
receive_undo_delete_private_message(&context, any_base, actor.as_ref()).await receive_undo_delete_private_message(&context, any_base, actor.as_ref()).await
@ -104,6 +111,7 @@ async fn receive_accept(
activity: AnyBase, activity: AnyBase,
actor: &dyn ActorType, actor: &dyn ActorType,
user: User_, user: User_,
request_counter: &mut i32,
) -> Result<HttpResponse, LemmyError> { ) -> Result<HttpResponse, LemmyError> {
let accept = Accept::from_any_base(activity)?.context(location_info!())?; let accept = Accept::from_any_base(activity)?.context(location_info!())?;
verify_activity_domains_valid(&accept, actor.actor_id()?, false)?; verify_activity_domains_valid(&accept, actor.actor_id()?, false)?;
@ -120,7 +128,8 @@ async fn receive_accept(
.single_xsd_any_uri() .single_xsd_any_uri()
.context(location_info!())?; .context(location_info!())?;
let community = get_or_fetch_and_upsert_community(&community_uri, context).await?; let community =
get_or_fetch_and_upsert_community(&community_uri, context, request_counter).await?;
// Now you need to add this to the community follower // Now you need to add this to the community follower
let community_follower_form = CommunityFollowerForm { let community_follower_form = CommunityFollowerForm {
@ -141,6 +150,7 @@ async fn receive_create_private_message(
context: &LemmyContext, context: &LemmyContext,
activity: AnyBase, activity: AnyBase,
actor: &dyn ActorType, actor: &dyn ActorType,
request_counter: &mut i32,
) -> Result<HttpResponse, LemmyError> { ) -> Result<HttpResponse, LemmyError> {
let create = Create::from_any_base(activity)?.context(location_info!())?; let create = Create::from_any_base(activity)?.context(location_info!())?;
verify_activity_domains_valid(&create, actor.actor_id()?, true)?; verify_activity_domains_valid(&create, actor.actor_id()?, true)?;
@ -155,7 +165,7 @@ async fn receive_create_private_message(
.context(location_info!())?; .context(location_info!())?;
let private_message = let private_message =
PrivateMessageForm::from_apub(&note, context, Some(actor.actor_id()?)).await?; PrivateMessageForm::from_apub(&note, context, Some(actor.actor_id()?), request_counter).await?;
let inserted_private_message = blocking(&context.pool(), move |conn| { let inserted_private_message = blocking(&context.pool(), move |conn| {
PrivateMessage::create(conn, &private_message) PrivateMessage::create(conn, &private_message)
@ -185,6 +195,7 @@ async fn receive_update_private_message(
context: &LemmyContext, context: &LemmyContext,
activity: AnyBase, activity: AnyBase,
actor: &dyn ActorType, actor: &dyn ActorType,
request_counter: &mut i32,
) -> Result<HttpResponse, LemmyError> { ) -> Result<HttpResponse, LemmyError> {
let update = Update::from_any_base(activity)?.context(location_info!())?; let update = Update::from_any_base(activity)?.context(location_info!())?;
verify_activity_domains_valid(&update, actor.actor_id()?, true)?; verify_activity_domains_valid(&update, actor.actor_id()?, true)?;
@ -197,7 +208,7 @@ async fn receive_update_private_message(
let note = Note::from_any_base(object)?.context(location_info!())?; let note = Note::from_any_base(object)?.context(location_info!())?;
let private_message_form = let private_message_form =
PrivateMessageForm::from_apub(&note, context, Some(actor.actor_id()?)).await?; PrivateMessageForm::from_apub(&note, context, Some(actor.actor_id()?), request_counter).await?;
let private_message_ap_id = private_message_form let private_message_ap_id = private_message_form
.ap_id .ap_id

View file

@ -129,6 +129,7 @@ pub trait FromApub {
apub: &Self::ApubType, apub: &Self::ApubType,
context: &LemmyContext, context: &LemmyContext,
expected_domain: Option<Url>, expected_domain: Option<Url>,
request_counter: &mut i32,
) -> Result<Self, LemmyError> ) -> Result<Self, LemmyError>
where where
Self: Sized; Self: Sized;

View file

@ -89,6 +89,7 @@ impl FromApub for CommentForm {
note: &Note, note: &Note,
context: &LemmyContext, context: &LemmyContext,
expected_domain: Option<Url>, expected_domain: Option<Url>,
request_counter: &mut i32,
) -> Result<CommentForm, LemmyError> { ) -> Result<CommentForm, LemmyError> {
let creator_actor_id = &note let creator_actor_id = &note
.attributed_to() .attributed_to()
@ -96,7 +97,7 @@ impl FromApub for CommentForm {
.as_single_xsd_any_uri() .as_single_xsd_any_uri()
.context(location_info!())?; .context(location_info!())?;
let creator = get_or_fetch_and_upsert_user(creator_actor_id, context).await?; let creator = get_or_fetch_and_upsert_user(creator_actor_id, context, request_counter).await?;
let mut in_reply_tos = note let mut in_reply_tos = note
.in_reply_to() .in_reply_to()
@ -109,7 +110,7 @@ impl FromApub for CommentForm {
let post_ap_id = in_reply_tos.next().context(location_info!())??; let post_ap_id = in_reply_tos.next().context(location_info!())??;
// This post, or the parent comment might not yet exist on this server yet, fetch them. // This post, or the parent comment might not yet exist on this server yet, fetch them.
let post = get_or_fetch_and_insert_post(&post_ap_id, context).await?; let post = get_or_fetch_and_insert_post(&post_ap_id, context, request_counter).await?;
// The 2nd item, if it exists, is the parent comment apub_id // The 2nd item, if it exists, is the parent comment apub_id
// For deeply nested comments, FromApub automatically gets called recursively // For deeply nested comments, FromApub automatically gets called recursively
@ -117,7 +118,7 @@ impl FromApub for CommentForm {
Some(parent_comment_uri) => { Some(parent_comment_uri) => {
let parent_comment_ap_id = &parent_comment_uri?; let parent_comment_ap_id = &parent_comment_uri?;
let parent_comment = let parent_comment =
get_or_fetch_and_insert_comment(&parent_comment_ap_id, context).await?; get_or_fetch_and_insert_comment(&parent_comment_ap_id, context, request_counter).await?;
Some(parent_comment.id) Some(parent_comment.id)
} }

View file

@ -111,6 +111,7 @@ impl FromApub for CommunityForm {
group: &GroupExt, group: &GroupExt,
context: &LemmyContext, context: &LemmyContext,
expected_domain: Option<Url>, expected_domain: Option<Url>,
request_counter: &mut i32,
) -> Result<Self, LemmyError> { ) -> Result<Self, LemmyError> {
let creator_and_moderator_uris = group.inner.attributed_to().context(location_info!())?; let creator_and_moderator_uris = group.inner.attributed_to().context(location_info!())?;
let creator_uri = creator_and_moderator_uris let creator_uri = creator_and_moderator_uris
@ -122,7 +123,7 @@ impl FromApub for CommunityForm {
.as_xsd_any_uri() .as_xsd_any_uri()
.context(location_info!())?; .context(location_info!())?;
let creator = get_or_fetch_and_upsert_user(creator_uri, context).await?; let creator = get_or_fetch_and_upsert_user(creator_uri, context, request_counter).await?;
let name = group let name = group
.inner .inner
.preferred_username() .preferred_username()

View file

@ -101,6 +101,7 @@ impl FromApub for PostForm {
page: &PageExt, page: &PageExt,
context: &LemmyContext, context: &LemmyContext,
expected_domain: Option<Url>, expected_domain: Option<Url>,
request_counter: &mut i32,
) -> Result<PostForm, LemmyError> { ) -> Result<PostForm, LemmyError> {
let ext = &page.ext_one; let ext = &page.ext_one;
let creator_actor_id = page let creator_actor_id = page
@ -111,7 +112,7 @@ impl FromApub for PostForm {
.as_single_xsd_any_uri() .as_single_xsd_any_uri()
.context(location_info!())?; .context(location_info!())?;
let creator = get_or_fetch_and_upsert_user(creator_actor_id, context).await?; let creator = get_or_fetch_and_upsert_user(creator_actor_id, context, request_counter).await?;
let community_actor_id = page let community_actor_id = page
.inner .inner
@ -121,7 +122,8 @@ impl FromApub for PostForm {
.as_single_xsd_any_uri() .as_single_xsd_any_uri()
.context(location_info!())?; .context(location_info!())?;
let community = get_or_fetch_and_upsert_community(community_actor_id, context).await?; let community =
get_or_fetch_and_upsert_community(community_actor_id, context, request_counter).await?;
let thumbnail_url = match &page.inner.image() { let thumbnail_url = match &page.inner.image() {
Some(any_image) => Image::from_any_base( Some(any_image) => Image::from_any_base(

View file

@ -62,6 +62,7 @@ impl FromApub for PrivateMessageForm {
note: &Note, note: &Note,
context: &LemmyContext, context: &LemmyContext,
expected_domain: Option<Url>, expected_domain: Option<Url>,
request_counter: &mut i32,
) -> Result<PrivateMessageForm, LemmyError> { ) -> Result<PrivateMessageForm, LemmyError> {
let creator_actor_id = note let creator_actor_id = note
.attributed_to() .attributed_to()
@ -70,14 +71,15 @@ impl FromApub for PrivateMessageForm {
.single_xsd_any_uri() .single_xsd_any_uri()
.context(location_info!())?; .context(location_info!())?;
let creator = get_or_fetch_and_upsert_user(&creator_actor_id, context).await?; let creator = get_or_fetch_and_upsert_user(&creator_actor_id, context, request_counter).await?;
let recipient_actor_id = note let recipient_actor_id = note
.to() .to()
.context(location_info!())? .context(location_info!())?
.clone() .clone()
.single_xsd_any_uri() .single_xsd_any_uri()
.context(location_info!())?; .context(location_info!())?;
let recipient = get_or_fetch_and_upsert_user(&recipient_actor_id, context).await?; let recipient =
get_or_fetch_and_upsert_user(&recipient_actor_id, context, request_counter).await?;
let ap_id = note.id_unchecked().context(location_info!())?.to_string(); let ap_id = note.id_unchecked().context(location_info!())?.to_string();
check_is_apub_id_valid(&Url::parse(&ap_id)?)?; check_is_apub_id_valid(&Url::parse(&ap_id)?)?;

View file

@ -76,6 +76,7 @@ impl FromApub for UserForm {
person: &PersonExt, person: &PersonExt,
_context: &LemmyContext, _context: &LemmyContext,
expected_domain: Option<Url>, expected_domain: Option<Url>,
_request_counter: &mut i32,
) -> Result<Self, LemmyError> { ) -> Result<Self, LemmyError> {
let avatar = match person.icon() { let avatar = match person.icon() {
Some(any_image) => Some( Some(any_image) => Some(