Limit amount of HTTP requests to handle activities (fixes #1221)
This commit is contained in:
parent
1a3b96b054
commit
3d5647b16f
18 changed files with 332 additions and 198 deletions
|
@ -25,12 +25,14 @@ use lemmy_websocket::{messages::SendComment, LemmyContext, UserOperation};
|
||||||
pub(crate) async fn receive_create_comment(
|
pub(crate) async fn receive_create_comment(
|
||||||
create: Create,
|
create: Create,
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<HttpResponse, LemmyError> {
|
||||||
let user = get_actor_as_user(&create, context).await?;
|
let user = get_actor_as_user(&create, context, request_counter).await?;
|
||||||
let note = Note::from_any_base(create.object().to_owned().one().context(location_info!())?)?
|
let note = Note::from_any_base(create.object().to_owned().one().context(location_info!())?)?
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
|
|
||||||
let comment = CommentForm::from_apub(¬e, context, Some(user.actor_id()?)).await?;
|
let comment =
|
||||||
|
CommentForm::from_apub(¬e, context, Some(user.actor_id()?), request_counter).await?;
|
||||||
|
|
||||||
let inserted_comment =
|
let inserted_comment =
|
||||||
blocking(context.pool(), move |conn| Comment::upsert(conn, &comment)).await??;
|
blocking(context.pool(), move |conn| Comment::upsert(conn, &comment)).await??;
|
||||||
|
@ -71,21 +73,24 @@ pub(crate) async fn receive_create_comment(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
announce_if_community_is_local(create, &user, context).await?;
|
announce_if_community_is_local(create, &user, context, request_counter).await?;
|
||||||
Ok(HttpResponse::Ok().finish())
|
Ok(HttpResponse::Ok().finish())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn receive_update_comment(
|
pub(crate) async fn receive_update_comment(
|
||||||
update: Update,
|
update: Update,
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<HttpResponse, LemmyError> {
|
||||||
let note = Note::from_any_base(update.object().to_owned().one().context(location_info!())?)?
|
let note = Note::from_any_base(update.object().to_owned().one().context(location_info!())?)?
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
let user = get_actor_as_user(&update, context).await?;
|
let user = get_actor_as_user(&update, context, request_counter).await?;
|
||||||
|
|
||||||
let comment = CommentForm::from_apub(¬e, context, Some(user.actor_id()?)).await?;
|
let comment =
|
||||||
|
CommentForm::from_apub(¬e, context, Some(user.actor_id()?), request_counter).await?;
|
||||||
|
|
||||||
let original_comment_id = get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context)
|
let original_comment_id =
|
||||||
|
get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context, request_counter)
|
||||||
.await?
|
.await?
|
||||||
.id;
|
.id;
|
||||||
|
|
||||||
|
@ -126,21 +131,22 @@ pub(crate) async fn receive_update_comment(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
announce_if_community_is_local(update, &user, context).await?;
|
announce_if_community_is_local(update, &user, context, request_counter).await?;
|
||||||
Ok(HttpResponse::Ok().finish())
|
Ok(HttpResponse::Ok().finish())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn receive_like_comment(
|
pub(crate) async fn receive_like_comment(
|
||||||
like: Like,
|
like: Like,
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<HttpResponse, LemmyError> {
|
||||||
let note = Note::from_any_base(like.object().to_owned().one().context(location_info!())?)?
|
let note = Note::from_any_base(like.object().to_owned().one().context(location_info!())?)?
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
let user = get_actor_as_user(&like, context).await?;
|
let user = get_actor_as_user(&like, context, request_counter).await?;
|
||||||
|
|
||||||
let comment = CommentForm::from_apub(¬e, context, None).await?;
|
let comment = CommentForm::from_apub(¬e, context, None, request_counter).await?;
|
||||||
|
|
||||||
let comment_id = get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context)
|
let comment_id = get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context, request_counter)
|
||||||
.await?
|
.await?
|
||||||
.id;
|
.id;
|
||||||
|
|
||||||
|
@ -177,13 +183,14 @@ pub(crate) async fn receive_like_comment(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
announce_if_community_is_local(like, &user, context).await?;
|
announce_if_community_is_local(like, &user, context, request_counter).await?;
|
||||||
Ok(HttpResponse::Ok().finish())
|
Ok(HttpResponse::Ok().finish())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn receive_dislike_comment(
|
pub(crate) async fn receive_dislike_comment(
|
||||||
dislike: Dislike,
|
dislike: Dislike,
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<HttpResponse, LemmyError> {
|
||||||
let note = Note::from_any_base(
|
let note = Note::from_any_base(
|
||||||
dislike
|
dislike
|
||||||
|
@ -193,11 +200,11 @@ pub(crate) async fn receive_dislike_comment(
|
||||||
.context(location_info!())?,
|
.context(location_info!())?,
|
||||||
)?
|
)?
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
let user = get_actor_as_user(&dislike, context).await?;
|
let user = get_actor_as_user(&dislike, context, request_counter).await?;
|
||||||
|
|
||||||
let comment = CommentForm::from_apub(¬e, context, None).await?;
|
let comment = CommentForm::from_apub(¬e, context, None, request_counter).await?;
|
||||||
|
|
||||||
let comment_id = get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context)
|
let comment_id = get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context, request_counter)
|
||||||
.await?
|
.await?
|
||||||
.id;
|
.id;
|
||||||
|
|
||||||
|
@ -234,7 +241,7 @@ pub(crate) async fn receive_dislike_comment(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
announce_if_community_is_local(dislike, &user, context).await?;
|
announce_if_community_is_local(dislike, &user, context, request_counter).await?;
|
||||||
Ok(HttpResponse::Ok().finish())
|
Ok(HttpResponse::Ok().finish())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -242,6 +249,7 @@ pub(crate) async fn receive_delete_comment(
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
delete: Delete,
|
delete: Delete,
|
||||||
comment: Comment,
|
comment: Comment,
|
||||||
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<HttpResponse, LemmyError> {
|
||||||
let deleted_comment = blocking(context.pool(), move |conn| {
|
let deleted_comment = blocking(context.pool(), move |conn| {
|
||||||
Comment::update_deleted(conn, comment.id, true)
|
Comment::update_deleted(conn, comment.id, true)
|
||||||
|
@ -268,8 +276,8 @@ pub(crate) async fn receive_delete_comment(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
let user = get_actor_as_user(&delete, context).await?;
|
let user = get_actor_as_user(&delete, context, request_counter).await?;
|
||||||
announce_if_community_is_local(delete, &user, context).await?;
|
announce_if_community_is_local(delete, &user, context, request_counter).await?;
|
||||||
Ok(HttpResponse::Ok().finish())
|
Ok(HttpResponse::Ok().finish())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,14 +19,15 @@ pub(crate) async fn receive_undo_like_comment(
|
||||||
undo: Undo,
|
undo: Undo,
|
||||||
like: &Like,
|
like: &Like,
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<HttpResponse, LemmyError> {
|
||||||
let user = get_actor_as_user(like, context).await?;
|
let user = get_actor_as_user(like, context, request_counter).await?;
|
||||||
let note = Note::from_any_base(like.object().to_owned().one().context(location_info!())?)?
|
let note = Note::from_any_base(like.object().to_owned().one().context(location_info!())?)?
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
|
|
||||||
let comment = CommentForm::from_apub(¬e, context, None).await?;
|
let comment = CommentForm::from_apub(¬e, context, None, request_counter).await?;
|
||||||
|
|
||||||
let comment_id = get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context)
|
let comment_id = get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context, request_counter)
|
||||||
.await?
|
.await?
|
||||||
.id;
|
.id;
|
||||||
|
|
||||||
|
@ -56,7 +57,7 @@ pub(crate) async fn receive_undo_like_comment(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
announce_if_community_is_local(undo, &user, context).await?;
|
announce_if_community_is_local(undo, &user, context, request_counter).await?;
|
||||||
Ok(HttpResponse::Ok().finish())
|
Ok(HttpResponse::Ok().finish())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -64,8 +65,9 @@ pub(crate) async fn receive_undo_dislike_comment(
|
||||||
undo: Undo,
|
undo: Undo,
|
||||||
dislike: &Dislike,
|
dislike: &Dislike,
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<HttpResponse, LemmyError> {
|
||||||
let user = get_actor_as_user(dislike, context).await?;
|
let user = get_actor_as_user(dislike, context, request_counter).await?;
|
||||||
let note = Note::from_any_base(
|
let note = Note::from_any_base(
|
||||||
dislike
|
dislike
|
||||||
.object()
|
.object()
|
||||||
|
@ -75,9 +77,9 @@ pub(crate) async fn receive_undo_dislike_comment(
|
||||||
)?
|
)?
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
|
|
||||||
let comment = CommentForm::from_apub(¬e, context, None).await?;
|
let comment = CommentForm::from_apub(¬e, context, None, request_counter).await?;
|
||||||
|
|
||||||
let comment_id = get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context)
|
let comment_id = get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context, request_counter)
|
||||||
.await?
|
.await?
|
||||||
.id;
|
.id;
|
||||||
|
|
||||||
|
@ -107,7 +109,7 @@ pub(crate) async fn receive_undo_dislike_comment(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
announce_if_community_is_local(undo, &user, context).await?;
|
announce_if_community_is_local(undo, &user, context, request_counter).await?;
|
||||||
Ok(HttpResponse::Ok().finish())
|
Ok(HttpResponse::Ok().finish())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -115,6 +117,7 @@ pub(crate) async fn receive_undo_delete_comment(
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
undo: Undo,
|
undo: Undo,
|
||||||
comment: Comment,
|
comment: Comment,
|
||||||
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<HttpResponse, LemmyError> {
|
||||||
let deleted_comment = blocking(context.pool(), move |conn| {
|
let deleted_comment = blocking(context.pool(), move |conn| {
|
||||||
Comment::update_deleted(conn, comment.id, false)
|
Comment::update_deleted(conn, comment.id, false)
|
||||||
|
@ -142,8 +145,8 @@ pub(crate) async fn receive_undo_delete_comment(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
let user = get_actor_as_user(&undo, context).await?;
|
let user = get_actor_as_user(&undo, context, request_counter).await?;
|
||||||
announce_if_community_is_local(undo, &user, context).await?;
|
announce_if_community_is_local(undo, &user, context, request_counter).await?;
|
||||||
Ok(HttpResponse::Ok().finish())
|
Ok(HttpResponse::Ok().finish())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -151,6 +154,7 @@ pub(crate) async fn receive_undo_remove_comment(
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
undo: Undo,
|
undo: Undo,
|
||||||
comment: Comment,
|
comment: Comment,
|
||||||
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<HttpResponse, LemmyError> {
|
||||||
let removed_comment = blocking(context.pool(), move |conn| {
|
let removed_comment = blocking(context.pool(), move |conn| {
|
||||||
Comment::update_removed(conn, comment.id, false)
|
Comment::update_removed(conn, comment.id, false)
|
||||||
|
@ -178,7 +182,7 @@ pub(crate) async fn receive_undo_remove_comment(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
let mod_ = get_actor_as_user(&undo, context).await?;
|
let mod_ = get_actor_as_user(&undo, context, request_counter).await?;
|
||||||
announce_if_community_is_local(undo, &mod_, context).await?;
|
announce_if_community_is_local(undo, &mod_, context, request_counter).await?;
|
||||||
Ok(HttpResponse::Ok().finish())
|
Ok(HttpResponse::Ok().finish())
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,6 +10,7 @@ pub(crate) async fn receive_delete_community(
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
delete: Delete,
|
delete: Delete,
|
||||||
community: Community,
|
community: Community,
|
||||||
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<HttpResponse, LemmyError> {
|
||||||
let deleted_community = blocking(context.pool(), move |conn| {
|
let deleted_community = blocking(context.pool(), move |conn| {
|
||||||
Community::update_deleted(conn, community.id, true)
|
Community::update_deleted(conn, community.id, true)
|
||||||
|
@ -32,8 +33,8 @@ pub(crate) async fn receive_delete_community(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
let user = get_actor_as_user(&delete, context).await?;
|
let user = get_actor_as_user(&delete, context, request_counter).await?;
|
||||||
announce_if_community_is_local(delete, &user, context).await?;
|
announce_if_community_is_local(delete, &user, context, request_counter).await?;
|
||||||
Ok(HttpResponse::Ok().finish())
|
Ok(HttpResponse::Ok().finish())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -63,6 +64,7 @@ pub(crate) async fn receive_remove_community(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// TODO: this should probably also call announce_if_community_is_local()
|
||||||
Ok(HttpResponse::Ok().finish())
|
Ok(HttpResponse::Ok().finish())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -70,6 +72,7 @@ pub(crate) async fn receive_undo_delete_community(
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
undo: Undo,
|
undo: Undo,
|
||||||
community: Community,
|
community: Community,
|
||||||
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<HttpResponse, LemmyError> {
|
||||||
let deleted_community = blocking(context.pool(), move |conn| {
|
let deleted_community = blocking(context.pool(), move |conn| {
|
||||||
Community::update_deleted(conn, community.id, false)
|
Community::update_deleted(conn, community.id, false)
|
||||||
|
@ -92,8 +95,8 @@ pub(crate) async fn receive_undo_delete_community(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
let user = get_actor_as_user(&undo, context).await?;
|
let user = get_actor_as_user(&undo, context, request_counter).await?;
|
||||||
announce_if_community_is_local(undo, &user, context).await?;
|
announce_if_community_is_local(undo, &user, context, request_counter).await?;
|
||||||
Ok(HttpResponse::Ok().finish())
|
Ok(HttpResponse::Ok().finish())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -101,6 +104,7 @@ pub(crate) async fn receive_undo_remove_community(
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
undo: Undo,
|
undo: Undo,
|
||||||
community: Community,
|
community: Community,
|
||||||
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<HttpResponse, LemmyError> {
|
||||||
let removed_community = blocking(context.pool(), move |conn| {
|
let removed_community = blocking(context.pool(), move |conn| {
|
||||||
Community::update_removed(conn, community.id, false)
|
Community::update_removed(conn, community.id, false)
|
||||||
|
@ -124,7 +128,7 @@ pub(crate) async fn receive_undo_remove_community(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
let mod_ = get_actor_as_user(&undo, context).await?;
|
let mod_ = get_actor_as_user(&undo, context, request_counter).await?;
|
||||||
announce_if_community_is_local(undo, &mod_, context).await?;
|
announce_if_community_is_local(undo, &mod_, context, request_counter).await?;
|
||||||
Ok(HttpResponse::Ok().finish())
|
Ok(HttpResponse::Ok().finish())
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,6 +41,7 @@ async fn announce_if_community_is_local<T, Kind>(
|
||||||
activity: T,
|
activity: T,
|
||||||
user: &User_,
|
user: &User_,
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
|
request_counter: &mut i32,
|
||||||
) -> Result<(), LemmyError>
|
) -> Result<(), LemmyError>
|
||||||
where
|
where
|
||||||
T: AsObject<Kind>,
|
T: AsObject<Kind>,
|
||||||
|
@ -55,7 +56,9 @@ where
|
||||||
.context(location_info!())?
|
.context(location_info!())?
|
||||||
.as_xsd_any_uri()
|
.as_xsd_any_uri()
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
let community = get_or_fetch_and_upsert_community(&community_uri, context).await?;
|
// TODO: we could just read from the local db here (and ignore if the community is not found)
|
||||||
|
let community =
|
||||||
|
get_or_fetch_and_upsert_community(&community_uri, context, request_counter).await?;
|
||||||
|
|
||||||
if community.local {
|
if community.local {
|
||||||
community
|
community
|
||||||
|
@ -69,13 +72,14 @@ where
|
||||||
pub(crate) async fn get_actor_as_user<T, A>(
|
pub(crate) async fn get_actor_as_user<T, A>(
|
||||||
activity: &T,
|
activity: &T,
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
|
request_counter: &mut i32,
|
||||||
) -> Result<User_, LemmyError>
|
) -> Result<User_, LemmyError>
|
||||||
where
|
where
|
||||||
T: AsBase<A> + ActorAndObjectRef,
|
T: AsBase<A> + ActorAndObjectRef,
|
||||||
{
|
{
|
||||||
let actor = activity.actor()?;
|
let actor = activity.actor()?;
|
||||||
let user_uri = actor.as_single_xsd_any_uri().context(location_info!())?;
|
let user_uri = actor.as_single_xsd_any_uri().context(location_info!())?;
|
||||||
get_or_fetch_and_upsert_user(&user_uri, context).await
|
get_or_fetch_and_upsert_user(&user_uri, context, request_counter).await
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) enum FindResults {
|
pub(crate) enum FindResults {
|
||||||
|
|
|
@ -24,12 +24,13 @@ use lemmy_websocket::{messages::SendPost, LemmyContext, UserOperation};
|
||||||
pub(crate) async fn receive_create_post(
|
pub(crate) async fn receive_create_post(
|
||||||
create: Create,
|
create: Create,
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<HttpResponse, LemmyError> {
|
||||||
let user = get_actor_as_user(&create, context).await?;
|
let user = get_actor_as_user(&create, context, request_counter).await?;
|
||||||
let page = PageExt::from_any_base(create.object().to_owned().one().context(location_info!())?)?
|
let page = PageExt::from_any_base(create.object().to_owned().one().context(location_info!())?)?
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
|
|
||||||
let post = PostForm::from_apub(&page, context, Some(user.actor_id()?)).await?;
|
let post = PostForm::from_apub(&page, context, Some(user.actor_id()?), request_counter).await?;
|
||||||
|
|
||||||
// Using an upsert, since likes (which fetch the post), sometimes come in before the create
|
// Using an upsert, since likes (which fetch the post), sometimes come in before the create
|
||||||
// resulting in double posts.
|
// resulting in double posts.
|
||||||
|
@ -50,21 +51,22 @@ pub(crate) async fn receive_create_post(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
announce_if_community_is_local(create, &user, context).await?;
|
announce_if_community_is_local(create, &user, context, request_counter).await?;
|
||||||
Ok(HttpResponse::Ok().finish())
|
Ok(HttpResponse::Ok().finish())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn receive_update_post(
|
pub(crate) async fn receive_update_post(
|
||||||
update: Update,
|
update: Update,
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<HttpResponse, LemmyError> {
|
||||||
let user = get_actor_as_user(&update, context).await?;
|
let user = get_actor_as_user(&update, context, request_counter).await?;
|
||||||
let page = PageExt::from_any_base(update.object().to_owned().one().context(location_info!())?)?
|
let page = PageExt::from_any_base(update.object().to_owned().one().context(location_info!())?)?
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
|
|
||||||
let post = PostForm::from_apub(&page, context, Some(user.actor_id()?)).await?;
|
let post = PostForm::from_apub(&page, context, Some(user.actor_id()?), request_counter).await?;
|
||||||
|
|
||||||
let original_post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context)
|
let original_post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context, request_counter)
|
||||||
.await?
|
.await?
|
||||||
.id;
|
.id;
|
||||||
|
|
||||||
|
@ -87,21 +89,22 @@ pub(crate) async fn receive_update_post(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
announce_if_community_is_local(update, &user, context).await?;
|
announce_if_community_is_local(update, &user, context, request_counter).await?;
|
||||||
Ok(HttpResponse::Ok().finish())
|
Ok(HttpResponse::Ok().finish())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn receive_like_post(
|
pub(crate) async fn receive_like_post(
|
||||||
like: Like,
|
like: Like,
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<HttpResponse, LemmyError> {
|
||||||
let user = get_actor_as_user(&like, context).await?;
|
let user = get_actor_as_user(&like, context, request_counter).await?;
|
||||||
let page = PageExt::from_any_base(like.object().to_owned().one().context(location_info!())?)?
|
let page = PageExt::from_any_base(like.object().to_owned().one().context(location_info!())?)?
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
|
|
||||||
let post = PostForm::from_apub(&page, context, None).await?;
|
let post = PostForm::from_apub(&page, context, None, request_counter).await?;
|
||||||
|
|
||||||
let post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context)
|
let post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context, request_counter)
|
||||||
.await?
|
.await?
|
||||||
.id;
|
.id;
|
||||||
|
|
||||||
|
@ -131,15 +134,16 @@ pub(crate) async fn receive_like_post(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
announce_if_community_is_local(like, &user, context).await?;
|
announce_if_community_is_local(like, &user, context, request_counter).await?;
|
||||||
Ok(HttpResponse::Ok().finish())
|
Ok(HttpResponse::Ok().finish())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn receive_dislike_post(
|
pub(crate) async fn receive_dislike_post(
|
||||||
dislike: Dislike,
|
dislike: Dislike,
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<HttpResponse, LemmyError> {
|
||||||
let user = get_actor_as_user(&dislike, context).await?;
|
let user = get_actor_as_user(&dislike, context, request_counter).await?;
|
||||||
let page = PageExt::from_any_base(
|
let page = PageExt::from_any_base(
|
||||||
dislike
|
dislike
|
||||||
.object()
|
.object()
|
||||||
|
@ -149,9 +153,9 @@ pub(crate) async fn receive_dislike_post(
|
||||||
)?
|
)?
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
|
|
||||||
let post = PostForm::from_apub(&page, context, None).await?;
|
let post = PostForm::from_apub(&page, context, None, request_counter).await?;
|
||||||
|
|
||||||
let post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context)
|
let post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context, request_counter)
|
||||||
.await?
|
.await?
|
||||||
.id;
|
.id;
|
||||||
|
|
||||||
|
@ -181,7 +185,7 @@ pub(crate) async fn receive_dislike_post(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
announce_if_community_is_local(dislike, &user, context).await?;
|
announce_if_community_is_local(dislike, &user, context, request_counter).await?;
|
||||||
Ok(HttpResponse::Ok().finish())
|
Ok(HttpResponse::Ok().finish())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -189,6 +193,7 @@ pub(crate) async fn receive_delete_post(
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
delete: Delete,
|
delete: Delete,
|
||||||
post: Post,
|
post: Post,
|
||||||
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<HttpResponse, LemmyError> {
|
||||||
let deleted_post = blocking(context.pool(), move |conn| {
|
let deleted_post = blocking(context.pool(), move |conn| {
|
||||||
Post::update_deleted(conn, post.id, true)
|
Post::update_deleted(conn, post.id, true)
|
||||||
|
@ -209,8 +214,8 @@ pub(crate) async fn receive_delete_post(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
let user = get_actor_as_user(&delete, context).await?;
|
let user = get_actor_as_user(&delete, context, request_counter).await?;
|
||||||
announce_if_community_is_local(delete, &user, context).await?;
|
announce_if_community_is_local(delete, &user, context, request_counter).await?;
|
||||||
Ok(HttpResponse::Ok().finish())
|
Ok(HttpResponse::Ok().finish())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,14 +20,15 @@ pub(crate) async fn receive_undo_like_post(
|
||||||
undo: Undo,
|
undo: Undo,
|
||||||
like: &Like,
|
like: &Like,
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<HttpResponse, LemmyError> {
|
||||||
let user = get_actor_as_user(like, context).await?;
|
let user = get_actor_as_user(like, context, request_counter).await?;
|
||||||
let page = PageExt::from_any_base(like.object().to_owned().one().context(location_info!())?)?
|
let page = PageExt::from_any_base(like.object().to_owned().one().context(location_info!())?)?
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
|
|
||||||
let post = PostForm::from_apub(&page, context, None).await?;
|
let post = PostForm::from_apub(&page, context, None, request_counter).await?;
|
||||||
|
|
||||||
let post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context)
|
let post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context, request_counter)
|
||||||
.await?
|
.await?
|
||||||
.id;
|
.id;
|
||||||
|
|
||||||
|
@ -51,7 +52,7 @@ pub(crate) async fn receive_undo_like_post(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
announce_if_community_is_local(undo, &user, context).await?;
|
announce_if_community_is_local(undo, &user, context, request_counter).await?;
|
||||||
Ok(HttpResponse::Ok().finish())
|
Ok(HttpResponse::Ok().finish())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -59,8 +60,9 @@ pub(crate) async fn receive_undo_dislike_post(
|
||||||
undo: Undo,
|
undo: Undo,
|
||||||
dislike: &Dislike,
|
dislike: &Dislike,
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<HttpResponse, LemmyError> {
|
||||||
let user = get_actor_as_user(dislike, context).await?;
|
let user = get_actor_as_user(dislike, context, request_counter).await?;
|
||||||
let page = PageExt::from_any_base(
|
let page = PageExt::from_any_base(
|
||||||
dislike
|
dislike
|
||||||
.object()
|
.object()
|
||||||
|
@ -70,9 +72,9 @@ pub(crate) async fn receive_undo_dislike_post(
|
||||||
)?
|
)?
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
|
|
||||||
let post = PostForm::from_apub(&page, context, None).await?;
|
let post = PostForm::from_apub(&page, context, None, request_counter).await?;
|
||||||
|
|
||||||
let post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context)
|
let post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context, request_counter)
|
||||||
.await?
|
.await?
|
||||||
.id;
|
.id;
|
||||||
|
|
||||||
|
@ -96,7 +98,7 @@ pub(crate) async fn receive_undo_dislike_post(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
announce_if_community_is_local(undo, &user, context).await?;
|
announce_if_community_is_local(undo, &user, context, request_counter).await?;
|
||||||
Ok(HttpResponse::Ok().finish())
|
Ok(HttpResponse::Ok().finish())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -104,6 +106,7 @@ pub(crate) async fn receive_undo_delete_post(
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
undo: Undo,
|
undo: Undo,
|
||||||
post: Post,
|
post: Post,
|
||||||
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<HttpResponse, LemmyError> {
|
||||||
let deleted_post = blocking(context.pool(), move |conn| {
|
let deleted_post = blocking(context.pool(), move |conn| {
|
||||||
Post::update_deleted(conn, post.id, false)
|
Post::update_deleted(conn, post.id, false)
|
||||||
|
@ -124,8 +127,8 @@ pub(crate) async fn receive_undo_delete_post(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
let user = get_actor_as_user(&undo, context).await?;
|
let user = get_actor_as_user(&undo, context, request_counter).await?;
|
||||||
announce_if_community_is_local(undo, &user, context).await?;
|
announce_if_community_is_local(undo, &user, context, request_counter).await?;
|
||||||
Ok(HttpResponse::Ok().finish())
|
Ok(HttpResponse::Ok().finish())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -133,6 +136,7 @@ pub(crate) async fn receive_undo_remove_post(
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
undo: Undo,
|
undo: Undo,
|
||||||
post: Post,
|
post: Post,
|
||||||
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<HttpResponse, LemmyError> {
|
||||||
let removed_post = blocking(context.pool(), move |conn| {
|
let removed_post = blocking(context.pool(), move |conn| {
|
||||||
Post::update_removed(conn, post.id, false)
|
Post::update_removed(conn, post.id, false)
|
||||||
|
@ -154,7 +158,7 @@ pub(crate) async fn receive_undo_remove_post(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
let mod_ = get_actor_as_user(&undo, context).await?;
|
let mod_ = get_actor_as_user(&undo, context, request_counter).await?;
|
||||||
announce_if_community_is_local(undo, &mod_, context).await?;
|
announce_if_community_is_local(undo, &mod_, context, request_counter).await?;
|
||||||
Ok(HttpResponse::Ok().finish())
|
Ok(HttpResponse::Ok().finish())
|
||||||
}
|
}
|
||||||
|
|
|
@ -341,7 +341,7 @@ async fn collect_non_local_mentions_and_addresses(
|
||||||
debug!("mention actor_id: {}", actor_id);
|
debug!("mention actor_id: {}", actor_id);
|
||||||
addressed_ccs.push(actor_id.to_owned().to_string().parse()?);
|
addressed_ccs.push(actor_id.to_owned().to_string().parse()?);
|
||||||
|
|
||||||
let mention_user = get_or_fetch_and_upsert_user(&actor_id, context).await?;
|
let mention_user = get_or_fetch_and_upsert_user(&actor_id, context, &mut 0).await?;
|
||||||
let shared_inbox = mention_user.get_shared_inbox_url()?;
|
let shared_inbox = mention_user.get_shared_inbox_url()?;
|
||||||
|
|
||||||
mention_inboxes.push(shared_inbox);
|
mention_inboxes.push(shared_inbox);
|
||||||
|
|
|
@ -72,7 +72,7 @@ impl ActorType for Community {
|
||||||
.actor()?
|
.actor()?
|
||||||
.as_single_xsd_any_uri()
|
.as_single_xsd_any_uri()
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
let user = get_or_fetch_and_upsert_user(actor_uri, context).await?;
|
let user = get_or_fetch_and_upsert_user(actor_uri, context, &mut 0).await?;
|
||||||
|
|
||||||
let mut accept = Accept::new(self.actor_id.to_owned(), follow.into_any_base()?);
|
let mut accept = Accept::new(self.actor_id.to_owned(), follow.into_any_base()?);
|
||||||
accept
|
accept
|
||||||
|
|
|
@ -42,12 +42,26 @@ use url::Url;
|
||||||
static ACTOR_REFETCH_INTERVAL_SECONDS: i64 = 24 * 60 * 60;
|
static ACTOR_REFETCH_INTERVAL_SECONDS: i64 = 24 * 60 * 60;
|
||||||
static ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG: i64 = 10;
|
static ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG: i64 = 10;
|
||||||
|
|
||||||
|
/// Maximum number of HTTP requests allowed to handle a single incoming activity (or a single object
|
||||||
|
/// fetch through the search).
|
||||||
|
///
|
||||||
|
/// Tests are passing with a value of 5, so 10 should be safe for production.
|
||||||
|
static MAX_REQUEST_NUMBER: i32 = 10;
|
||||||
|
|
||||||
/// Fetch any type of ActivityPub object, handling things like HTTP headers, deserialisation,
|
/// Fetch any type of ActivityPub object, handling things like HTTP headers, deserialisation,
|
||||||
/// timeouts etc.
|
/// timeouts etc.
|
||||||
async fn fetch_remote_object<Response>(client: &Client, url: &Url) -> Result<Response, LemmyError>
|
async fn fetch_remote_object<Response>(
|
||||||
|
client: &Client,
|
||||||
|
url: &Url,
|
||||||
|
recursion_counter: &mut i32,
|
||||||
|
) -> Result<Response, LemmyError>
|
||||||
where
|
where
|
||||||
Response: for<'de> Deserialize<'de>,
|
Response: for<'de> Deserialize<'de>,
|
||||||
{
|
{
|
||||||
|
*recursion_counter += 1;
|
||||||
|
if *recursion_counter > MAX_REQUEST_NUMBER {
|
||||||
|
return Err(anyhow!("Maximum recursion depth reached").into());
|
||||||
|
}
|
||||||
check_is_apub_id_valid(&url)?;
|
check_is_apub_id_valid(&url)?;
|
||||||
|
|
||||||
let timeout = Duration::from_secs(60);
|
let timeout = Duration::from_secs(60);
|
||||||
|
@ -131,12 +145,18 @@ pub async fn search_by_apub_id(
|
||||||
};
|
};
|
||||||
|
|
||||||
let domain = query_url.domain().context("url has no domain")?;
|
let domain = query_url.domain().context("url has no domain")?;
|
||||||
let response =
|
let recursion_counter = &mut 0;
|
||||||
match fetch_remote_object::<SearchAcceptedObjects>(context.client(), &query_url).await? {
|
let response = match fetch_remote_object::<SearchAcceptedObjects>(
|
||||||
|
context.client(),
|
||||||
|
&query_url,
|
||||||
|
recursion_counter,
|
||||||
|
)
|
||||||
|
.await?
|
||||||
|
{
|
||||||
SearchAcceptedObjects::Person(p) => {
|
SearchAcceptedObjects::Person(p) => {
|
||||||
let user_uri = p.inner.id(domain)?.context("person has no id")?;
|
let user_uri = p.inner.id(domain)?.context("person has no id")?;
|
||||||
|
|
||||||
let user = get_or_fetch_and_upsert_user(&user_uri, context).await?;
|
let user = get_or_fetch_and_upsert_user(&user_uri, context, recursion_counter).await?;
|
||||||
|
|
||||||
response.users = vec![
|
response.users = vec![
|
||||||
blocking(context.pool(), move |conn| {
|
blocking(context.pool(), move |conn| {
|
||||||
|
@ -150,7 +170,8 @@ pub async fn search_by_apub_id(
|
||||||
SearchAcceptedObjects::Group(g) => {
|
SearchAcceptedObjects::Group(g) => {
|
||||||
let community_uri = g.inner.id(domain)?.context("group has no id")?;
|
let community_uri = g.inner.id(domain)?.context("group has no id")?;
|
||||||
|
|
||||||
let community = get_or_fetch_and_upsert_community(community_uri, context).await?;
|
let community =
|
||||||
|
get_or_fetch_and_upsert_community(community_uri, context, recursion_counter).await?;
|
||||||
|
|
||||||
response.communities = vec![
|
response.communities = vec![
|
||||||
blocking(context.pool(), move |conn| {
|
blocking(context.pool(), move |conn| {
|
||||||
|
@ -162,7 +183,7 @@ pub async fn search_by_apub_id(
|
||||||
response
|
response
|
||||||
}
|
}
|
||||||
SearchAcceptedObjects::Page(p) => {
|
SearchAcceptedObjects::Page(p) => {
|
||||||
let post_form = PostForm::from_apub(&p, context, Some(query_url)).await?;
|
let post_form = PostForm::from_apub(&p, context, Some(query_url), recursion_counter).await?;
|
||||||
|
|
||||||
let p = blocking(context.pool(), move |conn| Post::upsert(conn, &post_form)).await??;
|
let p = blocking(context.pool(), move |conn| Post::upsert(conn, &post_form)).await??;
|
||||||
response.posts =
|
response.posts =
|
||||||
|
@ -171,7 +192,8 @@ pub async fn search_by_apub_id(
|
||||||
response
|
response
|
||||||
}
|
}
|
||||||
SearchAcceptedObjects::Comment(c) => {
|
SearchAcceptedObjects::Comment(c) => {
|
||||||
let comment_form = CommentForm::from_apub(&c, context, Some(query_url)).await?;
|
let comment_form =
|
||||||
|
CommentForm::from_apub(&c, context, Some(query_url), recursion_counter).await?;
|
||||||
|
|
||||||
let c = blocking(context.pool(), move |conn| {
|
let c = blocking(context.pool(), move |conn| {
|
||||||
Comment::upsert(conn, &comment_form)
|
Comment::upsert(conn, &comment_form)
|
||||||
|
@ -199,11 +221,12 @@ pub async fn search_by_apub_id(
|
||||||
pub(crate) async fn get_or_fetch_and_upsert_actor(
|
pub(crate) async fn get_or_fetch_and_upsert_actor(
|
||||||
apub_id: &Url,
|
apub_id: &Url,
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
|
recursion_counter: &mut i32,
|
||||||
) -> Result<Box<dyn ActorType>, LemmyError> {
|
) -> Result<Box<dyn ActorType>, LemmyError> {
|
||||||
let community = get_or_fetch_and_upsert_community(apub_id, context).await;
|
let community = get_or_fetch_and_upsert_community(apub_id, context, recursion_counter).await;
|
||||||
let actor: Box<dyn ActorType> = match community {
|
let actor: Box<dyn ActorType> = match community {
|
||||||
Ok(c) => Box::new(c),
|
Ok(c) => Box::new(c),
|
||||||
Err(_) => Box::new(get_or_fetch_and_upsert_user(apub_id, context).await?),
|
Err(_) => Box::new(get_or_fetch_and_upsert_user(apub_id, context, recursion_counter).await?),
|
||||||
};
|
};
|
||||||
Ok(actor)
|
Ok(actor)
|
||||||
}
|
}
|
||||||
|
@ -215,6 +238,7 @@ pub(crate) async fn get_or_fetch_and_upsert_actor(
|
||||||
pub(crate) async fn get_or_fetch_and_upsert_user(
|
pub(crate) async fn get_or_fetch_and_upsert_user(
|
||||||
apub_id: &Url,
|
apub_id: &Url,
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
|
recursion_counter: &mut i32,
|
||||||
) -> Result<User_, LemmyError> {
|
) -> Result<User_, LemmyError> {
|
||||||
let apub_id_owned = apub_id.to_owned();
|
let apub_id_owned = apub_id.to_owned();
|
||||||
let user = blocking(context.pool(), move |conn| {
|
let user = blocking(context.pool(), move |conn| {
|
||||||
|
@ -226,9 +250,16 @@ pub(crate) async fn get_or_fetch_and_upsert_user(
|
||||||
// If its older than a day, re-fetch it
|
// If its older than a day, re-fetch it
|
||||||
Ok(u) if !u.local && should_refetch_actor(u.last_refreshed_at) => {
|
Ok(u) if !u.local && should_refetch_actor(u.last_refreshed_at) => {
|
||||||
debug!("Fetching and updating from remote user: {}", apub_id);
|
debug!("Fetching and updating from remote user: {}", apub_id);
|
||||||
let person = fetch_remote_object::<PersonExt>(context.client(), apub_id).await?;
|
let person =
|
||||||
|
fetch_remote_object::<PersonExt>(context.client(), apub_id, recursion_counter).await?;
|
||||||
|
|
||||||
let mut uf = UserForm::from_apub(&person, context, Some(apub_id.to_owned())).await?;
|
let mut uf = UserForm::from_apub(
|
||||||
|
&person,
|
||||||
|
context,
|
||||||
|
Some(apub_id.to_owned()),
|
||||||
|
recursion_counter,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
uf.last_refreshed_at = Some(naive_now());
|
uf.last_refreshed_at = Some(naive_now());
|
||||||
let user = blocking(context.pool(), move |conn| User_::update(conn, u.id, &uf)).await??;
|
let user = blocking(context.pool(), move |conn| User_::update(conn, u.id, &uf)).await??;
|
||||||
|
|
||||||
|
@ -237,9 +268,16 @@ pub(crate) async fn get_or_fetch_and_upsert_user(
|
||||||
Ok(u) => Ok(u),
|
Ok(u) => Ok(u),
|
||||||
Err(NotFound {}) => {
|
Err(NotFound {}) => {
|
||||||
debug!("Fetching and creating remote user: {}", apub_id);
|
debug!("Fetching and creating remote user: {}", apub_id);
|
||||||
let person = fetch_remote_object::<PersonExt>(context.client(), apub_id).await?;
|
let person =
|
||||||
|
fetch_remote_object::<PersonExt>(context.client(), apub_id, recursion_counter).await?;
|
||||||
|
|
||||||
let uf = UserForm::from_apub(&person, context, Some(apub_id.to_owned())).await?;
|
let uf = UserForm::from_apub(
|
||||||
|
&person,
|
||||||
|
context,
|
||||||
|
Some(apub_id.to_owned()),
|
||||||
|
recursion_counter,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
let user = blocking(context.pool(), move |conn| User_::upsert(conn, &uf)).await??;
|
let user = blocking(context.pool(), move |conn| User_::upsert(conn, &uf)).await??;
|
||||||
|
|
||||||
Ok(user)
|
Ok(user)
|
||||||
|
@ -271,6 +309,7 @@ fn should_refetch_actor(last_refreshed: NaiveDateTime) -> bool {
|
||||||
pub(crate) async fn get_or_fetch_and_upsert_community(
|
pub(crate) async fn get_or_fetch_and_upsert_community(
|
||||||
apub_id: &Url,
|
apub_id: &Url,
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
|
recursion_counter: &mut i32,
|
||||||
) -> Result<Community, LemmyError> {
|
) -> Result<Community, LemmyError> {
|
||||||
let apub_id_owned = apub_id.to_owned();
|
let apub_id_owned = apub_id.to_owned();
|
||||||
let community = blocking(context.pool(), move |conn| {
|
let community = blocking(context.pool(), move |conn| {
|
||||||
|
@ -281,12 +320,12 @@ pub(crate) async fn get_or_fetch_and_upsert_community(
|
||||||
match community {
|
match community {
|
||||||
Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => {
|
Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => {
|
||||||
debug!("Fetching and updating from remote community: {}", apub_id);
|
debug!("Fetching and updating from remote community: {}", apub_id);
|
||||||
fetch_remote_community(apub_id, context, Some(c.id)).await
|
fetch_remote_community(apub_id, context, Some(c.id), recursion_counter).await
|
||||||
}
|
}
|
||||||
Ok(c) => Ok(c),
|
Ok(c) => Ok(c),
|
||||||
Err(NotFound {}) => {
|
Err(NotFound {}) => {
|
||||||
debug!("Fetching and creating remote community: {}", apub_id);
|
debug!("Fetching and creating remote community: {}", apub_id);
|
||||||
fetch_remote_community(apub_id, context, None).await
|
fetch_remote_community(apub_id, context, None, recursion_counter).await
|
||||||
}
|
}
|
||||||
Err(e) => Err(e.into()),
|
Err(e) => Err(e.into()),
|
||||||
}
|
}
|
||||||
|
@ -299,10 +338,12 @@ async fn fetch_remote_community(
|
||||||
apub_id: &Url,
|
apub_id: &Url,
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
community_id: Option<i32>,
|
community_id: Option<i32>,
|
||||||
|
recursion_counter: &mut i32,
|
||||||
) -> Result<Community, LemmyError> {
|
) -> Result<Community, LemmyError> {
|
||||||
let group = fetch_remote_object::<GroupExt>(context.client(), apub_id).await?;
|
let group = fetch_remote_object::<GroupExt>(context.client(), apub_id, recursion_counter).await?;
|
||||||
|
|
||||||
let cf = CommunityForm::from_apub(&group, context, Some(apub_id.to_owned())).await?;
|
let cf =
|
||||||
|
CommunityForm::from_apub(&group, context, Some(apub_id.to_owned()), recursion_counter).await?;
|
||||||
let community = blocking(context.pool(), move |conn| Community::upsert(conn, &cf)).await??;
|
let community = blocking(context.pool(), move |conn| Community::upsert(conn, &cf)).await??;
|
||||||
|
|
||||||
// Also add the community moderators too
|
// Also add the community moderators too
|
||||||
|
@ -317,7 +358,7 @@ async fn fetch_remote_community(
|
||||||
let mut creator_and_moderators = Vec::new();
|
let mut creator_and_moderators = Vec::new();
|
||||||
|
|
||||||
for uri in creator_and_moderator_uris {
|
for uri in creator_and_moderator_uris {
|
||||||
let c_or_m = get_or_fetch_and_upsert_user(uri, context).await?;
|
let c_or_m = get_or_fetch_and_upsert_user(uri, context, recursion_counter).await?;
|
||||||
|
|
||||||
creator_and_moderators.push(c_or_m);
|
creator_and_moderators.push(c_or_m);
|
||||||
}
|
}
|
||||||
|
@ -340,8 +381,11 @@ async fn fetch_remote_community(
|
||||||
}
|
}
|
||||||
|
|
||||||
// fetch outbox (maybe make this conditional)
|
// fetch outbox (maybe make this conditional)
|
||||||
let outbox =
|
let outbox = fetch_remote_object::<OrderedCollection>(
|
||||||
fetch_remote_object::<OrderedCollection>(context.client(), &community.get_outbox_url()?)
|
context.client(),
|
||||||
|
&community.get_outbox_url()?,
|
||||||
|
recursion_counter,
|
||||||
|
)
|
||||||
.await?;
|
.await?;
|
||||||
let outbox_items = outbox.items().context(location_info!())?.clone();
|
let outbox_items = outbox.items().context(location_info!())?.clone();
|
||||||
let mut outbox_items = outbox_items.many().context(location_info!())?;
|
let mut outbox_items = outbox_items.many().context(location_info!())?;
|
||||||
|
@ -353,7 +397,7 @@ async fn fetch_remote_community(
|
||||||
|
|
||||||
// The post creator may be from a blocked instance,
|
// The post creator may be from a blocked instance,
|
||||||
// if it errors, then continue
|
// if it errors, then continue
|
||||||
let post = match PostForm::from_apub(&page, context, None).await {
|
let post = match PostForm::from_apub(&page, context, None, recursion_counter).await {
|
||||||
Ok(post) => post,
|
Ok(post) => post,
|
||||||
Err(_) => continue,
|
Err(_) => continue,
|
||||||
};
|
};
|
||||||
|
@ -380,6 +424,7 @@ async fn fetch_remote_community(
|
||||||
pub(crate) async fn get_or_fetch_and_insert_post(
|
pub(crate) async fn get_or_fetch_and_insert_post(
|
||||||
post_ap_id: &Url,
|
post_ap_id: &Url,
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
|
recursion_counter: &mut i32,
|
||||||
) -> Result<Post, LemmyError> {
|
) -> Result<Post, LemmyError> {
|
||||||
let post_ap_id_owned = post_ap_id.to_owned();
|
let post_ap_id_owned = post_ap_id.to_owned();
|
||||||
let post = blocking(context.pool(), move |conn| {
|
let post = blocking(context.pool(), move |conn| {
|
||||||
|
@ -391,8 +436,15 @@ pub(crate) async fn get_or_fetch_and_insert_post(
|
||||||
Ok(p) => Ok(p),
|
Ok(p) => Ok(p),
|
||||||
Err(NotFound {}) => {
|
Err(NotFound {}) => {
|
||||||
debug!("Fetching and creating remote post: {}", post_ap_id);
|
debug!("Fetching and creating remote post: {}", post_ap_id);
|
||||||
let post = fetch_remote_object::<PageExt>(context.client(), post_ap_id).await?;
|
let post =
|
||||||
let post_form = PostForm::from_apub(&post, context, Some(post_ap_id.to_owned())).await?;
|
fetch_remote_object::<PageExt>(context.client(), post_ap_id, recursion_counter).await?;
|
||||||
|
let post_form = PostForm::from_apub(
|
||||||
|
&post,
|
||||||
|
context,
|
||||||
|
Some(post_ap_id.to_owned()),
|
||||||
|
recursion_counter,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
let post = blocking(context.pool(), move |conn| Post::upsert(conn, &post_form)).await??;
|
let post = blocking(context.pool(), move |conn| Post::upsert(conn, &post_form)).await??;
|
||||||
|
|
||||||
|
@ -409,6 +461,7 @@ pub(crate) async fn get_or_fetch_and_insert_post(
|
||||||
pub(crate) async fn get_or_fetch_and_insert_comment(
|
pub(crate) async fn get_or_fetch_and_insert_comment(
|
||||||
comment_ap_id: &Url,
|
comment_ap_id: &Url,
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
|
recursion_counter: &mut i32,
|
||||||
) -> Result<Comment, LemmyError> {
|
) -> Result<Comment, LemmyError> {
|
||||||
let comment_ap_id_owned = comment_ap_id.to_owned();
|
let comment_ap_id_owned = comment_ap_id.to_owned();
|
||||||
let comment = blocking(context.pool(), move |conn| {
|
let comment = blocking(context.pool(), move |conn| {
|
||||||
|
@ -423,9 +476,15 @@ pub(crate) async fn get_or_fetch_and_insert_comment(
|
||||||
"Fetching and creating remote comment and its parents: {}",
|
"Fetching and creating remote comment and its parents: {}",
|
||||||
comment_ap_id
|
comment_ap_id
|
||||||
);
|
);
|
||||||
let comment = fetch_remote_object::<Note>(context.client(), comment_ap_id).await?;
|
let comment =
|
||||||
let comment_form =
|
fetch_remote_object::<Note>(context.client(), comment_ap_id, recursion_counter).await?;
|
||||||
CommentForm::from_apub(&comment, context, Some(comment_ap_id.to_owned())).await?;
|
let comment_form = CommentForm::from_apub(
|
||||||
|
&comment,
|
||||||
|
context,
|
||||||
|
Some(comment_ap_id.to_owned()),
|
||||||
|
recursion_counter,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
let comment = blocking(context.pool(), move |conn| {
|
let comment = blocking(context.pool(), move |conn| {
|
||||||
Comment::upsert(conn, &comment_form)
|
Comment::upsert(conn, &comment_form)
|
||||||
|
|
|
@ -75,7 +75,8 @@ pub async fn community_inbox(
|
||||||
);
|
);
|
||||||
check_is_apub_id_valid(user_uri)?;
|
check_is_apub_id_valid(user_uri)?;
|
||||||
|
|
||||||
let user = get_or_fetch_and_upsert_user(&user_uri, &context).await?;
|
let request_counter = &mut 0;
|
||||||
|
let user = get_or_fetch_and_upsert_user(&user_uri, &context, request_counter).await?;
|
||||||
|
|
||||||
verify_signature(&request, &user)?;
|
verify_signature(&request, &user)?;
|
||||||
|
|
||||||
|
|
|
@ -100,20 +100,23 @@ pub async fn shared_inbox(
|
||||||
|
|
||||||
check_is_apub_id_valid(&actor_id)?;
|
check_is_apub_id_valid(&actor_id)?;
|
||||||
|
|
||||||
let actor = get_or_fetch_and_upsert_actor(&actor_id, &context).await?;
|
let request_counter = &mut 0;
|
||||||
|
let actor = get_or_fetch_and_upsert_actor(&actor_id, &context, request_counter).await?;
|
||||||
verify_signature(&request, actor.as_ref())?;
|
verify_signature(&request, actor.as_ref())?;
|
||||||
|
|
||||||
let any_base = activity.clone().into_any_base()?;
|
let any_base = activity.clone().into_any_base()?;
|
||||||
let kind = activity.kind().context(location_info!())?;
|
let kind = activity.kind().context(location_info!())?;
|
||||||
let res = match kind {
|
let res = match kind {
|
||||||
ValidTypes::Announce => receive_announce(&context, any_base, actor.as_ref()).await,
|
ValidTypes::Announce => {
|
||||||
ValidTypes::Create => receive_create(&context, any_base, actor_id).await,
|
receive_announce(&context, any_base, actor.as_ref(), request_counter).await
|
||||||
ValidTypes::Update => receive_update(&context, any_base, actor_id).await,
|
}
|
||||||
ValidTypes::Like => receive_like(&context, any_base, actor_id).await,
|
ValidTypes::Create => receive_create(&context, any_base, actor_id, request_counter).await,
|
||||||
ValidTypes::Dislike => receive_dislike(&context, any_base, actor_id).await,
|
ValidTypes::Update => receive_update(&context, any_base, actor_id, request_counter).await,
|
||||||
|
ValidTypes::Like => receive_like(&context, any_base, actor_id, request_counter).await,
|
||||||
|
ValidTypes::Dislike => receive_dislike(&context, any_base, actor_id, request_counter).await,
|
||||||
ValidTypes::Remove => receive_remove(&context, any_base, actor_id).await,
|
ValidTypes::Remove => receive_remove(&context, any_base, actor_id).await,
|
||||||
ValidTypes::Delete => receive_delete(&context, any_base, actor_id).await,
|
ValidTypes::Delete => receive_delete(&context, any_base, actor_id, request_counter).await,
|
||||||
ValidTypes::Undo => receive_undo(&context, any_base, actor_id).await,
|
ValidTypes::Undo => receive_undo(&context, any_base, actor_id, request_counter).await,
|
||||||
};
|
};
|
||||||
|
|
||||||
insert_activity(actor.user_id(), activity.clone(), false, context.pool()).await?;
|
insert_activity(actor.user_id(), activity.clone(), false, context.pool()).await?;
|
||||||
|
@ -125,6 +128,7 @@ async fn receive_announce(
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
activity: AnyBase,
|
activity: AnyBase,
|
||||||
actor: &dyn ActorType,
|
actor: &dyn ActorType,
|
||||||
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<HttpResponse, LemmyError> {
|
||||||
let announce = Announce::from_any_base(activity)?.context(location_info!())?;
|
let announce = Announce::from_any_base(activity)?.context(location_info!())?;
|
||||||
verify_activity_domains_valid(&announce, actor.actor_id()?, false)?;
|
verify_activity_domains_valid(&announce, actor.actor_id()?, false)?;
|
||||||
|
@ -140,13 +144,13 @@ async fn receive_announce(
|
||||||
check_is_apub_id_valid(&inner_id)?;
|
check_is_apub_id_valid(&inner_id)?;
|
||||||
|
|
||||||
match kind {
|
match kind {
|
||||||
Some("Create") => receive_create(context, object, inner_id).await,
|
Some("Create") => receive_create(context, object, inner_id, request_counter).await,
|
||||||
Some("Update") => receive_update(context, object, inner_id).await,
|
Some("Update") => receive_update(context, object, inner_id, request_counter).await,
|
||||||
Some("Like") => receive_like(context, object, inner_id).await,
|
Some("Like") => receive_like(context, object, inner_id, request_counter).await,
|
||||||
Some("Dislike") => receive_dislike(context, object, inner_id).await,
|
Some("Dislike") => receive_dislike(context, object, inner_id, request_counter).await,
|
||||||
Some("Delete") => receive_delete(context, object, inner_id).await,
|
Some("Delete") => receive_delete(context, object, inner_id, request_counter).await,
|
||||||
Some("Remove") => receive_remove(context, object, inner_id).await,
|
Some("Remove") => receive_remove(context, object, inner_id).await,
|
||||||
Some("Undo") => receive_undo(context, object, inner_id).await,
|
Some("Undo") => receive_undo(context, object, inner_id, request_counter).await,
|
||||||
_ => receive_unhandled_activity(announce),
|
_ => receive_unhandled_activity(announce),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -155,13 +159,14 @@ async fn receive_create(
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
activity: AnyBase,
|
activity: AnyBase,
|
||||||
expected_domain: Url,
|
expected_domain: Url,
|
||||||
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<HttpResponse, LemmyError> {
|
||||||
let create = Create::from_any_base(activity)?.context(location_info!())?;
|
let create = Create::from_any_base(activity)?.context(location_info!())?;
|
||||||
verify_activity_domains_valid(&create, expected_domain, true)?;
|
verify_activity_domains_valid(&create, expected_domain, true)?;
|
||||||
|
|
||||||
match create.object().as_single_kind_str() {
|
match create.object().as_single_kind_str() {
|
||||||
Some("Page") => receive_create_post(create, context).await,
|
Some("Page") => receive_create_post(create, context, request_counter).await,
|
||||||
Some("Note") => receive_create_comment(create, context).await,
|
Some("Note") => receive_create_comment(create, context, request_counter).await,
|
||||||
_ => receive_unhandled_activity(create),
|
_ => receive_unhandled_activity(create),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -170,13 +175,14 @@ async fn receive_update(
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
activity: AnyBase,
|
activity: AnyBase,
|
||||||
expected_domain: Url,
|
expected_domain: Url,
|
||||||
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<HttpResponse, LemmyError> {
|
||||||
let update = Update::from_any_base(activity)?.context(location_info!())?;
|
let update = Update::from_any_base(activity)?.context(location_info!())?;
|
||||||
verify_activity_domains_valid(&update, expected_domain, true)?;
|
verify_activity_domains_valid(&update, expected_domain, true)?;
|
||||||
|
|
||||||
match update.object().as_single_kind_str() {
|
match update.object().as_single_kind_str() {
|
||||||
Some("Page") => receive_update_post(update, context).await,
|
Some("Page") => receive_update_post(update, context, request_counter).await,
|
||||||
Some("Note") => receive_update_comment(update, context).await,
|
Some("Note") => receive_update_comment(update, context, request_counter).await,
|
||||||
_ => receive_unhandled_activity(update),
|
_ => receive_unhandled_activity(update),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -185,13 +191,14 @@ async fn receive_like(
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
activity: AnyBase,
|
activity: AnyBase,
|
||||||
expected_domain: Url,
|
expected_domain: Url,
|
||||||
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<HttpResponse, LemmyError> {
|
||||||
let like = Like::from_any_base(activity)?.context(location_info!())?;
|
let like = Like::from_any_base(activity)?.context(location_info!())?;
|
||||||
verify_activity_domains_valid(&like, expected_domain, false)?;
|
verify_activity_domains_valid(&like, expected_domain, false)?;
|
||||||
|
|
||||||
match like.object().as_single_kind_str() {
|
match like.object().as_single_kind_str() {
|
||||||
Some("Page") => receive_like_post(like, context).await,
|
Some("Page") => receive_like_post(like, context, request_counter).await,
|
||||||
Some("Note") => receive_like_comment(like, context).await,
|
Some("Note") => receive_like_comment(like, context, request_counter).await,
|
||||||
_ => receive_unhandled_activity(like),
|
_ => receive_unhandled_activity(like),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -200,6 +207,7 @@ async fn receive_dislike(
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
activity: AnyBase,
|
activity: AnyBase,
|
||||||
expected_domain: Url,
|
expected_domain: Url,
|
||||||
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<HttpResponse, LemmyError> {
|
||||||
let enable_downvotes = blocking(context.pool(), move |conn| {
|
let enable_downvotes = blocking(context.pool(), move |conn| {
|
||||||
Site::read(conn, 1).map(|s| s.enable_downvotes)
|
Site::read(conn, 1).map(|s| s.enable_downvotes)
|
||||||
|
@ -213,8 +221,8 @@ async fn receive_dislike(
|
||||||
verify_activity_domains_valid(&dislike, expected_domain, false)?;
|
verify_activity_domains_valid(&dislike, expected_domain, false)?;
|
||||||
|
|
||||||
match dislike.object().as_single_kind_str() {
|
match dislike.object().as_single_kind_str() {
|
||||||
Some("Page") => receive_dislike_post(dislike, context).await,
|
Some("Page") => receive_dislike_post(dislike, context, request_counter).await,
|
||||||
Some("Note") => receive_dislike_comment(dislike, context).await,
|
Some("Note") => receive_dislike_comment(dislike, context, request_counter).await,
|
||||||
_ => receive_unhandled_activity(dislike),
|
_ => receive_unhandled_activity(dislike),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -223,6 +231,7 @@ pub async fn receive_delete(
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
activity: AnyBase,
|
activity: AnyBase,
|
||||||
expected_domain: Url,
|
expected_domain: Url,
|
||||||
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<HttpResponse, LemmyError> {
|
||||||
let delete = Delete::from_any_base(activity)?.context(location_info!())?;
|
let delete = Delete::from_any_base(activity)?.context(location_info!())?;
|
||||||
verify_activity_domains_valid(&delete, expected_domain, true)?;
|
verify_activity_domains_valid(&delete, expected_domain, true)?;
|
||||||
|
@ -234,9 +243,13 @@ pub async fn receive_delete(
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
|
|
||||||
match find_by_id(context, object).await {
|
match find_by_id(context, object).await {
|
||||||
Ok(FindResults::Post(p)) => receive_delete_post(context, delete, p).await,
|
Ok(FindResults::Post(p)) => receive_delete_post(context, delete, p, request_counter).await,
|
||||||
Ok(FindResults::Comment(c)) => receive_delete_comment(context, delete, c).await,
|
Ok(FindResults::Comment(c)) => {
|
||||||
Ok(FindResults::Community(c)) => receive_delete_community(context, delete, c).await,
|
receive_delete_comment(context, delete, c, request_counter).await
|
||||||
|
}
|
||||||
|
Ok(FindResults::Community(c)) => {
|
||||||
|
receive_delete_community(context, delete, c, request_counter).await
|
||||||
|
}
|
||||||
// if we dont have the object, no need to do anything
|
// if we dont have the object, no need to do anything
|
||||||
Err(_) => Ok(HttpResponse::Ok().finish()),
|
Err(_) => Ok(HttpResponse::Ok().finish()),
|
||||||
}
|
}
|
||||||
|
@ -283,15 +296,16 @@ async fn receive_undo(
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
activity: AnyBase,
|
activity: AnyBase,
|
||||||
expected_domain: Url,
|
expected_domain: Url,
|
||||||
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<HttpResponse, LemmyError> {
|
||||||
let undo = Undo::from_any_base(activity)?.context(location_info!())?;
|
let undo = Undo::from_any_base(activity)?.context(location_info!())?;
|
||||||
verify_activity_domains_valid(&undo, expected_domain.to_owned(), true)?;
|
verify_activity_domains_valid(&undo, expected_domain.to_owned(), true)?;
|
||||||
|
|
||||||
match undo.object().as_single_kind_str() {
|
match undo.object().as_single_kind_str() {
|
||||||
Some("Delete") => receive_undo_delete(context, undo, expected_domain).await,
|
Some("Delete") => receive_undo_delete(context, undo, expected_domain, request_counter).await,
|
||||||
Some("Remove") => receive_undo_remove(context, undo, expected_domain).await,
|
Some("Remove") => receive_undo_remove(context, undo, expected_domain, request_counter).await,
|
||||||
Some("Like") => receive_undo_like(context, undo, expected_domain).await,
|
Some("Like") => receive_undo_like(context, undo, expected_domain, request_counter).await,
|
||||||
Some("Dislike") => receive_undo_dislike(context, undo, expected_domain).await,
|
Some("Dislike") => receive_undo_dislike(context, undo, expected_domain, request_counter).await,
|
||||||
_ => receive_unhandled_activity(undo),
|
_ => receive_unhandled_activity(undo),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -300,6 +314,7 @@ async fn receive_undo_delete(
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
undo: Undo,
|
undo: Undo,
|
||||||
expected_domain: Url,
|
expected_domain: Url,
|
||||||
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<HttpResponse, LemmyError> {
|
||||||
let delete = Delete::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
|
let delete = Delete::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
|
@ -311,9 +326,13 @@ async fn receive_undo_delete(
|
||||||
.single_xsd_any_uri()
|
.single_xsd_any_uri()
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
match find_by_id(context, object).await {
|
match find_by_id(context, object).await {
|
||||||
Ok(FindResults::Post(p)) => receive_undo_delete_post(context, undo, p).await,
|
Ok(FindResults::Post(p)) => receive_undo_delete_post(context, undo, p, request_counter).await,
|
||||||
Ok(FindResults::Comment(c)) => receive_undo_delete_comment(context, undo, c).await,
|
Ok(FindResults::Comment(c)) => {
|
||||||
Ok(FindResults::Community(c)) => receive_undo_delete_community(context, undo, c).await,
|
receive_undo_delete_comment(context, undo, c, request_counter).await
|
||||||
|
}
|
||||||
|
Ok(FindResults::Community(c)) => {
|
||||||
|
receive_undo_delete_community(context, undo, c, request_counter).await
|
||||||
|
}
|
||||||
// if we dont have the object, no need to do anything
|
// if we dont have the object, no need to do anything
|
||||||
Err(_) => Ok(HttpResponse::Ok().finish()),
|
Err(_) => Ok(HttpResponse::Ok().finish()),
|
||||||
}
|
}
|
||||||
|
@ -323,6 +342,7 @@ async fn receive_undo_remove(
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
undo: Undo,
|
undo: Undo,
|
||||||
expected_domain: Url,
|
expected_domain: Url,
|
||||||
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<HttpResponse, LemmyError> {
|
||||||
let remove = Remove::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
|
let remove = Remove::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
|
@ -334,9 +354,13 @@ async fn receive_undo_remove(
|
||||||
.single_xsd_any_uri()
|
.single_xsd_any_uri()
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
match find_by_id(context, object).await {
|
match find_by_id(context, object).await {
|
||||||
Ok(FindResults::Post(p)) => receive_undo_remove_post(context, undo, p).await,
|
Ok(FindResults::Post(p)) => receive_undo_remove_post(context, undo, p, request_counter).await,
|
||||||
Ok(FindResults::Comment(c)) => receive_undo_remove_comment(context, undo, c).await,
|
Ok(FindResults::Comment(c)) => {
|
||||||
Ok(FindResults::Community(c)) => receive_undo_remove_community(context, undo, c).await,
|
receive_undo_remove_comment(context, undo, c, request_counter).await
|
||||||
|
}
|
||||||
|
Ok(FindResults::Community(c)) => {
|
||||||
|
receive_undo_remove_community(context, undo, c, request_counter).await
|
||||||
|
}
|
||||||
// if we dont have the object, no need to do anything
|
// if we dont have the object, no need to do anything
|
||||||
Err(_) => Ok(HttpResponse::Ok().finish()),
|
Err(_) => Ok(HttpResponse::Ok().finish()),
|
||||||
}
|
}
|
||||||
|
@ -346,6 +370,7 @@ async fn receive_undo_like(
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
undo: Undo,
|
undo: Undo,
|
||||||
expected_domain: Url,
|
expected_domain: Url,
|
||||||
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<HttpResponse, LemmyError> {
|
||||||
let like = Like::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
|
let like = Like::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
|
@ -356,8 +381,8 @@ async fn receive_undo_like(
|
||||||
.as_single_kind_str()
|
.as_single_kind_str()
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
match type_ {
|
match type_ {
|
||||||
"Note" => receive_undo_like_comment(undo, &like, context).await,
|
"Note" => receive_undo_like_comment(undo, &like, context, request_counter).await,
|
||||||
"Page" => receive_undo_like_post(undo, &like, context).await,
|
"Page" => receive_undo_like_post(undo, &like, context, request_counter).await,
|
||||||
d => Err(anyhow!("Undo Delete type {} not supported", d).into()),
|
d => Err(anyhow!("Undo Delete type {} not supported", d).into()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -366,6 +391,7 @@ async fn receive_undo_dislike(
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
undo: Undo,
|
undo: Undo,
|
||||||
expected_domain: Url,
|
expected_domain: Url,
|
||||||
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<HttpResponse, LemmyError> {
|
||||||
let dislike = Dislike::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
|
let dislike = Dislike::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
|
@ -376,8 +402,8 @@ async fn receive_undo_dislike(
|
||||||
.as_single_kind_str()
|
.as_single_kind_str()
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
match type_ {
|
match type_ {
|
||||||
"Note" => receive_undo_dislike_comment(undo, &dislike, context).await,
|
"Note" => receive_undo_dislike_comment(undo, &dislike, context, request_counter).await,
|
||||||
"Page" => receive_undo_dislike_post(undo, &dislike, context).await,
|
"Page" => receive_undo_dislike_post(undo, &dislike, context, request_counter).await,
|
||||||
d => Err(anyhow!("Undo Delete type {} not supported", d).into()),
|
d => Err(anyhow!("Undo Delete type {} not supported", d).into()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -79,15 +79,22 @@ pub async fn user_inbox(
|
||||||
|
|
||||||
check_is_apub_id_valid(actor_uri)?;
|
check_is_apub_id_valid(actor_uri)?;
|
||||||
|
|
||||||
let actor = get_or_fetch_and_upsert_actor(actor_uri, &context).await?;
|
let request_counter = &mut 0;
|
||||||
|
let actor = get_or_fetch_and_upsert_actor(actor_uri, &context, request_counter).await?;
|
||||||
verify_signature(&request, actor.as_ref())?;
|
verify_signature(&request, actor.as_ref())?;
|
||||||
|
|
||||||
let any_base = activity.clone().into_any_base()?;
|
let any_base = activity.clone().into_any_base()?;
|
||||||
let kind = activity.kind().context(location_info!())?;
|
let kind = activity.kind().context(location_info!())?;
|
||||||
let res = match kind {
|
let res = match kind {
|
||||||
ValidTypes::Accept => receive_accept(&context, any_base, actor.as_ref(), user).await,
|
ValidTypes::Accept => {
|
||||||
ValidTypes::Create => receive_create_private_message(&context, any_base, actor.as_ref()).await,
|
receive_accept(&context, any_base, actor.as_ref(), user, request_counter).await
|
||||||
ValidTypes::Update => receive_update_private_message(&context, any_base, actor.as_ref()).await,
|
}
|
||||||
|
ValidTypes::Create => {
|
||||||
|
receive_create_private_message(&context, any_base, actor.as_ref(), request_counter).await
|
||||||
|
}
|
||||||
|
ValidTypes::Update => {
|
||||||
|
receive_update_private_message(&context, any_base, actor.as_ref(), request_counter).await
|
||||||
|
}
|
||||||
ValidTypes::Delete => receive_delete_private_message(&context, any_base, actor.as_ref()).await,
|
ValidTypes::Delete => receive_delete_private_message(&context, any_base, actor.as_ref()).await,
|
||||||
ValidTypes::Undo => {
|
ValidTypes::Undo => {
|
||||||
receive_undo_delete_private_message(&context, any_base, actor.as_ref()).await
|
receive_undo_delete_private_message(&context, any_base, actor.as_ref()).await
|
||||||
|
@ -104,6 +111,7 @@ async fn receive_accept(
|
||||||
activity: AnyBase,
|
activity: AnyBase,
|
||||||
actor: &dyn ActorType,
|
actor: &dyn ActorType,
|
||||||
user: User_,
|
user: User_,
|
||||||
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<HttpResponse, LemmyError> {
|
||||||
let accept = Accept::from_any_base(activity)?.context(location_info!())?;
|
let accept = Accept::from_any_base(activity)?.context(location_info!())?;
|
||||||
verify_activity_domains_valid(&accept, actor.actor_id()?, false)?;
|
verify_activity_domains_valid(&accept, actor.actor_id()?, false)?;
|
||||||
|
@ -120,7 +128,8 @@ async fn receive_accept(
|
||||||
.single_xsd_any_uri()
|
.single_xsd_any_uri()
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
|
|
||||||
let community = get_or_fetch_and_upsert_community(&community_uri, context).await?;
|
let community =
|
||||||
|
get_or_fetch_and_upsert_community(&community_uri, context, request_counter).await?;
|
||||||
|
|
||||||
// Now you need to add this to the community follower
|
// Now you need to add this to the community follower
|
||||||
let community_follower_form = CommunityFollowerForm {
|
let community_follower_form = CommunityFollowerForm {
|
||||||
|
@ -141,6 +150,7 @@ async fn receive_create_private_message(
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
activity: AnyBase,
|
activity: AnyBase,
|
||||||
actor: &dyn ActorType,
|
actor: &dyn ActorType,
|
||||||
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<HttpResponse, LemmyError> {
|
||||||
let create = Create::from_any_base(activity)?.context(location_info!())?;
|
let create = Create::from_any_base(activity)?.context(location_info!())?;
|
||||||
verify_activity_domains_valid(&create, actor.actor_id()?, true)?;
|
verify_activity_domains_valid(&create, actor.actor_id()?, true)?;
|
||||||
|
@ -155,7 +165,7 @@ async fn receive_create_private_message(
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
|
|
||||||
let private_message =
|
let private_message =
|
||||||
PrivateMessageForm::from_apub(¬e, context, Some(actor.actor_id()?)).await?;
|
PrivateMessageForm::from_apub(¬e, context, Some(actor.actor_id()?), request_counter).await?;
|
||||||
|
|
||||||
let inserted_private_message = blocking(&context.pool(), move |conn| {
|
let inserted_private_message = blocking(&context.pool(), move |conn| {
|
||||||
PrivateMessage::create(conn, &private_message)
|
PrivateMessage::create(conn, &private_message)
|
||||||
|
@ -185,6 +195,7 @@ async fn receive_update_private_message(
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
activity: AnyBase,
|
activity: AnyBase,
|
||||||
actor: &dyn ActorType,
|
actor: &dyn ActorType,
|
||||||
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<HttpResponse, LemmyError> {
|
||||||
let update = Update::from_any_base(activity)?.context(location_info!())?;
|
let update = Update::from_any_base(activity)?.context(location_info!())?;
|
||||||
verify_activity_domains_valid(&update, actor.actor_id()?, true)?;
|
verify_activity_domains_valid(&update, actor.actor_id()?, true)?;
|
||||||
|
@ -197,7 +208,7 @@ async fn receive_update_private_message(
|
||||||
let note = Note::from_any_base(object)?.context(location_info!())?;
|
let note = Note::from_any_base(object)?.context(location_info!())?;
|
||||||
|
|
||||||
let private_message_form =
|
let private_message_form =
|
||||||
PrivateMessageForm::from_apub(¬e, context, Some(actor.actor_id()?)).await?;
|
PrivateMessageForm::from_apub(¬e, context, Some(actor.actor_id()?), request_counter).await?;
|
||||||
|
|
||||||
let private_message_ap_id = private_message_form
|
let private_message_ap_id = private_message_form
|
||||||
.ap_id
|
.ap_id
|
||||||
|
|
|
@ -122,6 +122,7 @@ pub trait FromApub {
|
||||||
apub: &Self::ApubType,
|
apub: &Self::ApubType,
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
expected_domain: Option<Url>,
|
expected_domain: Option<Url>,
|
||||||
|
request_counter: &mut i32,
|
||||||
) -> Result<Self, LemmyError>
|
) -> Result<Self, LemmyError>
|
||||||
where
|
where
|
||||||
Self: Sized;
|
Self: Sized;
|
||||||
|
|
|
@ -89,6 +89,7 @@ impl FromApub for CommentForm {
|
||||||
note: &Note,
|
note: &Note,
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
expected_domain: Option<Url>,
|
expected_domain: Option<Url>,
|
||||||
|
request_counter: &mut i32,
|
||||||
) -> Result<CommentForm, LemmyError> {
|
) -> Result<CommentForm, LemmyError> {
|
||||||
let creator_actor_id = ¬e
|
let creator_actor_id = ¬e
|
||||||
.attributed_to()
|
.attributed_to()
|
||||||
|
@ -96,7 +97,7 @@ impl FromApub for CommentForm {
|
||||||
.as_single_xsd_any_uri()
|
.as_single_xsd_any_uri()
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
|
|
||||||
let creator = get_or_fetch_and_upsert_user(creator_actor_id, context).await?;
|
let creator = get_or_fetch_and_upsert_user(creator_actor_id, context, request_counter).await?;
|
||||||
|
|
||||||
let mut in_reply_tos = note
|
let mut in_reply_tos = note
|
||||||
.in_reply_to()
|
.in_reply_to()
|
||||||
|
@ -109,7 +110,7 @@ impl FromApub for CommentForm {
|
||||||
let post_ap_id = in_reply_tos.next().context(location_info!())??;
|
let post_ap_id = in_reply_tos.next().context(location_info!())??;
|
||||||
|
|
||||||
// This post, or the parent comment might not yet exist on this server yet, fetch them.
|
// This post, or the parent comment might not yet exist on this server yet, fetch them.
|
||||||
let post = get_or_fetch_and_insert_post(&post_ap_id, context).await?;
|
let post = get_or_fetch_and_insert_post(&post_ap_id, context, request_counter).await?;
|
||||||
|
|
||||||
// The 2nd item, if it exists, is the parent comment apub_id
|
// The 2nd item, if it exists, is the parent comment apub_id
|
||||||
// For deeply nested comments, FromApub automatically gets called recursively
|
// For deeply nested comments, FromApub automatically gets called recursively
|
||||||
|
@ -117,7 +118,7 @@ impl FromApub for CommentForm {
|
||||||
Some(parent_comment_uri) => {
|
Some(parent_comment_uri) => {
|
||||||
let parent_comment_ap_id = &parent_comment_uri?;
|
let parent_comment_ap_id = &parent_comment_uri?;
|
||||||
let parent_comment =
|
let parent_comment =
|
||||||
get_or_fetch_and_insert_comment(&parent_comment_ap_id, context).await?;
|
get_or_fetch_and_insert_comment(&parent_comment_ap_id, context, request_counter).await?;
|
||||||
|
|
||||||
Some(parent_comment.id)
|
Some(parent_comment.id)
|
||||||
}
|
}
|
||||||
|
|
|
@ -111,6 +111,7 @@ impl FromApub for CommunityForm {
|
||||||
group: &GroupExt,
|
group: &GroupExt,
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
expected_domain: Option<Url>,
|
expected_domain: Option<Url>,
|
||||||
|
request_counter: &mut i32,
|
||||||
) -> Result<Self, LemmyError> {
|
) -> Result<Self, LemmyError> {
|
||||||
let creator_and_moderator_uris = group.inner.attributed_to().context(location_info!())?;
|
let creator_and_moderator_uris = group.inner.attributed_to().context(location_info!())?;
|
||||||
let creator_uri = creator_and_moderator_uris
|
let creator_uri = creator_and_moderator_uris
|
||||||
|
@ -122,7 +123,7 @@ impl FromApub for CommunityForm {
|
||||||
.as_xsd_any_uri()
|
.as_xsd_any_uri()
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
|
|
||||||
let creator = get_or_fetch_and_upsert_user(creator_uri, context).await?;
|
let creator = get_or_fetch_and_upsert_user(creator_uri, context, request_counter).await?;
|
||||||
let name = group
|
let name = group
|
||||||
.inner
|
.inner
|
||||||
.preferred_username()
|
.preferred_username()
|
||||||
|
|
|
@ -101,6 +101,7 @@ impl FromApub for PostForm {
|
||||||
page: &PageExt,
|
page: &PageExt,
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
expected_domain: Option<Url>,
|
expected_domain: Option<Url>,
|
||||||
|
request_counter: &mut i32,
|
||||||
) -> Result<PostForm, LemmyError> {
|
) -> Result<PostForm, LemmyError> {
|
||||||
let ext = &page.ext_one;
|
let ext = &page.ext_one;
|
||||||
let creator_actor_id = page
|
let creator_actor_id = page
|
||||||
|
@ -111,7 +112,7 @@ impl FromApub for PostForm {
|
||||||
.as_single_xsd_any_uri()
|
.as_single_xsd_any_uri()
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
|
|
||||||
let creator = get_or_fetch_and_upsert_user(creator_actor_id, context).await?;
|
let creator = get_or_fetch_and_upsert_user(creator_actor_id, context, request_counter).await?;
|
||||||
|
|
||||||
let community_actor_id = page
|
let community_actor_id = page
|
||||||
.inner
|
.inner
|
||||||
|
@ -121,7 +122,8 @@ impl FromApub for PostForm {
|
||||||
.as_single_xsd_any_uri()
|
.as_single_xsd_any_uri()
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
|
|
||||||
let community = get_or_fetch_and_upsert_community(community_actor_id, context).await?;
|
let community =
|
||||||
|
get_or_fetch_and_upsert_community(community_actor_id, context, request_counter).await?;
|
||||||
|
|
||||||
let thumbnail_url = match &page.inner.image() {
|
let thumbnail_url = match &page.inner.image() {
|
||||||
Some(any_image) => Image::from_any_base(
|
Some(any_image) => Image::from_any_base(
|
||||||
|
|
|
@ -62,6 +62,7 @@ impl FromApub for PrivateMessageForm {
|
||||||
note: &Note,
|
note: &Note,
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
expected_domain: Option<Url>,
|
expected_domain: Option<Url>,
|
||||||
|
request_counter: &mut i32,
|
||||||
) -> Result<PrivateMessageForm, LemmyError> {
|
) -> Result<PrivateMessageForm, LemmyError> {
|
||||||
let creator_actor_id = note
|
let creator_actor_id = note
|
||||||
.attributed_to()
|
.attributed_to()
|
||||||
|
@ -70,14 +71,15 @@ impl FromApub for PrivateMessageForm {
|
||||||
.single_xsd_any_uri()
|
.single_xsd_any_uri()
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
|
|
||||||
let creator = get_or_fetch_and_upsert_user(&creator_actor_id, context).await?;
|
let creator = get_or_fetch_and_upsert_user(&creator_actor_id, context, request_counter).await?;
|
||||||
let recipient_actor_id = note
|
let recipient_actor_id = note
|
||||||
.to()
|
.to()
|
||||||
.context(location_info!())?
|
.context(location_info!())?
|
||||||
.clone()
|
.clone()
|
||||||
.single_xsd_any_uri()
|
.single_xsd_any_uri()
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
let recipient = get_or_fetch_and_upsert_user(&recipient_actor_id, context).await?;
|
let recipient =
|
||||||
|
get_or_fetch_and_upsert_user(&recipient_actor_id, context, request_counter).await?;
|
||||||
let ap_id = note.id_unchecked().context(location_info!())?.to_string();
|
let ap_id = note.id_unchecked().context(location_info!())?.to_string();
|
||||||
check_is_apub_id_valid(&Url::parse(&ap_id)?)?;
|
check_is_apub_id_valid(&Url::parse(&ap_id)?)?;
|
||||||
|
|
||||||
|
|
|
@ -76,6 +76,7 @@ impl FromApub for UserForm {
|
||||||
person: &PersonExt,
|
person: &PersonExt,
|
||||||
_context: &LemmyContext,
|
_context: &LemmyContext,
|
||||||
expected_domain: Option<Url>,
|
expected_domain: Option<Url>,
|
||||||
|
_request_counter: &mut i32,
|
||||||
) -> Result<Self, LemmyError> {
|
) -> Result<Self, LemmyError> {
|
||||||
let avatar = match person.icon() {
|
let avatar = match person.icon() {
|
||||||
Some(any_image) => Some(
|
Some(any_image) => Some(
|
||||||
|
|
Loading…
Reference in a new issue