Separate logic for user and community inbox
more refactoring with tons of changes: - inbox functions return LemmyError instead of HttpResponse - announce is done directly in community inbox - reorganized functions for handling inbox activities - additional checks for private messages - moved inbox handler functions for post, comment, vote into separete file - ensure that posts, comments etc are addressed to public (ref #1220) - probably more
This commit is contained in:
parent
7e13970a4f
commit
b469b6d8d3
12 changed files with 1245 additions and 826 deletions
|
@ -1,15 +1,14 @@
|
||||||
use crate::{
|
use crate::{
|
||||||
activities::receive::{announce_if_community_is_local, get_actor_as_user},
|
activities::receive::get_actor_as_user,
|
||||||
fetcher::get_or_fetch_and_insert_comment,
|
fetcher::get_or_fetch_and_insert_comment,
|
||||||
ActorType,
|
ActorType,
|
||||||
FromApub,
|
FromApub,
|
||||||
};
|
};
|
||||||
use activitystreams::{
|
use activitystreams::{
|
||||||
activity::{ActorAndObjectRefExt, Create, Delete, Dislike, Like, Remove, Update},
|
activity::{ActorAndObjectRefExt, Create, Dislike, Like, Remove, Update},
|
||||||
base::ExtendsExt,
|
base::ExtendsExt,
|
||||||
object::Note,
|
object::Note,
|
||||||
};
|
};
|
||||||
use actix_web::HttpResponse;
|
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use lemmy_db::{
|
use lemmy_db::{
|
||||||
comment::{Comment, CommentForm, CommentLike, CommentLikeForm},
|
comment::{Comment, CommentForm, CommentLike, CommentLikeForm},
|
||||||
|
@ -26,7 +25,7 @@ pub(crate) async fn receive_create_comment(
|
||||||
create: Create,
|
create: Create,
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
request_counter: &mut i32,
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<(), LemmyError> {
|
||||||
let user = get_actor_as_user(&create, context, request_counter).await?;
|
let user = get_actor_as_user(&create, context, request_counter).await?;
|
||||||
let note = Note::from_any_base(create.object().to_owned().one().context(location_info!())?)?
|
let note = Note::from_any_base(create.object().to_owned().one().context(location_info!())?)?
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
|
@ -73,15 +72,14 @@ pub(crate) async fn receive_create_comment(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
announce_if_community_is_local(create, context, request_counter).await?;
|
Ok(())
|
||||||
Ok(HttpResponse::Ok().finish())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn receive_update_comment(
|
pub(crate) async fn receive_update_comment(
|
||||||
update: Update,
|
update: Update,
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
request_counter: &mut i32,
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<(), LemmyError> {
|
||||||
let note = Note::from_any_base(update.object().to_owned().one().context(location_info!())?)?
|
let note = Note::from_any_base(update.object().to_owned().one().context(location_info!())?)?
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
let user = get_actor_as_user(&update, context, request_counter).await?;
|
let user = get_actor_as_user(&update, context, request_counter).await?;
|
||||||
|
@ -131,15 +129,14 @@ pub(crate) async fn receive_update_comment(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
announce_if_community_is_local(update, context, request_counter).await?;
|
Ok(())
|
||||||
Ok(HttpResponse::Ok().finish())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn receive_like_comment(
|
pub(crate) async fn receive_like_comment(
|
||||||
like: Like,
|
like: Like,
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
request_counter: &mut i32,
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<(), LemmyError> {
|
||||||
let note = Note::from_any_base(like.object().to_owned().one().context(location_info!())?)?
|
let note = Note::from_any_base(like.object().to_owned().one().context(location_info!())?)?
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
let user = get_actor_as_user(&like, context, request_counter).await?;
|
let user = get_actor_as_user(&like, context, request_counter).await?;
|
||||||
|
@ -183,15 +180,14 @@ pub(crate) async fn receive_like_comment(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
announce_if_community_is_local(like, context, request_counter).await?;
|
Ok(())
|
||||||
Ok(HttpResponse::Ok().finish())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn receive_dislike_comment(
|
pub(crate) async fn receive_dislike_comment(
|
||||||
dislike: Dislike,
|
dislike: Dislike,
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
request_counter: &mut i32,
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<(), LemmyError> {
|
||||||
let note = Note::from_any_base(
|
let note = Note::from_any_base(
|
||||||
dislike
|
dislike
|
||||||
.object()
|
.object()
|
||||||
|
@ -241,16 +237,13 @@ pub(crate) async fn receive_dislike_comment(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
announce_if_community_is_local(dislike, context, request_counter).await?;
|
Ok(())
|
||||||
Ok(HttpResponse::Ok().finish())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn receive_delete_comment(
|
pub(crate) async fn receive_delete_comment(
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
delete: Delete,
|
|
||||||
comment: Comment,
|
comment: Comment,
|
||||||
request_counter: &mut i32,
|
) -> Result<(), LemmyError> {
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
|
||||||
let deleted_comment = blocking(context.pool(), move |conn| {
|
let deleted_comment = blocking(context.pool(), move |conn| {
|
||||||
Comment::update_deleted(conn, comment.id, true)
|
Comment::update_deleted(conn, comment.id, true)
|
||||||
})
|
})
|
||||||
|
@ -276,15 +269,14 @@ pub(crate) async fn receive_delete_comment(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
announce_if_community_is_local(delete, context, request_counter).await?;
|
Ok(())
|
||||||
Ok(HttpResponse::Ok().finish())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn receive_remove_comment(
|
pub(crate) async fn receive_remove_comment(
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
_remove: Remove,
|
_remove: Remove,
|
||||||
comment: Comment,
|
comment: Comment,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<(), LemmyError> {
|
||||||
let removed_comment = blocking(context.pool(), move |conn| {
|
let removed_comment = blocking(context.pool(), move |conn| {
|
||||||
Comment::update_removed(conn, comment.id, true)
|
Comment::update_removed(conn, comment.id, true)
|
||||||
})
|
})
|
||||||
|
@ -310,5 +302,5 @@ pub(crate) async fn receive_remove_comment(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
Ok(HttpResponse::Ok().finish())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,10 +1,9 @@
|
||||||
use crate::{
|
use crate::{
|
||||||
activities::receive::{announce_if_community_is_local, get_actor_as_user},
|
activities::receive::get_actor_as_user,
|
||||||
fetcher::get_or_fetch_and_insert_comment,
|
fetcher::get_or_fetch_and_insert_comment,
|
||||||
FromApub,
|
FromApub,
|
||||||
};
|
};
|
||||||
use activitystreams::{activity::*, object::Note, prelude::*};
|
use activitystreams::{activity::*, object::Note, prelude::*};
|
||||||
use actix_web::HttpResponse;
|
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use lemmy_db::{
|
use lemmy_db::{
|
||||||
comment::{Comment, CommentForm, CommentLike},
|
comment::{Comment, CommentForm, CommentLike},
|
||||||
|
@ -16,11 +15,10 @@ use lemmy_utils::{location_info, LemmyError};
|
||||||
use lemmy_websocket::{messages::SendComment, LemmyContext, UserOperation};
|
use lemmy_websocket::{messages::SendComment, LemmyContext, UserOperation};
|
||||||
|
|
||||||
pub(crate) async fn receive_undo_like_comment(
|
pub(crate) async fn receive_undo_like_comment(
|
||||||
undo: Undo,
|
|
||||||
like: &Like,
|
like: &Like,
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
request_counter: &mut i32,
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<(), LemmyError> {
|
||||||
let user = get_actor_as_user(like, context, request_counter).await?;
|
let user = get_actor_as_user(like, context, request_counter).await?;
|
||||||
let note = Note::from_any_base(like.object().to_owned().one().context(location_info!())?)?
|
let note = Note::from_any_base(like.object().to_owned().one().context(location_info!())?)?
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
|
@ -57,16 +55,14 @@ pub(crate) async fn receive_undo_like_comment(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
announce_if_community_is_local(undo, context, request_counter).await?;
|
Ok(())
|
||||||
Ok(HttpResponse::Ok().finish())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn receive_undo_dislike_comment(
|
pub(crate) async fn receive_undo_dislike_comment(
|
||||||
undo: Undo,
|
|
||||||
dislike: &Dislike,
|
dislike: &Dislike,
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
request_counter: &mut i32,
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<(), LemmyError> {
|
||||||
let user = get_actor_as_user(dislike, context, request_counter).await?;
|
let user = get_actor_as_user(dislike, context, request_counter).await?;
|
||||||
let note = Note::from_any_base(
|
let note = Note::from_any_base(
|
||||||
dislike
|
dislike
|
||||||
|
@ -109,16 +105,13 @@ pub(crate) async fn receive_undo_dislike_comment(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
announce_if_community_is_local(undo, context, request_counter).await?;
|
Ok(())
|
||||||
Ok(HttpResponse::Ok().finish())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn receive_undo_delete_comment(
|
pub(crate) async fn receive_undo_delete_comment(
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
undo: Undo,
|
|
||||||
comment: Comment,
|
comment: Comment,
|
||||||
request_counter: &mut i32,
|
) -> Result<(), LemmyError> {
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
|
||||||
let deleted_comment = blocking(context.pool(), move |conn| {
|
let deleted_comment = blocking(context.pool(), move |conn| {
|
||||||
Comment::update_deleted(conn, comment.id, false)
|
Comment::update_deleted(conn, comment.id, false)
|
||||||
})
|
})
|
||||||
|
@ -145,16 +138,13 @@ pub(crate) async fn receive_undo_delete_comment(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
announce_if_community_is_local(undo, context, request_counter).await?;
|
Ok(())
|
||||||
Ok(HttpResponse::Ok().finish())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn receive_undo_remove_comment(
|
pub(crate) async fn receive_undo_remove_comment(
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
undo: Undo,
|
|
||||||
comment: Comment,
|
comment: Comment,
|
||||||
request_counter: &mut i32,
|
) -> Result<(), LemmyError> {
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
|
||||||
let removed_comment = blocking(context.pool(), move |conn| {
|
let removed_comment = blocking(context.pool(), move |conn| {
|
||||||
Comment::update_removed(conn, comment.id, false)
|
Comment::update_removed(conn, comment.id, false)
|
||||||
})
|
})
|
||||||
|
@ -181,6 +171,5 @@ pub(crate) async fn receive_undo_remove_comment(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
announce_if_community_is_local(undo, context, request_counter).await?;
|
Ok(())
|
||||||
Ok(HttpResponse::Ok().finish())
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,17 +1,19 @@
|
||||||
use crate::activities::receive::announce_if_community_is_local;
|
use crate::{activities::receive::verify_activity_domains_valid, inbox::is_addressed_to_public};
|
||||||
use activitystreams::activity::{Delete, Remove, Undo};
|
use activitystreams::{
|
||||||
use actix_web::HttpResponse;
|
activity::{ActorAndObjectRefExt, Delete, Remove, Undo},
|
||||||
|
base::{AnyBase, ExtendsExt},
|
||||||
|
};
|
||||||
|
use anyhow::Context;
|
||||||
use lemmy_db::{community::Community, community_view::CommunityView};
|
use lemmy_db::{community::Community, community_view::CommunityView};
|
||||||
use lemmy_structs::{blocking, community::CommunityResponse};
|
use lemmy_structs::{blocking, community::CommunityResponse};
|
||||||
use lemmy_utils::LemmyError;
|
use lemmy_utils::{location_info, LemmyError};
|
||||||
use lemmy_websocket::{messages::SendCommunityRoomMessage, LemmyContext, UserOperation};
|
use lemmy_websocket::{messages::SendCommunityRoomMessage, LemmyContext, UserOperation};
|
||||||
|
use url::Url;
|
||||||
|
|
||||||
pub(crate) async fn receive_delete_community(
|
pub(crate) async fn receive_delete_community(
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
delete: Delete,
|
|
||||||
community: Community,
|
community: Community,
|
||||||
request_counter: &mut i32,
|
) -> Result<(), LemmyError> {
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
|
||||||
let deleted_community = blocking(context.pool(), move |conn| {
|
let deleted_community = blocking(context.pool(), move |conn| {
|
||||||
Community::update_deleted(conn, community.id, true)
|
Community::update_deleted(conn, community.id, true)
|
||||||
})
|
})
|
||||||
|
@ -33,15 +35,28 @@ pub(crate) async fn receive_delete_community(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
announce_if_community_is_local(delete, context, request_counter).await?;
|
Ok(())
|
||||||
Ok(HttpResponse::Ok().finish())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn receive_remove_community(
|
pub(crate) async fn receive_remove_community(
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
_remove: Remove,
|
activity: AnyBase,
|
||||||
community: Community,
|
expected_domain: &Url,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<(), LemmyError> {
|
||||||
|
let remove = Remove::from_any_base(activity)?.context(location_info!())?;
|
||||||
|
verify_activity_domains_valid(&remove, expected_domain, true)?;
|
||||||
|
is_addressed_to_public(&remove)?;
|
||||||
|
|
||||||
|
let community_uri = remove
|
||||||
|
.object()
|
||||||
|
.to_owned()
|
||||||
|
.single_xsd_any_uri()
|
||||||
|
.context(location_info!())?;
|
||||||
|
let community = blocking(context.pool(), move |conn| {
|
||||||
|
Community::read_from_actor_id(conn, community_uri.as_str())
|
||||||
|
})
|
||||||
|
.await??;
|
||||||
|
|
||||||
let removed_community = blocking(context.pool(), move |conn| {
|
let removed_community = blocking(context.pool(), move |conn| {
|
||||||
Community::update_removed(conn, community.id, true)
|
Community::update_removed(conn, community.id, true)
|
||||||
})
|
})
|
||||||
|
@ -63,16 +78,21 @@ pub(crate) async fn receive_remove_community(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
// TODO: this should probably also call announce_if_community_is_local()
|
Ok(())
|
||||||
Ok(HttpResponse::Ok().finish())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn receive_undo_delete_community(
|
pub(crate) async fn receive_undo_delete_community(
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
undo: Undo,
|
undo: Undo,
|
||||||
community: Community,
|
community: Community,
|
||||||
request_counter: &mut i32,
|
expected_domain: &Url,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<(), LemmyError> {
|
||||||
|
is_addressed_to_public(&undo)?;
|
||||||
|
let inner = undo.object().to_owned().one().context(location_info!())?;
|
||||||
|
let delete = Delete::from_any_base(inner)?.context(location_info!())?;
|
||||||
|
verify_activity_domains_valid(&delete, expected_domain, true)?;
|
||||||
|
is_addressed_to_public(&delete)?;
|
||||||
|
|
||||||
let deleted_community = blocking(context.pool(), move |conn| {
|
let deleted_community = blocking(context.pool(), move |conn| {
|
||||||
Community::update_deleted(conn, community.id, false)
|
Community::update_deleted(conn, community.id, false)
|
||||||
})
|
})
|
||||||
|
@ -94,16 +114,31 @@ pub(crate) async fn receive_undo_delete_community(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
announce_if_community_is_local(undo, context, request_counter).await?;
|
Ok(())
|
||||||
Ok(HttpResponse::Ok().finish())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn receive_undo_remove_community(
|
pub(crate) async fn receive_undo_remove_community(
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
undo: Undo,
|
undo: Undo,
|
||||||
community: Community,
|
expected_domain: &Url,
|
||||||
request_counter: &mut i32,
|
) -> Result<(), LemmyError> {
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
is_addressed_to_public(&undo)?;
|
||||||
|
|
||||||
|
let inner = undo.object().to_owned().one().context(location_info!())?;
|
||||||
|
let remove = Remove::from_any_base(inner)?.context(location_info!())?;
|
||||||
|
verify_activity_domains_valid(&remove, &expected_domain, true)?;
|
||||||
|
is_addressed_to_public(&remove)?;
|
||||||
|
|
||||||
|
let community_uri = remove
|
||||||
|
.object()
|
||||||
|
.to_owned()
|
||||||
|
.single_xsd_any_uri()
|
||||||
|
.context(location_info!())?;
|
||||||
|
let community = blocking(context.pool(), move |conn| {
|
||||||
|
Community::read_from_actor_id(conn, community_uri.as_str())
|
||||||
|
})
|
||||||
|
.await??;
|
||||||
|
|
||||||
let removed_community = blocking(context.pool(), move |conn| {
|
let removed_community = blocking(context.pool(), move |conn| {
|
||||||
Community::update_removed(conn, community.id, false)
|
Community::update_removed(conn, community.id, false)
|
||||||
})
|
})
|
||||||
|
@ -126,6 +161,5 @@ pub(crate) async fn receive_undo_remove_community(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
announce_if_community_is_local(undo, context, request_counter).await?;
|
Ok(())
|
||||||
Ok(HttpResponse::Ok().finish())
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,22 +1,14 @@
|
||||||
use crate::{
|
use crate::fetcher::get_or_fetch_and_upsert_user;
|
||||||
fetcher::{get_or_fetch_and_upsert_community, get_or_fetch_and_upsert_user},
|
|
||||||
ActorType,
|
|
||||||
};
|
|
||||||
use activitystreams::{
|
use activitystreams::{
|
||||||
activity::{ActorAndObjectRef, ActorAndObjectRefExt},
|
activity::{ActorAndObjectRef, ActorAndObjectRefExt},
|
||||||
base::{AsBase, BaseExt, Extends, ExtendsExt},
|
base::{AsBase, BaseExt},
|
||||||
error::DomainError,
|
error::DomainError,
|
||||||
object::{AsObject, ObjectExt},
|
|
||||||
};
|
};
|
||||||
use actix_web::HttpResponse;
|
use anyhow::{anyhow, Context};
|
||||||
use anyhow::Context;
|
use lemmy_db::user::User_;
|
||||||
use diesel::result::Error::NotFound;
|
|
||||||
use lemmy_db::{comment::Comment, community::Community, post::Post, user::User_};
|
|
||||||
use lemmy_structs::blocking;
|
|
||||||
use lemmy_utils::{location_info, LemmyError};
|
use lemmy_utils::{location_info, LemmyError};
|
||||||
use lemmy_websocket::LemmyContext;
|
use lemmy_websocket::LemmyContext;
|
||||||
use log::debug;
|
use log::debug;
|
||||||
use serde::Serialize;
|
|
||||||
use std::fmt::Debug;
|
use std::fmt::Debug;
|
||||||
use url::Url;
|
use url::Url;
|
||||||
|
|
||||||
|
@ -25,46 +17,15 @@ pub(crate) mod comment_undo;
|
||||||
pub(crate) mod community;
|
pub(crate) mod community;
|
||||||
pub(crate) mod post;
|
pub(crate) mod post;
|
||||||
pub(crate) mod post_undo;
|
pub(crate) mod post_undo;
|
||||||
|
pub(crate) mod private_message;
|
||||||
|
|
||||||
/// Return HTTP 501 for unsupported activities in inbox.
|
/// Return HTTP 501 for unsupported activities in inbox.
|
||||||
pub(crate) fn receive_unhandled_activity<A>(activity: A) -> Result<HttpResponse, LemmyError>
|
pub(crate) fn receive_unhandled_activity<A>(activity: A) -> Result<(), LemmyError>
|
||||||
where
|
where
|
||||||
A: Debug,
|
A: Debug,
|
||||||
{
|
{
|
||||||
debug!("received unhandled activity type: {:?}", activity);
|
debug!("received unhandled activity type: {:?}", activity);
|
||||||
Ok(HttpResponse::NotImplemented().finish())
|
Err(anyhow!("Activity not supported").into())
|
||||||
}
|
|
||||||
|
|
||||||
/// Reads the destination community from the activity's `cc` field. If this refers to a local
|
|
||||||
/// community, the activity is announced to all community followers.
|
|
||||||
async fn announce_if_community_is_local<T, Kind>(
|
|
||||||
activity: T,
|
|
||||||
context: &LemmyContext,
|
|
||||||
request_counter: &mut i32,
|
|
||||||
) -> Result<(), LemmyError>
|
|
||||||
where
|
|
||||||
T: AsObject<Kind>,
|
|
||||||
T: Extends<Kind>,
|
|
||||||
Kind: Serialize,
|
|
||||||
<T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
|
|
||||||
{
|
|
||||||
let cc = activity.cc().context(location_info!())?;
|
|
||||||
let cc = cc.as_many().context(location_info!())?;
|
|
||||||
let community_uri = cc
|
|
||||||
.first()
|
|
||||||
.context(location_info!())?
|
|
||||||
.as_xsd_any_uri()
|
|
||||||
.context(location_info!())?;
|
|
||||||
// TODO: we could just read from the local db here (and ignore if the community is not found)
|
|
||||||
let community =
|
|
||||||
get_or_fetch_and_upsert_community(&community_uri, context, request_counter).await?;
|
|
||||||
|
|
||||||
if community.local {
|
|
||||||
community
|
|
||||||
.send_announce(activity.into_any_base()?, context)
|
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reads the actor field of an activity and returns the corresponding `User_`.
|
/// Reads the actor field of an activity and returns the corresponding `User_`.
|
||||||
|
@ -81,49 +42,6 @@ where
|
||||||
get_or_fetch_and_upsert_user(&user_uri, context, request_counter).await
|
get_or_fetch_and_upsert_user(&user_uri, context, request_counter).await
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) enum FindResults {
|
|
||||||
Comment(Comment),
|
|
||||||
Community(Community),
|
|
||||||
Post(Post),
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Tries to find a community, post or comment in the local database, without any network requests.
|
|
||||||
/// This is used to handle deletions and removals, because in case we dont have the object, we can
|
|
||||||
/// simply ignore the activity.
|
|
||||||
pub(crate) async fn find_by_id(
|
|
||||||
context: &LemmyContext,
|
|
||||||
apub_id: Url,
|
|
||||||
) -> Result<FindResults, LemmyError> {
|
|
||||||
let ap_id = apub_id.to_string();
|
|
||||||
let community = blocking(context.pool(), move |conn| {
|
|
||||||
Community::read_from_actor_id(conn, &ap_id)
|
|
||||||
})
|
|
||||||
.await?;
|
|
||||||
if let Ok(c) = community {
|
|
||||||
return Ok(FindResults::Community(c));
|
|
||||||
}
|
|
||||||
|
|
||||||
let ap_id = apub_id.to_string();
|
|
||||||
let post = blocking(context.pool(), move |conn| {
|
|
||||||
Post::read_from_apub_id(conn, &ap_id)
|
|
||||||
})
|
|
||||||
.await?;
|
|
||||||
if let Ok(p) = post {
|
|
||||||
return Ok(FindResults::Post(p));
|
|
||||||
}
|
|
||||||
|
|
||||||
let ap_id = apub_id.to_string();
|
|
||||||
let comment = blocking(context.pool(), move |conn| {
|
|
||||||
Comment::read_from_apub_id(conn, &ap_id)
|
|
||||||
})
|
|
||||||
.await?;
|
|
||||||
if let Ok(c) = comment {
|
|
||||||
return Ok(FindResults::Comment(c));
|
|
||||||
}
|
|
||||||
|
|
||||||
return Err(NotFound.into());
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Ensure that the ID of an incoming activity comes from the same domain as the actor. Optionally
|
/// Ensure that the ID of an incoming activity comes from the same domain as the actor. Optionally
|
||||||
/// also checks the ID of the inner object.
|
/// also checks the ID of the inner object.
|
||||||
///
|
///
|
||||||
|
@ -131,7 +49,7 @@ pub(crate) async fn find_by_id(
|
||||||
/// HTTP signature.
|
/// HTTP signature.
|
||||||
pub(crate) fn verify_activity_domains_valid<T, Kind>(
|
pub(crate) fn verify_activity_domains_valid<T, Kind>(
|
||||||
activity: &T,
|
activity: &T,
|
||||||
actor_id: Url,
|
actor_id: &Url,
|
||||||
object_domain_must_match: bool,
|
object_domain_must_match: bool,
|
||||||
) -> Result<(), LemmyError>
|
) -> Result<(), LemmyError>
|
||||||
where
|
where
|
||||||
|
|
|
@ -1,15 +1,14 @@
|
||||||
use crate::{
|
use crate::{
|
||||||
activities::receive::{announce_if_community_is_local, get_actor_as_user},
|
activities::receive::get_actor_as_user,
|
||||||
fetcher::get_or_fetch_and_insert_post,
|
fetcher::get_or_fetch_and_insert_post,
|
||||||
ActorType,
|
ActorType,
|
||||||
FromApub,
|
FromApub,
|
||||||
PageExt,
|
PageExt,
|
||||||
};
|
};
|
||||||
use activitystreams::{
|
use activitystreams::{
|
||||||
activity::{Create, Delete, Dislike, Like, Remove, Update},
|
activity::{Create, Dislike, Like, Remove, Update},
|
||||||
prelude::*,
|
prelude::*,
|
||||||
};
|
};
|
||||||
use actix_web::HttpResponse;
|
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use lemmy_db::{
|
use lemmy_db::{
|
||||||
post::{Post, PostForm, PostLike, PostLikeForm},
|
post::{Post, PostForm, PostLike, PostLikeForm},
|
||||||
|
@ -25,7 +24,7 @@ pub(crate) async fn receive_create_post(
|
||||||
create: Create,
|
create: Create,
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
request_counter: &mut i32,
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<(), LemmyError> {
|
||||||
let user = get_actor_as_user(&create, context, request_counter).await?;
|
let user = get_actor_as_user(&create, context, request_counter).await?;
|
||||||
let page = PageExt::from_any_base(create.object().to_owned().one().context(location_info!())?)?
|
let page = PageExt::from_any_base(create.object().to_owned().one().context(location_info!())?)?
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
|
@ -51,15 +50,14 @@ pub(crate) async fn receive_create_post(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
announce_if_community_is_local(create, context, request_counter).await?;
|
Ok(())
|
||||||
Ok(HttpResponse::Ok().finish())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn receive_update_post(
|
pub(crate) async fn receive_update_post(
|
||||||
update: Update,
|
update: Update,
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
request_counter: &mut i32,
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<(), LemmyError> {
|
||||||
let user = get_actor_as_user(&update, context, request_counter).await?;
|
let user = get_actor_as_user(&update, context, request_counter).await?;
|
||||||
let page = PageExt::from_any_base(update.object().to_owned().one().context(location_info!())?)?
|
let page = PageExt::from_any_base(update.object().to_owned().one().context(location_info!())?)?
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
|
@ -89,15 +87,14 @@ pub(crate) async fn receive_update_post(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
announce_if_community_is_local(update, context, request_counter).await?;
|
Ok(())
|
||||||
Ok(HttpResponse::Ok().finish())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn receive_like_post(
|
pub(crate) async fn receive_like_post(
|
||||||
like: Like,
|
like: Like,
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
request_counter: &mut i32,
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<(), LemmyError> {
|
||||||
let user = get_actor_as_user(&like, context, request_counter).await?;
|
let user = get_actor_as_user(&like, context, request_counter).await?;
|
||||||
let page = PageExt::from_any_base(like.object().to_owned().one().context(location_info!())?)?
|
let page = PageExt::from_any_base(like.object().to_owned().one().context(location_info!())?)?
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
|
@ -134,15 +131,14 @@ pub(crate) async fn receive_like_post(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
announce_if_community_is_local(like, context, request_counter).await?;
|
Ok(())
|
||||||
Ok(HttpResponse::Ok().finish())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn receive_dislike_post(
|
pub(crate) async fn receive_dislike_post(
|
||||||
dislike: Dislike,
|
dislike: Dislike,
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
request_counter: &mut i32,
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<(), LemmyError> {
|
||||||
let user = get_actor_as_user(&dislike, context, request_counter).await?;
|
let user = get_actor_as_user(&dislike, context, request_counter).await?;
|
||||||
let page = PageExt::from_any_base(
|
let page = PageExt::from_any_base(
|
||||||
dislike
|
dislike
|
||||||
|
@ -185,16 +181,13 @@ pub(crate) async fn receive_dislike_post(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
announce_if_community_is_local(dislike, context, request_counter).await?;
|
Ok(())
|
||||||
Ok(HttpResponse::Ok().finish())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn receive_delete_post(
|
pub(crate) async fn receive_delete_post(
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
delete: Delete,
|
|
||||||
post: Post,
|
post: Post,
|
||||||
request_counter: &mut i32,
|
) -> Result<(), LemmyError> {
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
|
||||||
let deleted_post = blocking(context.pool(), move |conn| {
|
let deleted_post = blocking(context.pool(), move |conn| {
|
||||||
Post::update_deleted(conn, post.id, true)
|
Post::update_deleted(conn, post.id, true)
|
||||||
})
|
})
|
||||||
|
@ -214,15 +207,14 @@ pub(crate) async fn receive_delete_post(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
announce_if_community_is_local(delete, context, request_counter).await?;
|
Ok(())
|
||||||
Ok(HttpResponse::Ok().finish())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn receive_remove_post(
|
pub(crate) async fn receive_remove_post(
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
_remove: Remove,
|
_remove: Remove,
|
||||||
post: Post,
|
post: Post,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<(), LemmyError> {
|
||||||
let removed_post = blocking(context.pool(), move |conn| {
|
let removed_post = blocking(context.pool(), move |conn| {
|
||||||
Post::update_removed(conn, post.id, true)
|
Post::update_removed(conn, post.id, true)
|
||||||
})
|
})
|
||||||
|
@ -242,5 +234,5 @@ pub(crate) async fn receive_remove_post(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
Ok(HttpResponse::Ok().finish())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,11 +1,10 @@
|
||||||
use crate::{
|
use crate::{
|
||||||
activities::receive::{announce_if_community_is_local, get_actor_as_user},
|
activities::receive::get_actor_as_user,
|
||||||
fetcher::get_or_fetch_and_insert_post,
|
fetcher::get_or_fetch_and_insert_post,
|
||||||
FromApub,
|
FromApub,
|
||||||
PageExt,
|
PageExt,
|
||||||
};
|
};
|
||||||
use activitystreams::{activity::*, prelude::*};
|
use activitystreams::{activity::*, prelude::*};
|
||||||
use actix_web::HttpResponse;
|
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use lemmy_db::{
|
use lemmy_db::{
|
||||||
post::{Post, PostForm, PostLike},
|
post::{Post, PostForm, PostLike},
|
||||||
|
@ -17,11 +16,10 @@ use lemmy_utils::{location_info, LemmyError};
|
||||||
use lemmy_websocket::{messages::SendPost, LemmyContext, UserOperation};
|
use lemmy_websocket::{messages::SendPost, LemmyContext, UserOperation};
|
||||||
|
|
||||||
pub(crate) async fn receive_undo_like_post(
|
pub(crate) async fn receive_undo_like_post(
|
||||||
undo: Undo,
|
|
||||||
like: &Like,
|
like: &Like,
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
request_counter: &mut i32,
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<(), LemmyError> {
|
||||||
let user = get_actor_as_user(like, context, request_counter).await?;
|
let user = get_actor_as_user(like, context, request_counter).await?;
|
||||||
let page = PageExt::from_any_base(like.object().to_owned().one().context(location_info!())?)?
|
let page = PageExt::from_any_base(like.object().to_owned().one().context(location_info!())?)?
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
|
@ -52,16 +50,14 @@ pub(crate) async fn receive_undo_like_post(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
announce_if_community_is_local(undo, context, request_counter).await?;
|
Ok(())
|
||||||
Ok(HttpResponse::Ok().finish())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn receive_undo_dislike_post(
|
pub(crate) async fn receive_undo_dislike_post(
|
||||||
undo: Undo,
|
|
||||||
dislike: &Dislike,
|
dislike: &Dislike,
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
request_counter: &mut i32,
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<(), LemmyError> {
|
||||||
let user = get_actor_as_user(dislike, context, request_counter).await?;
|
let user = get_actor_as_user(dislike, context, request_counter).await?;
|
||||||
let page = PageExt::from_any_base(
|
let page = PageExt::from_any_base(
|
||||||
dislike
|
dislike
|
||||||
|
@ -98,16 +94,13 @@ pub(crate) async fn receive_undo_dislike_post(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
announce_if_community_is_local(undo, context, request_counter).await?;
|
Ok(())
|
||||||
Ok(HttpResponse::Ok().finish())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn receive_undo_delete_post(
|
pub(crate) async fn receive_undo_delete_post(
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
undo: Undo,
|
|
||||||
post: Post,
|
post: Post,
|
||||||
request_counter: &mut i32,
|
) -> Result<(), LemmyError> {
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
|
||||||
let deleted_post = blocking(context.pool(), move |conn| {
|
let deleted_post = blocking(context.pool(), move |conn| {
|
||||||
Post::update_deleted(conn, post.id, false)
|
Post::update_deleted(conn, post.id, false)
|
||||||
})
|
})
|
||||||
|
@ -127,16 +120,13 @@ pub(crate) async fn receive_undo_delete_post(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
announce_if_community_is_local(undo, context, request_counter).await?;
|
Ok(())
|
||||||
Ok(HttpResponse::Ok().finish())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn receive_undo_remove_post(
|
pub(crate) async fn receive_undo_remove_post(
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
undo: Undo,
|
|
||||||
post: Post,
|
post: Post,
|
||||||
request_counter: &mut i32,
|
) -> Result<(), LemmyError> {
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
|
||||||
let removed_post = blocking(context.pool(), move |conn| {
|
let removed_post = blocking(context.pool(), move |conn| {
|
||||||
Post::update_removed(conn, post.id, false)
|
Post::update_removed(conn, post.id, false)
|
||||||
})
|
})
|
||||||
|
@ -157,6 +147,5 @@ pub(crate) async fn receive_undo_remove_post(
|
||||||
websocket_id: None,
|
websocket_id: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
announce_if_community_is_local(undo, context, request_counter).await?;
|
Ok(())
|
||||||
Ok(HttpResponse::Ok().finish())
|
|
||||||
}
|
}
|
||||||
|
|
217
lemmy_apub/src/activities/receive/private_message.rs
Normal file
217
lemmy_apub/src/activities/receive/private_message.rs
Normal file
|
@ -0,0 +1,217 @@
|
||||||
|
use crate::{
|
||||||
|
activities::receive::verify_activity_domains_valid,
|
||||||
|
check_is_apub_id_valid,
|
||||||
|
fetcher::get_or_fetch_and_upsert_user,
|
||||||
|
inbox::get_activity_to_and_cc,
|
||||||
|
FromApub,
|
||||||
|
};
|
||||||
|
use activitystreams::{
|
||||||
|
activity::{ActorAndObjectRefExt, Create, Delete, Undo, Update},
|
||||||
|
base::{AnyBase, AsBase, ExtendsExt},
|
||||||
|
object::{AsObject, Note},
|
||||||
|
public,
|
||||||
|
};
|
||||||
|
use anyhow::{anyhow, Context};
|
||||||
|
use lemmy_db::{
|
||||||
|
private_message::{PrivateMessage, PrivateMessageForm},
|
||||||
|
private_message_view::PrivateMessageView,
|
||||||
|
Crud,
|
||||||
|
};
|
||||||
|
use lemmy_structs::{blocking, user::PrivateMessageResponse};
|
||||||
|
use lemmy_utils::{location_info, LemmyError};
|
||||||
|
use lemmy_websocket::{messages::SendUserRoomMessage, LemmyContext, UserOperation};
|
||||||
|
use url::Url;
|
||||||
|
|
||||||
|
pub(crate) async fn receive_create_private_message(
|
||||||
|
context: &LemmyContext,
|
||||||
|
activity: AnyBase,
|
||||||
|
expected_domain: Url,
|
||||||
|
request_counter: &mut i32,
|
||||||
|
) -> Result<(), LemmyError> {
|
||||||
|
let create = Create::from_any_base(activity)?.context(location_info!())?;
|
||||||
|
verify_activity_domains_valid(&create, &expected_domain, true)?;
|
||||||
|
check_private_message_activity_valid(&create, context, request_counter).await?;
|
||||||
|
|
||||||
|
let note = Note::from_any_base(
|
||||||
|
create
|
||||||
|
.object()
|
||||||
|
.as_one()
|
||||||
|
.context(location_info!())?
|
||||||
|
.to_owned(),
|
||||||
|
)?
|
||||||
|
.context(location_info!())?;
|
||||||
|
|
||||||
|
let private_message =
|
||||||
|
PrivateMessageForm::from_apub(¬e, context, Some(expected_domain), request_counter).await?;
|
||||||
|
|
||||||
|
let inserted_private_message = blocking(&context.pool(), move |conn| {
|
||||||
|
PrivateMessage::create(conn, &private_message)
|
||||||
|
})
|
||||||
|
.await??;
|
||||||
|
|
||||||
|
let message = blocking(&context.pool(), move |conn| {
|
||||||
|
PrivateMessageView::read(conn, inserted_private_message.id)
|
||||||
|
})
|
||||||
|
.await??;
|
||||||
|
|
||||||
|
let res = PrivateMessageResponse { message };
|
||||||
|
|
||||||
|
let recipient_id = res.message.recipient_id;
|
||||||
|
|
||||||
|
context.chat_server().do_send(SendUserRoomMessage {
|
||||||
|
op: UserOperation::CreatePrivateMessage,
|
||||||
|
response: res,
|
||||||
|
recipient_id,
|
||||||
|
websocket_id: None,
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) async fn receive_update_private_message(
|
||||||
|
context: &LemmyContext,
|
||||||
|
activity: AnyBase,
|
||||||
|
expected_domain: Url,
|
||||||
|
request_counter: &mut i32,
|
||||||
|
) -> Result<(), LemmyError> {
|
||||||
|
let update = Update::from_any_base(activity)?.context(location_info!())?;
|
||||||
|
verify_activity_domains_valid(&update, &expected_domain, true)?;
|
||||||
|
check_private_message_activity_valid(&update, context, request_counter).await?;
|
||||||
|
|
||||||
|
let object = update
|
||||||
|
.object()
|
||||||
|
.as_one()
|
||||||
|
.context(location_info!())?
|
||||||
|
.to_owned();
|
||||||
|
let note = Note::from_any_base(object)?.context(location_info!())?;
|
||||||
|
|
||||||
|
let private_message_form =
|
||||||
|
PrivateMessageForm::from_apub(¬e, context, Some(expected_domain), request_counter).await?;
|
||||||
|
|
||||||
|
let private_message_ap_id = private_message_form
|
||||||
|
.ap_id
|
||||||
|
.as_ref()
|
||||||
|
.context(location_info!())?
|
||||||
|
.clone();
|
||||||
|
let private_message = blocking(&context.pool(), move |conn| {
|
||||||
|
PrivateMessage::read_from_apub_id(conn, &private_message_ap_id)
|
||||||
|
})
|
||||||
|
.await??;
|
||||||
|
|
||||||
|
let private_message_id = private_message.id;
|
||||||
|
blocking(&context.pool(), move |conn| {
|
||||||
|
PrivateMessage::update(conn, private_message_id, &private_message_form)
|
||||||
|
})
|
||||||
|
.await??;
|
||||||
|
|
||||||
|
let private_message_id = private_message.id;
|
||||||
|
let message = blocking(&context.pool(), move |conn| {
|
||||||
|
PrivateMessageView::read(conn, private_message_id)
|
||||||
|
})
|
||||||
|
.await??;
|
||||||
|
|
||||||
|
let res = PrivateMessageResponse { message };
|
||||||
|
|
||||||
|
let recipient_id = res.message.recipient_id;
|
||||||
|
|
||||||
|
context.chat_server().do_send(SendUserRoomMessage {
|
||||||
|
op: UserOperation::EditPrivateMessage,
|
||||||
|
response: res,
|
||||||
|
recipient_id,
|
||||||
|
websocket_id: None,
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) async fn receive_delete_private_message(
|
||||||
|
context: &LemmyContext,
|
||||||
|
delete: Delete,
|
||||||
|
private_message: PrivateMessage,
|
||||||
|
request_counter: &mut i32,
|
||||||
|
) -> Result<(), LemmyError> {
|
||||||
|
check_private_message_activity_valid(&delete, context, request_counter).await?;
|
||||||
|
|
||||||
|
let deleted_private_message = blocking(context.pool(), move |conn| {
|
||||||
|
PrivateMessage::update_deleted(conn, private_message.id, true)
|
||||||
|
})
|
||||||
|
.await??;
|
||||||
|
|
||||||
|
let message = blocking(&context.pool(), move |conn| {
|
||||||
|
PrivateMessageView::read(&conn, deleted_private_message.id)
|
||||||
|
})
|
||||||
|
.await??;
|
||||||
|
|
||||||
|
let res = PrivateMessageResponse { message };
|
||||||
|
let recipient_id = res.message.recipient_id;
|
||||||
|
context.chat_server().do_send(SendUserRoomMessage {
|
||||||
|
op: UserOperation::EditPrivateMessage,
|
||||||
|
response: res,
|
||||||
|
recipient_id,
|
||||||
|
websocket_id: None,
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) async fn receive_undo_delete_private_message(
|
||||||
|
context: &LemmyContext,
|
||||||
|
undo: Undo,
|
||||||
|
expected_domain: &Url,
|
||||||
|
private_message: PrivateMessage,
|
||||||
|
request_counter: &mut i32,
|
||||||
|
) -> Result<(), LemmyError> {
|
||||||
|
check_private_message_activity_valid(&undo, context, request_counter).await?;
|
||||||
|
let object = undo.object().to_owned().one().context(location_info!())?;
|
||||||
|
let delete = Delete::from_any_base(object)?.context(location_info!())?;
|
||||||
|
verify_activity_domains_valid(&delete, expected_domain, true)?;
|
||||||
|
check_private_message_activity_valid(&delete, context, request_counter).await?;
|
||||||
|
|
||||||
|
let deleted_private_message = blocking(context.pool(), move |conn| {
|
||||||
|
PrivateMessage::update_deleted(conn, private_message.id, false)
|
||||||
|
})
|
||||||
|
.await??;
|
||||||
|
|
||||||
|
let message = blocking(&context.pool(), move |conn| {
|
||||||
|
PrivateMessageView::read(&conn, deleted_private_message.id)
|
||||||
|
})
|
||||||
|
.await??;
|
||||||
|
|
||||||
|
let res = PrivateMessageResponse { message };
|
||||||
|
let recipient_id = res.message.recipient_id;
|
||||||
|
context.chat_server().do_send(SendUserRoomMessage {
|
||||||
|
op: UserOperation::EditPrivateMessage,
|
||||||
|
response: res,
|
||||||
|
recipient_id,
|
||||||
|
websocket_id: None,
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn check_private_message_activity_valid<T, Kind>(
|
||||||
|
activity: &T,
|
||||||
|
context: &LemmyContext,
|
||||||
|
request_counter: &mut i32,
|
||||||
|
) -> Result<(), LemmyError>
|
||||||
|
where
|
||||||
|
T: AsBase<Kind> + AsObject<Kind> + ActorAndObjectRefExt,
|
||||||
|
{
|
||||||
|
let to_and_cc = get_activity_to_and_cc(activity)?;
|
||||||
|
if to_and_cc.len() != 1 {
|
||||||
|
return Err(anyhow!("Private message can only be addressed to one user").into());
|
||||||
|
}
|
||||||
|
if to_and_cc.contains(&public()) {
|
||||||
|
return Err(anyhow!("Private message cant be public").into());
|
||||||
|
}
|
||||||
|
let user_id = activity
|
||||||
|
.actor()?
|
||||||
|
.to_owned()
|
||||||
|
.single_xsd_any_uri()
|
||||||
|
.context(location_info!())?;
|
||||||
|
check_is_apub_id_valid(&user_id)?;
|
||||||
|
// check that the sender is a user, not a community
|
||||||
|
get_or_fetch_and_upsert_user(&user_id, &context, request_counter).await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
|
@ -1,14 +1,25 @@
|
||||||
use crate::{
|
use crate::{
|
||||||
activities::receive::verify_activity_domains_valid,
|
activities::receive::verify_activity_domains_valid,
|
||||||
check_is_apub_id_valid,
|
inbox::{
|
||||||
extensions::signatures::verify_signature,
|
get_activity_id,
|
||||||
fetcher::get_or_fetch_and_upsert_user,
|
get_activity_to_and_cc,
|
||||||
inbox::{get_activity_id, is_activity_already_known},
|
inbox_verify_http_signature,
|
||||||
|
is_activity_already_known,
|
||||||
|
is_addressed_to_public,
|
||||||
|
receive_for_community::{
|
||||||
|
receive_create_for_community,
|
||||||
|
receive_delete_for_community,
|
||||||
|
receive_dislike_for_community,
|
||||||
|
receive_like_for_community,
|
||||||
|
receive_undo_for_community,
|
||||||
|
receive_update_for_community,
|
||||||
|
},
|
||||||
|
},
|
||||||
insert_activity,
|
insert_activity,
|
||||||
ActorType,
|
ActorType,
|
||||||
};
|
};
|
||||||
use activitystreams::{
|
use activitystreams::{
|
||||||
activity::{ActorAndObject, Follow, Undo},
|
activity::{kind::FollowType, ActorAndObject, Follow, Undo},
|
||||||
base::AnyBase,
|
base::AnyBase,
|
||||||
prelude::*,
|
prelude::*,
|
||||||
};
|
};
|
||||||
|
@ -25,89 +36,153 @@ use lemmy_websocket::LemmyContext;
|
||||||
use log::info;
|
use log::info;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::fmt::Debug;
|
use std::fmt::Debug;
|
||||||
|
use url::Url;
|
||||||
|
|
||||||
/// Allowed activities for community inbox.
|
/// Allowed activities for community inbox.
|
||||||
#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Deserialize, Serialize)]
|
#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Deserialize, Serialize)]
|
||||||
#[serde(rename_all = "PascalCase")]
|
#[serde(rename_all = "PascalCase")]
|
||||||
pub enum ValidTypes {
|
pub enum CommunityValidTypes {
|
||||||
Follow,
|
Follow, // follow request from a user
|
||||||
Undo,
|
Undo, // unfollow from a user
|
||||||
|
Create, // create post or comment
|
||||||
|
Update, // update post or comment
|
||||||
|
Like, // upvote post or comment
|
||||||
|
Dislike, // downvote post or comment
|
||||||
|
Delete, // post or comment deleted by creator
|
||||||
|
Remove, // post or comment removed by mod or admin
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type AcceptedActivities = ActorAndObject<ValidTypes>;
|
pub type CommunityAcceptedActivities = ActorAndObject<CommunityValidTypes>;
|
||||||
|
|
||||||
/// Handler for all incoming receive to community inboxes.
|
/// Handler for all incoming receive to community inboxes.
|
||||||
pub async fn community_inbox(
|
pub async fn community_inbox(
|
||||||
request: HttpRequest,
|
request: HttpRequest,
|
||||||
input: web::Json<AcceptedActivities>,
|
input: web::Json<CommunityAcceptedActivities>,
|
||||||
path: web::Path<String>,
|
path: web::Path<String>,
|
||||||
context: web::Data<LemmyContext>,
|
context: web::Data<LemmyContext>,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<HttpResponse, LemmyError> {
|
||||||
let activity = input.into_inner();
|
let activity = input.into_inner();
|
||||||
|
// First of all check the http signature
|
||||||
|
let request_counter = &mut 0;
|
||||||
|
let actor = inbox_verify_http_signature(&activity, &context, request, request_counter).await?;
|
||||||
|
|
||||||
|
// Do nothing if we received the same activity before
|
||||||
|
let activity_id = get_activity_id(&activity, &actor.actor_id()?)?;
|
||||||
|
if is_activity_already_known(context.pool(), &activity_id).await? {
|
||||||
|
return Ok(HttpResponse::Ok().finish());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if the activity is actually meant for us
|
||||||
let path = path.into_inner();
|
let path = path.into_inner();
|
||||||
let community = blocking(&context.pool(), move |conn| {
|
let community = blocking(&context.pool(), move |conn| {
|
||||||
Community::read_from_name(&conn, &path)
|
Community::read_from_name(&conn, &path)
|
||||||
})
|
})
|
||||||
.await??;
|
.await??;
|
||||||
|
let to_and_cc = get_activity_to_and_cc(&activity)?;
|
||||||
let to = activity
|
if !to_and_cc.contains(&&community.actor_id()?) {
|
||||||
.to()
|
|
||||||
.context(location_info!())?
|
|
||||||
.to_owned()
|
|
||||||
.single_xsd_any_uri();
|
|
||||||
if Some(community.actor_id()?) != to {
|
|
||||||
return Err(anyhow!("Activity delivered to wrong community").into());
|
return Err(anyhow!("Activity delivered to wrong community").into());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
insert_activity(&activity_id, activity.clone(), false, true, context.pool()).await?;
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
"Community {} received activity {:?}",
|
"Community {} received activity {:?} from {}",
|
||||||
&community.name, &activity
|
|
||||||
);
|
|
||||||
let user_uri = activity
|
|
||||||
.actor()?
|
|
||||||
.as_single_xsd_any_uri()
|
|
||||||
.context(location_info!())?;
|
|
||||||
info!(
|
|
||||||
"Community {} inbox received activity {:?} from {}",
|
|
||||||
community.name,
|
community.name,
|
||||||
&activity.id_unchecked(),
|
&activity.id_unchecked(),
|
||||||
&user_uri
|
&actor.actor_id_str()
|
||||||
);
|
);
|
||||||
check_is_apub_id_valid(user_uri)?;
|
|
||||||
|
|
||||||
let request_counter = &mut 0;
|
community_receive_message(
|
||||||
let user = get_or_fetch_and_upsert_user(&user_uri, &context, request_counter).await?;
|
activity.clone(),
|
||||||
|
community.clone(),
|
||||||
|
actor.as_ref(),
|
||||||
|
&context,
|
||||||
|
request_counter,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
verify_signature(&request, &user)?;
|
/// Receives Follow, Undo/Follow, post actions, comment actions (including votes)
|
||||||
|
pub(crate) async fn community_receive_message(
|
||||||
let activity_id = get_activity_id(&activity, user_uri)?;
|
activity: CommunityAcceptedActivities,
|
||||||
if is_activity_already_known(context.pool(), &activity_id).await? {
|
to_community: Community,
|
||||||
return Ok(HttpResponse::Ok().finish());
|
actor: &dyn ActorType,
|
||||||
}
|
context: &LemmyContext,
|
||||||
|
request_counter: &mut i32,
|
||||||
|
) -> Result<HttpResponse, LemmyError> {
|
||||||
|
// TODO: check if the sending user is banned by the community
|
||||||
|
|
||||||
let any_base = activity.clone().into_any_base()?;
|
let any_base = activity.clone().into_any_base()?;
|
||||||
let kind = activity.kind().context(location_info!())?;
|
let actor_url = actor.actor_id()?;
|
||||||
let res = match kind {
|
let activity_kind = activity.kind().context(location_info!())?;
|
||||||
ValidTypes::Follow => handle_follow(any_base, user, community, &context).await,
|
let do_announce = match activity_kind {
|
||||||
ValidTypes::Undo => handle_undo_follow(any_base, user, community, &context).await,
|
CommunityValidTypes::Follow => {
|
||||||
|
handle_follow(any_base.clone(), actor_url, &to_community, &context).await?;
|
||||||
|
false
|
||||||
|
}
|
||||||
|
CommunityValidTypes::Undo => {
|
||||||
|
handle_undo(
|
||||||
|
context,
|
||||||
|
activity.clone(),
|
||||||
|
actor_url,
|
||||||
|
&to_community,
|
||||||
|
request_counter,
|
||||||
|
)
|
||||||
|
.await?
|
||||||
|
}
|
||||||
|
CommunityValidTypes::Create => {
|
||||||
|
receive_create_for_community(context, any_base.clone(), &actor_url, request_counter).await?;
|
||||||
|
true
|
||||||
|
}
|
||||||
|
CommunityValidTypes::Update => {
|
||||||
|
receive_update_for_community(context, any_base.clone(), &actor_url, request_counter).await?;
|
||||||
|
true
|
||||||
|
}
|
||||||
|
CommunityValidTypes::Like => {
|
||||||
|
receive_like_for_community(context, any_base.clone(), &actor_url, request_counter).await?;
|
||||||
|
true
|
||||||
|
}
|
||||||
|
CommunityValidTypes::Dislike => {
|
||||||
|
receive_dislike_for_community(context, any_base.clone(), &actor_url, request_counter).await?;
|
||||||
|
true
|
||||||
|
}
|
||||||
|
CommunityValidTypes::Delete => {
|
||||||
|
receive_delete_for_community(context, any_base.clone(), &actor_url).await?;
|
||||||
|
true
|
||||||
|
}
|
||||||
|
CommunityValidTypes::Remove => {
|
||||||
|
// TODO: we dont support remote mods, so this is ignored for now
|
||||||
|
//receive_remove_for_community(context, any_base.clone(), &user_url).await?
|
||||||
|
false
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
insert_activity(&activity_id, activity.clone(), false, true, context.pool()).await?;
|
if do_announce {
|
||||||
res
|
// Check again that the activity is public, just to be sure
|
||||||
|
is_addressed_to_public(&activity)?;
|
||||||
|
to_community
|
||||||
|
.send_announce(activity.into_any_base()?, context)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
return Ok(HttpResponse::Ok().finish());
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Handle a follow request from a remote user, adding the user as follower and returning an
|
/// Handle a follow request from a remote user, adding the user as follower and returning an
|
||||||
/// Accept activity.
|
/// Accept activity.
|
||||||
async fn handle_follow(
|
async fn handle_follow(
|
||||||
activity: AnyBase,
|
activity: AnyBase,
|
||||||
user: User_,
|
user_url: Url,
|
||||||
community: Community,
|
community: &Community,
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<HttpResponse, LemmyError> {
|
||||||
let follow = Follow::from_any_base(activity)?.context(location_info!())?;
|
let follow = Follow::from_any_base(activity)?.context(location_info!())?;
|
||||||
verify_activity_domains_valid(&follow, user.actor_id()?, false)?;
|
verify_activity_domains_valid(&follow, &user_url, false)?;
|
||||||
|
|
||||||
|
let user = blocking(&context.pool(), move |conn| {
|
||||||
|
User_::read_from_actor_id(&conn, user_url.as_str())
|
||||||
|
})
|
||||||
|
.await??;
|
||||||
let community_follower_form = CommunityFollowerForm {
|
let community_follower_form = CommunityFollowerForm {
|
||||||
community_id: community.id,
|
community_id: community.id,
|
||||||
user_id: user.id,
|
user_id: user.id,
|
||||||
|
@ -124,20 +199,44 @@ async fn handle_follow(
|
||||||
Ok(HttpResponse::Ok().finish())
|
Ok(HttpResponse::Ok().finish())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn handle_undo(
|
||||||
|
context: &LemmyContext,
|
||||||
|
activity: CommunityAcceptedActivities,
|
||||||
|
actor_url: Url,
|
||||||
|
to_community: &Community,
|
||||||
|
request_counter: &mut i32,
|
||||||
|
) -> Result<bool, LemmyError> {
|
||||||
|
let inner_kind = activity
|
||||||
|
.object()
|
||||||
|
.is_single_kind(&FollowType::Follow.to_string());
|
||||||
|
let any_base = activity.into_any_base()?;
|
||||||
|
if inner_kind {
|
||||||
|
handle_undo_follow(any_base, actor_url, to_community, &context).await?;
|
||||||
|
Ok(false)
|
||||||
|
} else {
|
||||||
|
receive_undo_for_community(context, any_base, &actor_url, request_counter).await?;
|
||||||
|
Ok(true)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Handle `Undo/Follow` from a user, removing the user from followers list.
|
/// Handle `Undo/Follow` from a user, removing the user from followers list.
|
||||||
async fn handle_undo_follow(
|
async fn handle_undo_follow(
|
||||||
activity: AnyBase,
|
activity: AnyBase,
|
||||||
user: User_,
|
user_url: Url,
|
||||||
community: Community,
|
community: &Community,
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<(), LemmyError> {
|
||||||
let undo = Undo::from_any_base(activity)?.context(location_info!())?;
|
let undo = Undo::from_any_base(activity)?.context(location_info!())?;
|
||||||
verify_activity_domains_valid(&undo, user.actor_id()?, true)?;
|
verify_activity_domains_valid(&undo, &user_url, true)?;
|
||||||
|
|
||||||
let object = undo.object().to_owned().one().context(location_info!())?;
|
let object = undo.object().to_owned().one().context(location_info!())?;
|
||||||
let follow = Follow::from_any_base(object)?.context(location_info!())?;
|
let follow = Follow::from_any_base(object)?.context(location_info!())?;
|
||||||
verify_activity_domains_valid(&follow, user.actor_id()?, false)?;
|
verify_activity_domains_valid(&follow, &user_url, false)?;
|
||||||
|
|
||||||
|
let user = blocking(&context.pool(), move |conn| {
|
||||||
|
User_::read_from_actor_id(&conn, user_url.as_str())
|
||||||
|
})
|
||||||
|
.await??;
|
||||||
let community_follower_form = CommunityFollowerForm {
|
let community_follower_form = CommunityFollowerForm {
|
||||||
community_id: community.id,
|
community_id: community.id,
|
||||||
user_id: user.id,
|
user_id: user.id,
|
||||||
|
@ -149,5 +248,5 @@ async fn handle_undo_follow(
|
||||||
})
|
})
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
Ok(HttpResponse::Ok().finish())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,12 +1,26 @@
|
||||||
use activitystreams::base::{BaseExt, Extends};
|
use crate::{
|
||||||
use anyhow::Context;
|
check_is_apub_id_valid,
|
||||||
|
extensions::signatures::verify_signature,
|
||||||
|
fetcher::get_or_fetch_and_upsert_actor,
|
||||||
|
ActorType,
|
||||||
|
};
|
||||||
|
use activitystreams::{
|
||||||
|
activity::ActorAndObjectRefExt,
|
||||||
|
base::{AsBase, BaseExt, Extends},
|
||||||
|
object::{AsObject, ObjectExt},
|
||||||
|
public,
|
||||||
|
};
|
||||||
|
use actix_web::HttpRequest;
|
||||||
|
use anyhow::{anyhow, Context};
|
||||||
use lemmy_db::{activity::Activity, DbPool};
|
use lemmy_db::{activity::Activity, DbPool};
|
||||||
use lemmy_structs::blocking;
|
use lemmy_structs::blocking;
|
||||||
use lemmy_utils::{location_info, LemmyError};
|
use lemmy_utils::{location_info, LemmyError};
|
||||||
|
use lemmy_websocket::LemmyContext;
|
||||||
use serde::{export::fmt::Debug, Serialize};
|
use serde::{export::fmt::Debug, Serialize};
|
||||||
use url::Url;
|
use url::Url;
|
||||||
|
|
||||||
pub mod community_inbox;
|
pub mod community_inbox;
|
||||||
|
mod receive_for_community;
|
||||||
pub mod shared_inbox;
|
pub mod shared_inbox;
|
||||||
pub mod user_inbox;
|
pub mod user_inbox;
|
||||||
|
|
||||||
|
@ -35,3 +49,65 @@ pub(crate) async fn is_activity_already_known(
|
||||||
Err(_) => Ok(false),
|
Err(_) => Ok(false),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn get_activity_to_and_cc<T, Kind>(activity: &T) -> Result<Vec<Url>, LemmyError>
|
||||||
|
where
|
||||||
|
T: AsBase<Kind> + AsObject<Kind> + ActorAndObjectRefExt,
|
||||||
|
{
|
||||||
|
let mut to_and_cc = vec![];
|
||||||
|
if let Some(to) = activity.to() {
|
||||||
|
let to = to.to_owned().unwrap_to_vec();
|
||||||
|
let mut to = to
|
||||||
|
.iter()
|
||||||
|
.map(|t| t.as_xsd_any_uri())
|
||||||
|
.flatten()
|
||||||
|
.map(|t| t.to_owned())
|
||||||
|
.collect();
|
||||||
|
to_and_cc.append(&mut to);
|
||||||
|
}
|
||||||
|
if let Some(cc) = activity.cc() {
|
||||||
|
let cc = cc.to_owned().unwrap_to_vec();
|
||||||
|
let mut cc = cc
|
||||||
|
.iter()
|
||||||
|
.map(|c| c.as_xsd_any_uri())
|
||||||
|
.flatten()
|
||||||
|
.map(|c| c.to_owned())
|
||||||
|
.collect();
|
||||||
|
to_and_cc.append(&mut cc);
|
||||||
|
}
|
||||||
|
Ok(to_and_cc)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn is_addressed_to_public<T, Kind>(activity: &T) -> Result<(), LemmyError>
|
||||||
|
where
|
||||||
|
T: AsBase<Kind> + AsObject<Kind> + ActorAndObjectRefExt,
|
||||||
|
{
|
||||||
|
let to_and_cc = get_activity_to_and_cc(activity)?;
|
||||||
|
if to_and_cc.contains(&public()) {
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err(anyhow!("Activity is not addressed to public").into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) async fn inbox_verify_http_signature<T, Kind>(
|
||||||
|
activity: &T,
|
||||||
|
context: &LemmyContext,
|
||||||
|
request: HttpRequest,
|
||||||
|
request_counter: &mut i32,
|
||||||
|
) -> Result<Box<dyn ActorType>, LemmyError>
|
||||||
|
where
|
||||||
|
T: AsObject<Kind> + ActorAndObjectRefExt + Extends<Kind> + AsBase<Kind>,
|
||||||
|
Kind: Serialize,
|
||||||
|
<T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
|
||||||
|
{
|
||||||
|
let actor_id = activity
|
||||||
|
.actor()?
|
||||||
|
.to_owned()
|
||||||
|
.single_xsd_any_uri()
|
||||||
|
.context(location_info!())?;
|
||||||
|
check_is_apub_id_valid(&actor_id)?;
|
||||||
|
let actor = get_or_fetch_and_upsert_actor(&actor_id, &context, request_counter).await?;
|
||||||
|
verify_signature(&request, actor.as_ref())?;
|
||||||
|
Ok(actor)
|
||||||
|
}
|
||||||
|
|
345
lemmy_apub/src/inbox/receive_for_community.rs
Normal file
345
lemmy_apub/src/inbox/receive_for_community.rs
Normal file
|
@ -0,0 +1,345 @@
|
||||||
|
use crate::{
|
||||||
|
activities::receive::{
|
||||||
|
comment::{
|
||||||
|
receive_create_comment,
|
||||||
|
receive_delete_comment,
|
||||||
|
receive_dislike_comment,
|
||||||
|
receive_like_comment,
|
||||||
|
receive_remove_comment,
|
||||||
|
receive_update_comment,
|
||||||
|
},
|
||||||
|
comment_undo::{
|
||||||
|
receive_undo_delete_comment,
|
||||||
|
receive_undo_dislike_comment,
|
||||||
|
receive_undo_like_comment,
|
||||||
|
receive_undo_remove_comment,
|
||||||
|
},
|
||||||
|
post::{
|
||||||
|
receive_create_post,
|
||||||
|
receive_delete_post,
|
||||||
|
receive_dislike_post,
|
||||||
|
receive_like_post,
|
||||||
|
receive_remove_post,
|
||||||
|
receive_update_post,
|
||||||
|
},
|
||||||
|
post_undo::{
|
||||||
|
receive_undo_delete_post,
|
||||||
|
receive_undo_dislike_post,
|
||||||
|
receive_undo_like_post,
|
||||||
|
receive_undo_remove_post,
|
||||||
|
},
|
||||||
|
receive_unhandled_activity,
|
||||||
|
verify_activity_domains_valid,
|
||||||
|
},
|
||||||
|
inbox::is_addressed_to_public,
|
||||||
|
};
|
||||||
|
use activitystreams::{
|
||||||
|
activity::{Create, Delete, Dislike, Like, Remove, Undo, Update},
|
||||||
|
base::AnyBase,
|
||||||
|
prelude::*,
|
||||||
|
};
|
||||||
|
use anyhow::Context;
|
||||||
|
use diesel::result::Error::NotFound;
|
||||||
|
use lemmy_db::{comment::Comment, post::Post, site::Site, Crud};
|
||||||
|
use lemmy_structs::blocking;
|
||||||
|
use lemmy_utils::{location_info, LemmyError};
|
||||||
|
use lemmy_websocket::LemmyContext;
|
||||||
|
use url::Url;
|
||||||
|
|
||||||
|
/// This file is for post/comment activities received by the community, and for post/comment
|
||||||
|
/// activities announced by the community and received by the user.
|
||||||
|
|
||||||
|
/// A post or comment being created
|
||||||
|
pub(in crate::inbox) async fn receive_create_for_community(
|
||||||
|
context: &LemmyContext,
|
||||||
|
activity: AnyBase,
|
||||||
|
expected_domain: &Url,
|
||||||
|
request_counter: &mut i32,
|
||||||
|
) -> Result<(), LemmyError> {
|
||||||
|
let create = Create::from_any_base(activity)?.context(location_info!())?;
|
||||||
|
verify_activity_domains_valid(&create, &expected_domain, true)?;
|
||||||
|
is_addressed_to_public(&create)?;
|
||||||
|
|
||||||
|
match create.object().as_single_kind_str() {
|
||||||
|
Some("Page") => receive_create_post(create, context, request_counter).await,
|
||||||
|
Some("Note") => receive_create_comment(create, context, request_counter).await,
|
||||||
|
_ => receive_unhandled_activity(create),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A post or comment being edited
|
||||||
|
pub(in crate::inbox) async fn receive_update_for_community(
|
||||||
|
context: &LemmyContext,
|
||||||
|
activity: AnyBase,
|
||||||
|
expected_domain: &Url,
|
||||||
|
request_counter: &mut i32,
|
||||||
|
) -> Result<(), LemmyError> {
|
||||||
|
let update = Update::from_any_base(activity)?.context(location_info!())?;
|
||||||
|
verify_activity_domains_valid(&update, &expected_domain, true)?;
|
||||||
|
is_addressed_to_public(&update)?;
|
||||||
|
|
||||||
|
match update.object().as_single_kind_str() {
|
||||||
|
Some("Page") => receive_update_post(update, context, request_counter).await,
|
||||||
|
Some("Note") => receive_update_comment(update, context, request_counter).await,
|
||||||
|
_ => receive_unhandled_activity(update),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A post or comment being upvoted
|
||||||
|
pub(in crate::inbox) async fn receive_like_for_community(
|
||||||
|
context: &LemmyContext,
|
||||||
|
activity: AnyBase,
|
||||||
|
expected_domain: &Url,
|
||||||
|
request_counter: &mut i32,
|
||||||
|
) -> Result<(), LemmyError> {
|
||||||
|
let like = Like::from_any_base(activity)?.context(location_info!())?;
|
||||||
|
verify_activity_domains_valid(&like, &expected_domain, false)?;
|
||||||
|
is_addressed_to_public(&like)?;
|
||||||
|
|
||||||
|
match like.object().as_single_kind_str() {
|
||||||
|
Some("Page") => receive_like_post(like, context, request_counter).await,
|
||||||
|
Some("Note") => receive_like_comment(like, context, request_counter).await,
|
||||||
|
_ => receive_unhandled_activity(like),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A post or comment being downvoted
|
||||||
|
pub(in crate::inbox) async fn receive_dislike_for_community(
|
||||||
|
context: &LemmyContext,
|
||||||
|
activity: AnyBase,
|
||||||
|
expected_domain: &Url,
|
||||||
|
request_counter: &mut i32,
|
||||||
|
) -> Result<(), LemmyError> {
|
||||||
|
let enable_downvotes = blocking(context.pool(), move |conn| {
|
||||||
|
Site::read(conn, 1).map(|s| s.enable_downvotes)
|
||||||
|
})
|
||||||
|
.await??;
|
||||||
|
if !enable_downvotes {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
let dislike = Dislike::from_any_base(activity)?.context(location_info!())?;
|
||||||
|
verify_activity_domains_valid(&dislike, &expected_domain, false)?;
|
||||||
|
is_addressed_to_public(&dislike)?;
|
||||||
|
|
||||||
|
match dislike.object().as_single_kind_str() {
|
||||||
|
Some("Page") => receive_dislike_post(dislike, context, request_counter).await,
|
||||||
|
Some("Note") => receive_dislike_comment(dislike, context, request_counter).await,
|
||||||
|
_ => receive_unhandled_activity(dislike),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A post or comment being deleted by its creator
|
||||||
|
pub(in crate::inbox) async fn receive_delete_for_community(
|
||||||
|
context: &LemmyContext,
|
||||||
|
activity: AnyBase,
|
||||||
|
expected_domain: &Url,
|
||||||
|
) -> Result<(), LemmyError> {
|
||||||
|
dbg!("receive_delete_for_community");
|
||||||
|
let delete = Delete::from_any_base(activity)?.context(location_info!())?;
|
||||||
|
verify_activity_domains_valid(&delete, &expected_domain, true)?;
|
||||||
|
is_addressed_to_public(&delete)?;
|
||||||
|
|
||||||
|
let object = delete
|
||||||
|
.object()
|
||||||
|
.to_owned()
|
||||||
|
.single_xsd_any_uri()
|
||||||
|
.context(location_info!())?;
|
||||||
|
|
||||||
|
match find_post_or_comment_by_id(context, object).await {
|
||||||
|
Ok(PostOrComment::Post(p)) => receive_delete_post(context, p).await,
|
||||||
|
Ok(PostOrComment::Comment(c)) => receive_delete_comment(context, c).await,
|
||||||
|
// if we dont have the object, no need to do anything
|
||||||
|
Err(_) => Ok(()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A post or comment being removed by a mod/admin
|
||||||
|
pub(in crate::inbox) async fn receive_remove_for_community(
|
||||||
|
context: &LemmyContext,
|
||||||
|
activity: AnyBase,
|
||||||
|
expected_domain: &Url,
|
||||||
|
) -> Result<(), LemmyError> {
|
||||||
|
dbg!("receive_remove_for_community");
|
||||||
|
let remove = Remove::from_any_base(activity)?.context(location_info!())?;
|
||||||
|
verify_activity_domains_valid(&remove, &expected_domain, false)?;
|
||||||
|
is_addressed_to_public(&remove)?;
|
||||||
|
|
||||||
|
let cc = remove
|
||||||
|
.cc()
|
||||||
|
.map(|c| c.as_many())
|
||||||
|
.flatten()
|
||||||
|
.context(location_info!())?;
|
||||||
|
let community_id = cc
|
||||||
|
.first()
|
||||||
|
.map(|c| c.as_xsd_any_uri())
|
||||||
|
.flatten()
|
||||||
|
.context(location_info!())?;
|
||||||
|
|
||||||
|
let object = remove
|
||||||
|
.object()
|
||||||
|
.to_owned()
|
||||||
|
.single_xsd_any_uri()
|
||||||
|
.context(location_info!())?;
|
||||||
|
|
||||||
|
// Ensure that remove activity comes from the same domain as the community
|
||||||
|
remove.id(community_id.domain().context(location_info!())?)?;
|
||||||
|
|
||||||
|
match find_post_or_comment_by_id(context, object).await {
|
||||||
|
Ok(PostOrComment::Post(p)) => receive_remove_post(context, remove, p).await,
|
||||||
|
Ok(PostOrComment::Comment(c)) => receive_remove_comment(context, remove, c).await,
|
||||||
|
// if we dont have the object, no need to do anything
|
||||||
|
Err(_) => Ok(()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A post/comment action being reverted (either a delete, remove, upvote or downvote)
|
||||||
|
pub(in crate::inbox) async fn receive_undo_for_community(
|
||||||
|
context: &LemmyContext,
|
||||||
|
activity: AnyBase,
|
||||||
|
expected_domain: &Url,
|
||||||
|
request_counter: &mut i32,
|
||||||
|
) -> Result<(), LemmyError> {
|
||||||
|
let undo = Undo::from_any_base(activity)?.context(location_info!())?;
|
||||||
|
verify_activity_domains_valid(&undo, &expected_domain.to_owned(), true)?;
|
||||||
|
is_addressed_to_public(&undo)?;
|
||||||
|
|
||||||
|
match undo.object().as_single_kind_str() {
|
||||||
|
Some("Delete") => receive_undo_delete_for_community(context, undo, expected_domain).await,
|
||||||
|
Some("Remove") => receive_undo_remove_for_community(context, undo, expected_domain).await,
|
||||||
|
Some("Like") => {
|
||||||
|
receive_undo_like_for_community(context, undo, expected_domain, request_counter).await
|
||||||
|
}
|
||||||
|
Some("Dislike") => {
|
||||||
|
receive_undo_dislike_for_community(context, undo, expected_domain, request_counter).await
|
||||||
|
}
|
||||||
|
_ => receive_unhandled_activity(undo),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A post or comment deletion being reverted
|
||||||
|
pub(in crate::inbox) async fn receive_undo_delete_for_community(
|
||||||
|
context: &LemmyContext,
|
||||||
|
undo: Undo,
|
||||||
|
expected_domain: &Url,
|
||||||
|
) -> Result<(), LemmyError> {
|
||||||
|
let delete = Delete::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
|
||||||
|
.context(location_info!())?;
|
||||||
|
verify_activity_domains_valid(&delete, &expected_domain, true)?;
|
||||||
|
is_addressed_to_public(&delete)?;
|
||||||
|
|
||||||
|
let object = delete
|
||||||
|
.object()
|
||||||
|
.to_owned()
|
||||||
|
.single_xsd_any_uri()
|
||||||
|
.context(location_info!())?;
|
||||||
|
match find_post_or_comment_by_id(context, object).await {
|
||||||
|
Ok(PostOrComment::Post(p)) => receive_undo_delete_post(context, p).await,
|
||||||
|
Ok(PostOrComment::Comment(c)) => receive_undo_delete_comment(context, c).await,
|
||||||
|
// if we dont have the object, no need to do anything
|
||||||
|
Err(_) => Ok(()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A post or comment removal being reverted
|
||||||
|
pub(in crate::inbox) async fn receive_undo_remove_for_community(
|
||||||
|
context: &LemmyContext,
|
||||||
|
undo: Undo,
|
||||||
|
expected_domain: &Url,
|
||||||
|
) -> Result<(), LemmyError> {
|
||||||
|
let remove = Remove::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
|
||||||
|
.context(location_info!())?;
|
||||||
|
verify_activity_domains_valid(&remove, &expected_domain, false)?;
|
||||||
|
is_addressed_to_public(&remove)?;
|
||||||
|
|
||||||
|
let object = remove
|
||||||
|
.object()
|
||||||
|
.to_owned()
|
||||||
|
.single_xsd_any_uri()
|
||||||
|
.context(location_info!())?;
|
||||||
|
match find_post_or_comment_by_id(context, object).await {
|
||||||
|
Ok(PostOrComment::Post(p)) => receive_undo_remove_post(context, p).await,
|
||||||
|
Ok(PostOrComment::Comment(c)) => receive_undo_remove_comment(context, c).await,
|
||||||
|
// if we dont have the object, no need to do anything
|
||||||
|
Err(_) => Ok(()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A post or comment upvote being reverted
|
||||||
|
pub(in crate::inbox) async fn receive_undo_like_for_community(
|
||||||
|
context: &LemmyContext,
|
||||||
|
undo: Undo,
|
||||||
|
expected_domain: &Url,
|
||||||
|
request_counter: &mut i32,
|
||||||
|
) -> Result<(), LemmyError> {
|
||||||
|
let like = Like::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
|
||||||
|
.context(location_info!())?;
|
||||||
|
verify_activity_domains_valid(&like, &expected_domain, false)?;
|
||||||
|
is_addressed_to_public(&like)?;
|
||||||
|
|
||||||
|
let type_ = like
|
||||||
|
.object()
|
||||||
|
.as_single_kind_str()
|
||||||
|
.context(location_info!())?;
|
||||||
|
match type_ {
|
||||||
|
"Note" => receive_undo_like_comment(&like, context, request_counter).await,
|
||||||
|
"Page" => receive_undo_like_post(&like, context, request_counter).await,
|
||||||
|
_ => receive_unhandled_activity(like),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A post or comment downvote being reverted
|
||||||
|
pub(in crate::inbox) async fn receive_undo_dislike_for_community(
|
||||||
|
context: &LemmyContext,
|
||||||
|
undo: Undo,
|
||||||
|
expected_domain: &Url,
|
||||||
|
request_counter: &mut i32,
|
||||||
|
) -> Result<(), LemmyError> {
|
||||||
|
let dislike = Dislike::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
|
||||||
|
.context(location_info!())?;
|
||||||
|
verify_activity_domains_valid(&dislike, &expected_domain, false)?;
|
||||||
|
is_addressed_to_public(&dislike)?;
|
||||||
|
|
||||||
|
let type_ = dislike
|
||||||
|
.object()
|
||||||
|
.as_single_kind_str()
|
||||||
|
.context(location_info!())?;
|
||||||
|
match type_ {
|
||||||
|
"Note" => receive_undo_dislike_comment(&dislike, context, request_counter).await,
|
||||||
|
"Page" => receive_undo_dislike_post(&dislike, context, request_counter).await,
|
||||||
|
_ => receive_unhandled_activity(dislike),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
enum PostOrComment {
|
||||||
|
Comment(Comment),
|
||||||
|
Post(Post),
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Tries to find a post or comment in the local database, without any network requests.
|
||||||
|
/// This is used to handle deletions and removals, because in case we dont have the object, we can
|
||||||
|
/// simply ignore the activity.
|
||||||
|
async fn find_post_or_comment_by_id(
|
||||||
|
context: &LemmyContext,
|
||||||
|
apub_id: Url,
|
||||||
|
) -> Result<PostOrComment, LemmyError> {
|
||||||
|
let ap_id = apub_id.to_string();
|
||||||
|
let post = blocking(context.pool(), move |conn| {
|
||||||
|
Post::read_from_apub_id(conn, &ap_id)
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
if let Ok(p) = post {
|
||||||
|
return Ok(PostOrComment::Post(p));
|
||||||
|
}
|
||||||
|
|
||||||
|
let ap_id = apub_id.to_string();
|
||||||
|
let comment = blocking(context.pool(), move |conn| {
|
||||||
|
Comment::read_from_apub_id(conn, &ap_id)
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
if let Ok(c) = comment {
|
||||||
|
return Ok(PostOrComment::Comment(c));
|
||||||
|
}
|
||||||
|
|
||||||
|
return Err(NotFound.into());
|
||||||
|
}
|
|
@ -1,63 +1,21 @@
|
||||||
use crate::{
|
use crate::{
|
||||||
activities::receive::{
|
inbox::{
|
||||||
comment::{
|
community_inbox::{community_receive_message, CommunityAcceptedActivities},
|
||||||
receive_create_comment,
|
get_activity_id,
|
||||||
receive_delete_comment,
|
get_activity_to_and_cc,
|
||||||
receive_dislike_comment,
|
inbox_verify_http_signature,
|
||||||
receive_like_comment,
|
is_activity_already_known,
|
||||||
receive_remove_comment,
|
user_inbox::{user_receive_message, UserAcceptedActivities},
|
||||||
receive_update_comment,
|
|
||||||
},
|
},
|
||||||
comment_undo::{
|
|
||||||
receive_undo_delete_comment,
|
|
||||||
receive_undo_dislike_comment,
|
|
||||||
receive_undo_like_comment,
|
|
||||||
receive_undo_remove_comment,
|
|
||||||
},
|
|
||||||
community::{
|
|
||||||
receive_delete_community,
|
|
||||||
receive_remove_community,
|
|
||||||
receive_undo_delete_community,
|
|
||||||
receive_undo_remove_community,
|
|
||||||
},
|
|
||||||
find_by_id,
|
|
||||||
post::{
|
|
||||||
receive_create_post,
|
|
||||||
receive_delete_post,
|
|
||||||
receive_dislike_post,
|
|
||||||
receive_like_post,
|
|
||||||
receive_remove_post,
|
|
||||||
receive_update_post,
|
|
||||||
},
|
|
||||||
post_undo::{
|
|
||||||
receive_undo_delete_post,
|
|
||||||
receive_undo_dislike_post,
|
|
||||||
receive_undo_like_post,
|
|
||||||
receive_undo_remove_post,
|
|
||||||
},
|
|
||||||
receive_unhandled_activity,
|
|
||||||
verify_activity_domains_valid,
|
|
||||||
FindResults,
|
|
||||||
},
|
|
||||||
check_is_apub_id_valid,
|
|
||||||
extensions::signatures::verify_signature,
|
|
||||||
fetcher::get_or_fetch_and_upsert_actor,
|
|
||||||
inbox::{get_activity_id, is_activity_already_known},
|
|
||||||
insert_activity,
|
insert_activity,
|
||||||
ActorType,
|
|
||||||
};
|
|
||||||
use activitystreams::{
|
|
||||||
activity::{ActorAndObject, Announce, Create, Delete, Dislike, Like, Remove, Undo, Update},
|
|
||||||
base::AnyBase,
|
|
||||||
prelude::*,
|
|
||||||
};
|
};
|
||||||
|
use activitystreams::{activity::ActorAndObject, prelude::*};
|
||||||
use actix_web::{web, HttpRequest, HttpResponse};
|
use actix_web::{web, HttpRequest, HttpResponse};
|
||||||
use anyhow::{anyhow, Context};
|
use anyhow::Context;
|
||||||
use lemmy_db::{site::Site, Crud};
|
use lemmy_db::{community::Community, user::User_, DbPool};
|
||||||
use lemmy_structs::blocking;
|
use lemmy_structs::blocking;
|
||||||
use lemmy_utils::{location_info, LemmyError};
|
use lemmy_utils::{location_info, LemmyError};
|
||||||
use lemmy_websocket::LemmyContext;
|
use lemmy_websocket::LemmyContext;
|
||||||
use log::debug;
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::fmt::Debug;
|
use std::fmt::Debug;
|
||||||
use url::Url;
|
use url::Url;
|
||||||
|
@ -77,7 +35,7 @@ pub enum ValidTypes {
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: this isnt entirely correct, cause some of these receive are not ActorAndObject,
|
// TODO: this isnt entirely correct, cause some of these receive are not ActorAndObject,
|
||||||
// but it might still work due to the anybase conversion
|
// but it still works due to the anybase conversion
|
||||||
pub type AcceptedActivities = ActorAndObject<ValidTypes>;
|
pub type AcceptedActivities = ActorAndObject<ValidTypes>;
|
||||||
|
|
||||||
/// Handler for all incoming requests to shared inbox.
|
/// Handler for all incoming requests to shared inbox.
|
||||||
|
@ -87,332 +45,141 @@ pub async fn shared_inbox(
|
||||||
context: web::Data<LemmyContext>,
|
context: web::Data<LemmyContext>,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<HttpResponse, LemmyError> {
|
||||||
let activity = input.into_inner();
|
let activity = input.into_inner();
|
||||||
|
// First of all check the http signature
|
||||||
let actor_id = activity
|
|
||||||
.actor()?
|
|
||||||
.to_owned()
|
|
||||||
.single_xsd_any_uri()
|
|
||||||
.context(location_info!())?;
|
|
||||||
debug!(
|
|
||||||
"Shared inbox received activity {:?} from {}",
|
|
||||||
&activity.id_unchecked(),
|
|
||||||
&actor_id
|
|
||||||
);
|
|
||||||
|
|
||||||
check_is_apub_id_valid(&actor_id)?;
|
|
||||||
|
|
||||||
let request_counter = &mut 0;
|
let request_counter = &mut 0;
|
||||||
let actor = get_or_fetch_and_upsert_actor(&actor_id, &context, request_counter).await?;
|
let actor = inbox_verify_http_signature(&activity, &context, request, request_counter).await?;
|
||||||
verify_signature(&request, actor.as_ref())?;
|
|
||||||
|
|
||||||
|
// Do nothing if we received the same activity before
|
||||||
|
let actor_id = actor.actor_id()?;
|
||||||
let activity_id = get_activity_id(&activity, &actor_id)?;
|
let activity_id = get_activity_id(&activity, &actor_id)?;
|
||||||
if is_activity_already_known(context.pool(), &activity_id).await? {
|
if is_activity_already_known(context.pool(), &activity_id).await? {
|
||||||
return Ok(HttpResponse::Ok().finish());
|
return Ok(HttpResponse::Ok().finish());
|
||||||
}
|
}
|
||||||
|
|
||||||
let any_base = activity.clone().into_any_base()?;
|
// Log the activity, so we avoid receiving and parsing it twice. Note that this could still happen
|
||||||
let kind = activity.kind().context(location_info!())?;
|
// if we receive the same activity twice in very quick succession.
|
||||||
let res = match kind {
|
|
||||||
ValidTypes::Announce => {
|
|
||||||
receive_announce(&context, any_base, actor.as_ref(), request_counter).await
|
|
||||||
}
|
|
||||||
ValidTypes::Create => receive_create(&context, any_base, actor_id, request_counter).await,
|
|
||||||
ValidTypes::Update => receive_update(&context, any_base, actor_id, request_counter).await,
|
|
||||||
ValidTypes::Like => receive_like(&context, any_base, actor_id, request_counter).await,
|
|
||||||
ValidTypes::Dislike => receive_dislike(&context, any_base, actor_id, request_counter).await,
|
|
||||||
ValidTypes::Remove => receive_remove(&context, any_base, actor_id).await,
|
|
||||||
ValidTypes::Delete => receive_delete(&context, any_base, actor_id, request_counter).await,
|
|
||||||
ValidTypes::Undo => receive_undo(&context, any_base, actor_id, request_counter).await,
|
|
||||||
};
|
|
||||||
|
|
||||||
insert_activity(&activity_id, activity.clone(), false, true, context.pool()).await?;
|
insert_activity(&activity_id, activity.clone(), false, true, context.pool()).await?;
|
||||||
res
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Takes an announce and passes the inner activity to the appropriate handler.
|
let activity_any_base = activity.clone().into_any_base()?;
|
||||||
async fn receive_announce(
|
let mut res: Option<HttpResponse> = None;
|
||||||
context: &LemmyContext,
|
let to_and_cc = get_activity_to_and_cc(&activity)?;
|
||||||
activity: AnyBase,
|
// If to_and_cc contains a local community, pass to receive_community_message()
|
||||||
actor: &dyn ActorType,
|
// Handle community first, so in case the sender is banned by the community, it will error out.
|
||||||
request_counter: &mut i32,
|
// If we handled the user receive first, the activity would be inserted to the database before the
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
// community could check for bans.
|
||||||
let announce = Announce::from_any_base(activity)?.context(location_info!())?;
|
let community = extract_local_community_from_destinations(&to_and_cc, context.pool()).await?;
|
||||||
verify_activity_domains_valid(&announce, actor.actor_id()?, false)?;
|
if let Some(community) = community {
|
||||||
|
let community_activity = CommunityAcceptedActivities::from_any_base(activity_any_base.clone())?
|
||||||
let kind = announce.object().as_single_kind_str();
|
|
||||||
let object = announce
|
|
||||||
.object()
|
|
||||||
.to_owned()
|
|
||||||
.one()
|
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
|
res = Some(
|
||||||
let inner_id = object.id().context(location_info!())?.to_owned();
|
community_receive_message(
|
||||||
check_is_apub_id_valid(&inner_id)?;
|
community_activity,
|
||||||
if is_activity_already_known(context.pool(), &inner_id).await? {
|
community,
|
||||||
return Ok(HttpResponse::Ok().finish());
|
actor.as_ref(),
|
||||||
|
&context,
|
||||||
|
request_counter,
|
||||||
|
)
|
||||||
|
.await?,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
match kind {
|
// If to_and_cc contains a local user, pass to receive_user_message()
|
||||||
Some("Create") => receive_create(context, object, inner_id, request_counter).await,
|
if is_addressed_to_local_user(&to_and_cc, context.pool()).await? {
|
||||||
Some("Update") => receive_update(context, object, inner_id, request_counter).await,
|
let user_activity = UserAcceptedActivities::from_any_base(activity_any_base.clone())?
|
||||||
Some("Like") => receive_like(context, object, inner_id, request_counter).await,
|
.context(location_info!())?;
|
||||||
Some("Dislike") => receive_dislike(context, object, inner_id, request_counter).await,
|
// `to_user` is only used for follow activities (which we dont receive here), so no need to pass
|
||||||
Some("Delete") => receive_delete(context, object, inner_id, request_counter).await,
|
// it in
|
||||||
Some("Remove") => receive_remove(context, object, inner_id).await,
|
user_receive_message(
|
||||||
Some("Undo") => receive_undo(context, object, inner_id, request_counter).await,
|
user_activity,
|
||||||
_ => receive_unhandled_activity(announce),
|
None,
|
||||||
|
actor.as_ref(),
|
||||||
|
&context,
|
||||||
|
request_counter,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// If to_and_cc contains followers collection of a community, pass to receive_user_message()
|
||||||
|
if is_addressed_to_community_followers(&to_and_cc, context.pool()).await? {
|
||||||
|
let user_activity = UserAcceptedActivities::from_any_base(activity_any_base.clone())?
|
||||||
|
.context(location_info!())?;
|
||||||
|
res = Some(
|
||||||
|
user_receive_message(
|
||||||
|
user_activity,
|
||||||
|
None,
|
||||||
|
actor.as_ref(),
|
||||||
|
&context,
|
||||||
|
request_counter,
|
||||||
|
)
|
||||||
|
.await?,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// If none of those, throw an error
|
||||||
|
if let Some(r) = res {
|
||||||
|
Ok(r)
|
||||||
|
} else {
|
||||||
|
Ok(HttpResponse::NotImplemented().finish())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn receive_create(
|
/// If `to_and_cc` contains the ID of a local community, return that community, otherwise return
|
||||||
context: &LemmyContext,
|
/// None.
|
||||||
activity: AnyBase,
|
///
|
||||||
expected_domain: Url,
|
/// This doesnt handle the case where an activity is addressed to multiple communities (because
|
||||||
request_counter: &mut i32,
|
/// Lemmy doesnt generate such activities).
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
async fn extract_local_community_from_destinations(
|
||||||
let create = Create::from_any_base(activity)?.context(location_info!())?;
|
to_and_cc: &[Url],
|
||||||
verify_activity_domains_valid(&create, expected_domain, true)?;
|
pool: &DbPool,
|
||||||
|
) -> Result<Option<Community>, LemmyError> {
|
||||||
match create.object().as_single_kind_str() {
|
for url in to_and_cc {
|
||||||
Some("Page") => receive_create_post(create, context, request_counter).await,
|
let url = url.to_string();
|
||||||
Some("Note") => receive_create_comment(create, context, request_counter).await,
|
let community = blocking(&pool, move |conn| {
|
||||||
_ => receive_unhandled_activity(create),
|
Community::read_from_actor_id(&conn, &url)
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
if let Ok(c) = community {
|
||||||
|
if c.local {
|
||||||
|
return Ok(Some(c));
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(None)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn receive_update(
|
/// Returns true if `to_and_cc` contains at least one local user.
|
||||||
context: &LemmyContext,
|
async fn is_addressed_to_local_user(to_and_cc: &[Url], pool: &DbPool) -> Result<bool, LemmyError> {
|
||||||
activity: AnyBase,
|
for url in to_and_cc {
|
||||||
expected_domain: Url,
|
let url = url.to_string();
|
||||||
request_counter: &mut i32,
|
let user = blocking(&pool, move |conn| User_::read_from_actor_id(&conn, &url)).await?;
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
if let Ok(u) = user {
|
||||||
let update = Update::from_any_base(activity)?.context(location_info!())?;
|
if u.local {
|
||||||
verify_activity_domains_valid(&update, expected_domain, true)?;
|
return Ok(true);
|
||||||
|
|
||||||
match update.object().as_single_kind_str() {
|
|
||||||
Some("Page") => receive_update_post(update, context, request_counter).await,
|
|
||||||
Some("Note") => receive_update_comment(update, context, request_counter).await,
|
|
||||||
_ => receive_unhandled_activity(update),
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn receive_like(
|
/// Returns true if `to_and_cc` contains at least one followers collection of a remote community
|
||||||
context: &LemmyContext,
|
/// (like `https://example.com/c/main/followers`)
|
||||||
activity: AnyBase,
|
async fn is_addressed_to_community_followers(
|
||||||
expected_domain: Url,
|
to_and_cc: &[Url],
|
||||||
request_counter: &mut i32,
|
pool: &DbPool,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<bool, LemmyError> {
|
||||||
let like = Like::from_any_base(activity)?.context(location_info!())?;
|
for url in to_and_cc {
|
||||||
verify_activity_domains_valid(&like, expected_domain, false)?;
|
let url = url.to_string();
|
||||||
|
// TODO: extremely hacky, we should just store the followers url for each community in the db
|
||||||
match like.object().as_single_kind_str() {
|
if url.ends_with("/followers") {
|
||||||
Some("Page") => receive_like_post(like, context, request_counter).await,
|
let community_url = url.replace("/followers", "");
|
||||||
Some("Note") => receive_like_comment(like, context, request_counter).await,
|
let community = blocking(&pool, move |conn| {
|
||||||
_ => receive_unhandled_activity(like),
|
Community::read_from_actor_id(&conn, &community_url)
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn receive_dislike(
|
|
||||||
context: &LemmyContext,
|
|
||||||
activity: AnyBase,
|
|
||||||
expected_domain: Url,
|
|
||||||
request_counter: &mut i32,
|
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
|
||||||
let enable_downvotes = blocking(context.pool(), move |conn| {
|
|
||||||
Site::read(conn, 1).map(|s| s.enable_downvotes)
|
|
||||||
})
|
})
|
||||||
.await??;
|
.await??;
|
||||||
if !enable_downvotes {
|
if !community.local {
|
||||||
return Ok(HttpResponse::Ok().finish());
|
return Ok(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
let dislike = Dislike::from_any_base(activity)?.context(location_info!())?;
|
|
||||||
verify_activity_domains_valid(&dislike, expected_domain, false)?;
|
|
||||||
|
|
||||||
match dislike.object().as_single_kind_str() {
|
|
||||||
Some("Page") => receive_dislike_post(dislike, context, request_counter).await,
|
|
||||||
Some("Note") => receive_dislike_comment(dislike, context, request_counter).await,
|
|
||||||
_ => receive_unhandled_activity(dislike),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn receive_delete(
|
|
||||||
context: &LemmyContext,
|
|
||||||
activity: AnyBase,
|
|
||||||
expected_domain: Url,
|
|
||||||
request_counter: &mut i32,
|
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
|
||||||
let delete = Delete::from_any_base(activity)?.context(location_info!())?;
|
|
||||||
verify_activity_domains_valid(&delete, expected_domain, true)?;
|
|
||||||
|
|
||||||
let object = delete
|
|
||||||
.object()
|
|
||||||
.to_owned()
|
|
||||||
.single_xsd_any_uri()
|
|
||||||
.context(location_info!())?;
|
|
||||||
|
|
||||||
match find_by_id(context, object).await {
|
|
||||||
Ok(FindResults::Post(p)) => receive_delete_post(context, delete, p, request_counter).await,
|
|
||||||
Ok(FindResults::Comment(c)) => {
|
|
||||||
receive_delete_comment(context, delete, c, request_counter).await
|
|
||||||
}
|
|
||||||
Ok(FindResults::Community(c)) => {
|
|
||||||
receive_delete_community(context, delete, c, request_counter).await
|
|
||||||
}
|
|
||||||
// if we dont have the object, no need to do anything
|
|
||||||
Err(_) => Ok(HttpResponse::Ok().finish()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn receive_remove(
|
|
||||||
context: &LemmyContext,
|
|
||||||
activity: AnyBase,
|
|
||||||
expected_domain: Url,
|
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
|
||||||
let remove = Remove::from_any_base(activity)?.context(location_info!())?;
|
|
||||||
verify_activity_domains_valid(&remove, expected_domain, false)?;
|
|
||||||
|
|
||||||
let cc = remove
|
|
||||||
.cc()
|
|
||||||
.map(|c| c.as_many())
|
|
||||||
.flatten()
|
|
||||||
.context(location_info!())?;
|
|
||||||
let community_id = cc
|
|
||||||
.first()
|
|
||||||
.map(|c| c.as_xsd_any_uri())
|
|
||||||
.flatten()
|
|
||||||
.context(location_info!())?;
|
|
||||||
|
|
||||||
let object = remove
|
|
||||||
.object()
|
|
||||||
.to_owned()
|
|
||||||
.single_xsd_any_uri()
|
|
||||||
.context(location_info!())?;
|
|
||||||
|
|
||||||
// Ensure that remove activity comes from the same domain as the community
|
|
||||||
remove.id(community_id.domain().context(location_info!())?)?;
|
|
||||||
|
|
||||||
match find_by_id(context, object).await {
|
|
||||||
Ok(FindResults::Post(p)) => receive_remove_post(context, remove, p).await,
|
|
||||||
Ok(FindResults::Comment(c)) => receive_remove_comment(context, remove, c).await,
|
|
||||||
Ok(FindResults::Community(c)) => receive_remove_community(context, remove, c).await,
|
|
||||||
// if we dont have the object, no need to do anything
|
|
||||||
Err(_) => Ok(HttpResponse::Ok().finish()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn receive_undo(
|
|
||||||
context: &LemmyContext,
|
|
||||||
activity: AnyBase,
|
|
||||||
expected_domain: Url,
|
|
||||||
request_counter: &mut i32,
|
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
|
||||||
let undo = Undo::from_any_base(activity)?.context(location_info!())?;
|
|
||||||
verify_activity_domains_valid(&undo, expected_domain.to_owned(), true)?;
|
|
||||||
|
|
||||||
match undo.object().as_single_kind_str() {
|
|
||||||
Some("Delete") => receive_undo_delete(context, undo, expected_domain, request_counter).await,
|
|
||||||
Some("Remove") => receive_undo_remove(context, undo, expected_domain, request_counter).await,
|
|
||||||
Some("Like") => receive_undo_like(context, undo, expected_domain, request_counter).await,
|
|
||||||
Some("Dislike") => receive_undo_dislike(context, undo, expected_domain, request_counter).await,
|
|
||||||
_ => receive_unhandled_activity(undo),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn receive_undo_delete(
|
|
||||||
context: &LemmyContext,
|
|
||||||
undo: Undo,
|
|
||||||
expected_domain: Url,
|
|
||||||
request_counter: &mut i32,
|
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
|
||||||
let delete = Delete::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
|
|
||||||
.context(location_info!())?;
|
|
||||||
verify_activity_domains_valid(&delete, expected_domain, true)?;
|
|
||||||
|
|
||||||
let object = delete
|
|
||||||
.object()
|
|
||||||
.to_owned()
|
|
||||||
.single_xsd_any_uri()
|
|
||||||
.context(location_info!())?;
|
|
||||||
match find_by_id(context, object).await {
|
|
||||||
Ok(FindResults::Post(p)) => receive_undo_delete_post(context, undo, p, request_counter).await,
|
|
||||||
Ok(FindResults::Comment(c)) => {
|
|
||||||
receive_undo_delete_comment(context, undo, c, request_counter).await
|
|
||||||
}
|
|
||||||
Ok(FindResults::Community(c)) => {
|
|
||||||
receive_undo_delete_community(context, undo, c, request_counter).await
|
|
||||||
}
|
|
||||||
// if we dont have the object, no need to do anything
|
|
||||||
Err(_) => Ok(HttpResponse::Ok().finish()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn receive_undo_remove(
|
|
||||||
context: &LemmyContext,
|
|
||||||
undo: Undo,
|
|
||||||
expected_domain: Url,
|
|
||||||
request_counter: &mut i32,
|
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
|
||||||
let remove = Remove::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
|
|
||||||
.context(location_info!())?;
|
|
||||||
verify_activity_domains_valid(&remove, expected_domain, false)?;
|
|
||||||
|
|
||||||
let object = remove
|
|
||||||
.object()
|
|
||||||
.to_owned()
|
|
||||||
.single_xsd_any_uri()
|
|
||||||
.context(location_info!())?;
|
|
||||||
match find_by_id(context, object).await {
|
|
||||||
Ok(FindResults::Post(p)) => receive_undo_remove_post(context, undo, p, request_counter).await,
|
|
||||||
Ok(FindResults::Comment(c)) => {
|
|
||||||
receive_undo_remove_comment(context, undo, c, request_counter).await
|
|
||||||
}
|
|
||||||
Ok(FindResults::Community(c)) => {
|
|
||||||
receive_undo_remove_community(context, undo, c, request_counter).await
|
|
||||||
}
|
|
||||||
// if we dont have the object, no need to do anything
|
|
||||||
Err(_) => Ok(HttpResponse::Ok().finish()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn receive_undo_like(
|
|
||||||
context: &LemmyContext,
|
|
||||||
undo: Undo,
|
|
||||||
expected_domain: Url,
|
|
||||||
request_counter: &mut i32,
|
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
|
||||||
let like = Like::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
|
|
||||||
.context(location_info!())?;
|
|
||||||
verify_activity_domains_valid(&like, expected_domain, false)?;
|
|
||||||
|
|
||||||
let type_ = like
|
|
||||||
.object()
|
|
||||||
.as_single_kind_str()
|
|
||||||
.context(location_info!())?;
|
|
||||||
match type_ {
|
|
||||||
"Note" => receive_undo_like_comment(undo, &like, context, request_counter).await,
|
|
||||||
"Page" => receive_undo_like_post(undo, &like, context, request_counter).await,
|
|
||||||
d => Err(anyhow!("Undo Delete type {} not supported", d).into()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn receive_undo_dislike(
|
|
||||||
context: &LemmyContext,
|
|
||||||
undo: Undo,
|
|
||||||
expected_domain: Url,
|
|
||||||
request_counter: &mut i32,
|
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
|
||||||
let dislike = Dislike::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
|
|
||||||
.context(location_info!())?;
|
|
||||||
verify_activity_domains_valid(&dislike, expected_domain, false)?;
|
|
||||||
|
|
||||||
let type_ = dislike
|
|
||||||
.object()
|
|
||||||
.as_single_kind_str()
|
|
||||||
.context(location_info!())?;
|
|
||||||
match type_ {
|
|
||||||
"Note" => receive_undo_dislike_comment(undo, &dislike, context, request_counter).await,
|
|
||||||
"Page" => receive_undo_dislike_post(undo, &dislike, context, request_counter).await,
|
|
||||||
d => Err(anyhow!("Undo Delete type {} not supported", d).into()),
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
Ok(false)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,114 +1,164 @@
|
||||||
use crate::{
|
use crate::{
|
||||||
activities::receive::verify_activity_domains_valid,
|
activities::receive::{
|
||||||
|
community::{
|
||||||
|
receive_delete_community,
|
||||||
|
receive_remove_community,
|
||||||
|
receive_undo_delete_community,
|
||||||
|
receive_undo_remove_community,
|
||||||
|
},
|
||||||
|
private_message::{
|
||||||
|
receive_create_private_message,
|
||||||
|
receive_delete_private_message,
|
||||||
|
receive_undo_delete_private_message,
|
||||||
|
receive_update_private_message,
|
||||||
|
},
|
||||||
|
receive_unhandled_activity,
|
||||||
|
verify_activity_domains_valid,
|
||||||
|
},
|
||||||
check_is_apub_id_valid,
|
check_is_apub_id_valid,
|
||||||
extensions::signatures::verify_signature,
|
fetcher::get_or_fetch_and_upsert_community,
|
||||||
fetcher::{get_or_fetch_and_upsert_actor, get_or_fetch_and_upsert_community},
|
inbox::{
|
||||||
inbox::{get_activity_id, is_activity_already_known},
|
get_activity_id,
|
||||||
|
get_activity_to_and_cc,
|
||||||
|
inbox_verify_http_signature,
|
||||||
|
is_activity_already_known,
|
||||||
|
is_addressed_to_public,
|
||||||
|
receive_for_community::{
|
||||||
|
receive_create_for_community,
|
||||||
|
receive_delete_for_community,
|
||||||
|
receive_dislike_for_community,
|
||||||
|
receive_like_for_community,
|
||||||
|
receive_remove_for_community,
|
||||||
|
receive_undo_for_community,
|
||||||
|
receive_update_for_community,
|
||||||
|
},
|
||||||
|
},
|
||||||
insert_activity,
|
insert_activity,
|
||||||
ActorType,
|
ActorType,
|
||||||
FromApub,
|
|
||||||
};
|
};
|
||||||
use activitystreams::{
|
use activitystreams::{
|
||||||
activity::{Accept, ActorAndObject, Create, Delete, Follow, Undo, Update},
|
activity::{Accept, ActorAndObject, Announce, Delete, Follow, Undo},
|
||||||
base::AnyBase,
|
base::AnyBase,
|
||||||
object::Note,
|
|
||||||
prelude::*,
|
prelude::*,
|
||||||
};
|
};
|
||||||
use actix_web::{web, HttpRequest, HttpResponse};
|
use actix_web::{web, HttpRequest, HttpResponse};
|
||||||
use anyhow::{anyhow, Context};
|
use anyhow::{anyhow, Context};
|
||||||
|
use diesel::NotFound;
|
||||||
use lemmy_db::{
|
use lemmy_db::{
|
||||||
community::{CommunityFollower, CommunityFollowerForm},
|
community::{Community, CommunityFollower, CommunityFollowerForm},
|
||||||
private_message::{PrivateMessage, PrivateMessageForm},
|
private_message::PrivateMessage,
|
||||||
private_message_view::PrivateMessageView,
|
|
||||||
user::User_,
|
user::User_,
|
||||||
Crud,
|
|
||||||
Followable,
|
Followable,
|
||||||
};
|
};
|
||||||
use lemmy_structs::{blocking, user::PrivateMessageResponse};
|
use lemmy_structs::blocking;
|
||||||
use lemmy_utils::{location_info, LemmyError};
|
use lemmy_utils::{location_info, LemmyError};
|
||||||
use lemmy_websocket::{messages::SendUserRoomMessage, LemmyContext, UserOperation};
|
use lemmy_websocket::LemmyContext;
|
||||||
use log::debug;
|
use log::debug;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::fmt::Debug;
|
use std::fmt::Debug;
|
||||||
|
use url::Url;
|
||||||
|
|
||||||
/// Allowed activities for user inbox.
|
/// Allowed activities for user inbox.
|
||||||
#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Deserialize, Serialize)]
|
#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Deserialize, Serialize)]
|
||||||
#[serde(rename_all = "PascalCase")]
|
#[serde(rename_all = "PascalCase")]
|
||||||
pub enum ValidTypes {
|
pub enum UserValidTypes {
|
||||||
Accept,
|
Accept, // community accepted our follow request
|
||||||
Create,
|
Create, // create private message
|
||||||
Update,
|
Update, // edit private message
|
||||||
Delete,
|
Delete, // private message or community deleted by creator
|
||||||
Undo,
|
Undo, // private message or community restored
|
||||||
|
Remove, // community removed by admin
|
||||||
|
Announce, // post, comment or vote in community
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type AcceptedActivities = ActorAndObject<ValidTypes>;
|
pub type UserAcceptedActivities = ActorAndObject<UserValidTypes>;
|
||||||
|
|
||||||
/// Handler for all incoming activities to user inboxes.
|
/// Handler for all incoming activities to user inboxes.
|
||||||
pub async fn user_inbox(
|
pub async fn user_inbox(
|
||||||
request: HttpRequest,
|
request: HttpRequest,
|
||||||
input: web::Json<AcceptedActivities>,
|
input: web::Json<UserAcceptedActivities>,
|
||||||
path: web::Path<String>,
|
path: web::Path<String>,
|
||||||
context: web::Data<LemmyContext>,
|
context: web::Data<LemmyContext>,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<HttpResponse, LemmyError> {
|
||||||
let activity = input.into_inner();
|
let activity = input.into_inner();
|
||||||
|
// First of all check the http signature
|
||||||
|
let request_counter = &mut 0;
|
||||||
|
let actor = inbox_verify_http_signature(&activity, &context, request, request_counter).await?;
|
||||||
|
|
||||||
|
// Do nothing if we received the same activity before
|
||||||
|
let activity_id = get_activity_id(&activity, &actor.actor_id()?)?;
|
||||||
|
if is_activity_already_known(context.pool(), &activity_id).await? {
|
||||||
|
return Ok(HttpResponse::Ok().finish());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if the activity is actually meant for us
|
||||||
let username = path.into_inner();
|
let username = path.into_inner();
|
||||||
let user = blocking(&context.pool(), move |conn| {
|
let user = blocking(&context.pool(), move |conn| {
|
||||||
User_::read_from_name(&conn, &username)
|
User_::read_from_name(&conn, &username)
|
||||||
})
|
})
|
||||||
.await??;
|
.await??;
|
||||||
|
let to_and_cc = get_activity_to_and_cc(&activity)?;
|
||||||
let to = activity
|
if !to_and_cc.contains(&&user.actor_id()?) {
|
||||||
.to()
|
|
||||||
.context(location_info!())?
|
|
||||||
.to_owned()
|
|
||||||
.single_xsd_any_uri();
|
|
||||||
if Some(user.actor_id()?) != to {
|
|
||||||
return Err(anyhow!("Activity delivered to wrong user").into());
|
return Err(anyhow!("Activity delivered to wrong user").into());
|
||||||
}
|
}
|
||||||
|
|
||||||
let actor_uri = activity
|
insert_activity(&activity_id, activity.clone(), false, true, context.pool()).await?;
|
||||||
.actor()?
|
|
||||||
.as_single_xsd_any_uri()
|
|
||||||
.context(location_info!())?;
|
|
||||||
debug!(
|
debug!(
|
||||||
"User {} inbox received activity {:?} from {}",
|
"User {} received activity {:?} from {}",
|
||||||
user.name,
|
user.name,
|
||||||
&activity.id_unchecked(),
|
&activity.id_unchecked(),
|
||||||
&actor_uri
|
&actor.actor_id_str()
|
||||||
);
|
);
|
||||||
|
|
||||||
check_is_apub_id_valid(actor_uri)?;
|
user_receive_message(
|
||||||
|
activity.clone(),
|
||||||
|
Some(user.clone()),
|
||||||
|
actor.as_ref(),
|
||||||
|
&context,
|
||||||
|
request_counter,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
let request_counter = &mut 0;
|
/// Receives Accept/Follow, Announce, private messages and community (undo) remove, (undo) delete
|
||||||
let actor = get_or_fetch_and_upsert_actor(actor_uri, &context, request_counter).await?;
|
pub(crate) async fn user_receive_message(
|
||||||
verify_signature(&request, actor.as_ref())?;
|
activity: UserAcceptedActivities,
|
||||||
|
to_user: Option<User_>,
|
||||||
|
actor: &dyn ActorType,
|
||||||
|
context: &LemmyContext,
|
||||||
|
request_counter: &mut i32,
|
||||||
|
) -> Result<HttpResponse, LemmyError> {
|
||||||
|
// TODO: must be addressed to one or more local users, or to followers of a remote community
|
||||||
|
|
||||||
let activity_id = get_activity_id(&activity, actor_uri)?;
|
// TODO: if it is addressed to community followers, check that at least one local user is following it
|
||||||
if is_activity_already_known(context.pool(), &activity_id).await? {
|
|
||||||
return Ok(HttpResponse::Ok().finish());
|
|
||||||
}
|
|
||||||
|
|
||||||
let any_base = activity.clone().into_any_base()?;
|
let any_base = activity.clone().into_any_base()?;
|
||||||
let kind = activity.kind().context(location_info!())?;
|
let kind = activity.kind().context(location_info!())?;
|
||||||
let res = match kind {
|
let actor_url = actor.actor_id()?;
|
||||||
ValidTypes::Accept => {
|
match kind {
|
||||||
receive_accept(&context, any_base, actor.as_ref(), user, request_counter).await
|
UserValidTypes::Accept => {
|
||||||
|
receive_accept(&context, any_base, actor, to_user.unwrap(), request_counter).await?;
|
||||||
}
|
}
|
||||||
ValidTypes::Create => {
|
UserValidTypes::Announce => {
|
||||||
receive_create_private_message(&context, any_base, actor.as_ref(), request_counter).await
|
receive_announce(&context, any_base, actor, request_counter).await?
|
||||||
}
|
}
|
||||||
ValidTypes::Update => {
|
UserValidTypes::Create => {
|
||||||
receive_update_private_message(&context, any_base, actor.as_ref(), request_counter).await
|
receive_create_private_message(&context, any_base, actor_url, request_counter).await?
|
||||||
}
|
}
|
||||||
ValidTypes::Delete => receive_delete_private_message(&context, any_base, actor.as_ref()).await,
|
UserValidTypes::Update => {
|
||||||
ValidTypes::Undo => {
|
receive_update_private_message(&context, any_base, actor_url, request_counter).await?
|
||||||
receive_undo_delete_private_message(&context, any_base, actor.as_ref()).await
|
|
||||||
}
|
}
|
||||||
|
UserValidTypes::Delete => {
|
||||||
|
receive_delete(context, any_base, &actor_url, request_counter).await?
|
||||||
|
}
|
||||||
|
UserValidTypes::Undo => receive_undo(context, any_base, &actor_url, request_counter).await?,
|
||||||
|
UserValidTypes::Remove => receive_remove_community(&context, any_base, &actor_url).await?,
|
||||||
};
|
};
|
||||||
|
|
||||||
insert_activity(&activity_id, activity.clone(), false, true, context.pool()).await?;
|
// TODO: would be logical to move websocket notification code here
|
||||||
res
|
|
||||||
|
Ok(HttpResponse::Ok().finish())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Handle accepted follows.
|
/// Handle accepted follows.
|
||||||
|
@ -118,15 +168,15 @@ async fn receive_accept(
|
||||||
actor: &dyn ActorType,
|
actor: &dyn ActorType,
|
||||||
user: User_,
|
user: User_,
|
||||||
request_counter: &mut i32,
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<(), LemmyError> {
|
||||||
let accept = Accept::from_any_base(activity)?.context(location_info!())?;
|
let accept = Accept::from_any_base(activity)?.context(location_info!())?;
|
||||||
verify_activity_domains_valid(&accept, actor.actor_id()?, false)?;
|
verify_activity_domains_valid(&accept, &actor.actor_id()?, false)?;
|
||||||
|
|
||||||
// TODO: we should check that we actually sent this activity, because the remote instance
|
// TODO: we should check that we actually sent this activity, because the remote instance
|
||||||
// could just put a fake Follow
|
// could just put a fake Follow
|
||||||
let object = accept.object().to_owned().one().context(location_info!())?;
|
let object = accept.object().to_owned().one().context(location_info!())?;
|
||||||
let follow = Follow::from_any_base(object)?.context(location_info!())?;
|
let follow = Follow::from_any_base(object)?.context(location_info!())?;
|
||||||
verify_activity_domains_valid(&follow, user.actor_id()?, false)?;
|
verify_activity_domains_valid(&follow, &user.actor_id()?, false)?;
|
||||||
|
|
||||||
let community_uri = accept
|
let community_uri = accept
|
||||||
.actor()?
|
.actor()?
|
||||||
|
@ -149,186 +199,137 @@ async fn receive_accept(
|
||||||
})
|
})
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
Ok(HttpResponse::Ok().finish())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn receive_create_private_message(
|
/// Takes an announce and passes the inner activity to the appropriate handler.
|
||||||
|
async fn receive_announce(
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
activity: AnyBase,
|
activity: AnyBase,
|
||||||
actor: &dyn ActorType,
|
actor: &dyn ActorType,
|
||||||
request_counter: &mut i32,
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<(), LemmyError> {
|
||||||
let create = Create::from_any_base(activity)?.context(location_info!())?;
|
let announce = Announce::from_any_base(activity)?.context(location_info!())?;
|
||||||
verify_activity_domains_valid(&create, actor.actor_id()?, true)?;
|
verify_activity_domains_valid(&announce, &actor.actor_id()?, false)?;
|
||||||
|
is_addressed_to_public(&announce)?;
|
||||||
|
|
||||||
let note = Note::from_any_base(
|
let kind = announce.object().as_single_kind_str();
|
||||||
create
|
let inner_activity = announce
|
||||||
.object()
|
.object()
|
||||||
.as_one()
|
.to_owned()
|
||||||
.context(location_info!())?
|
.one()
|
||||||
.to_owned(),
|
|
||||||
)?
|
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
|
|
||||||
let private_message =
|
let inner_id = inner_activity.id().context(location_info!())?.to_owned();
|
||||||
PrivateMessageForm::from_apub(¬e, context, Some(actor.actor_id()?), request_counter).await?;
|
check_is_apub_id_valid(&inner_id)?;
|
||||||
|
if is_activity_already_known(context.pool(), &inner_id).await? {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
let inserted_private_message = blocking(&context.pool(), move |conn| {
|
dbg!(&kind);
|
||||||
PrivateMessage::create(conn, &private_message)
|
match kind {
|
||||||
})
|
Some("Create") => {
|
||||||
.await??;
|
receive_create_for_community(context, inner_activity, &inner_id, request_counter).await
|
||||||
|
}
|
||||||
let message = blocking(&context.pool(), move |conn| {
|
Some("Update") => {
|
||||||
PrivateMessageView::read(conn, inserted_private_message.id)
|
receive_update_for_community(context, inner_activity, &inner_id, request_counter).await
|
||||||
})
|
}
|
||||||
.await??;
|
Some("Like") => {
|
||||||
|
receive_like_for_community(context, inner_activity, &inner_id, request_counter).await
|
||||||
let res = PrivateMessageResponse { message };
|
}
|
||||||
|
Some("Dislike") => {
|
||||||
let recipient_id = res.message.recipient_id;
|
receive_dislike_for_community(context, inner_activity, &inner_id, request_counter).await
|
||||||
|
}
|
||||||
context.chat_server().do_send(SendUserRoomMessage {
|
Some("Delete") => receive_delete_for_community(context, inner_activity, &inner_id).await,
|
||||||
op: UserOperation::CreatePrivateMessage,
|
Some("Remove") => receive_remove_for_community(context, inner_activity, &inner_id).await,
|
||||||
response: res,
|
Some("Undo") => {
|
||||||
recipient_id,
|
receive_undo_for_community(context, inner_activity, &inner_id, request_counter).await
|
||||||
websocket_id: None,
|
}
|
||||||
});
|
_ => receive_unhandled_activity(inner_activity),
|
||||||
|
}
|
||||||
Ok(HttpResponse::Ok().finish())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn receive_update_private_message(
|
async fn receive_delete(
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
activity: AnyBase,
|
any_base: AnyBase,
|
||||||
actor: &dyn ActorType,
|
expected_domain: &Url,
|
||||||
request_counter: &mut i32,
|
request_counter: &mut i32,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<(), LemmyError> {
|
||||||
let update = Update::from_any_base(activity)?.context(location_info!())?;
|
use CommunityOrPrivateMessage::*;
|
||||||
verify_activity_domains_valid(&update, actor.actor_id()?, true)?;
|
|
||||||
|
|
||||||
let object = update
|
let delete = Delete::from_any_base(any_base.clone())?.context(location_info!())?;
|
||||||
.object()
|
verify_activity_domains_valid(&delete, expected_domain, true)?;
|
||||||
.as_one()
|
let object_uri = delete
|
||||||
.context(location_info!())?
|
|
||||||
.to_owned();
|
|
||||||
let note = Note::from_any_base(object)?.context(location_info!())?;
|
|
||||||
|
|
||||||
let private_message_form =
|
|
||||||
PrivateMessageForm::from_apub(¬e, context, Some(actor.actor_id()?), request_counter).await?;
|
|
||||||
|
|
||||||
let private_message_ap_id = private_message_form
|
|
||||||
.ap_id
|
|
||||||
.as_ref()
|
|
||||||
.context(location_info!())?
|
|
||||||
.clone();
|
|
||||||
let private_message = blocking(&context.pool(), move |conn| {
|
|
||||||
PrivateMessage::read_from_apub_id(conn, &private_message_ap_id)
|
|
||||||
})
|
|
||||||
.await??;
|
|
||||||
|
|
||||||
let private_message_id = private_message.id;
|
|
||||||
blocking(&context.pool(), move |conn| {
|
|
||||||
PrivateMessage::update(conn, private_message_id, &private_message_form)
|
|
||||||
})
|
|
||||||
.await??;
|
|
||||||
|
|
||||||
let private_message_id = private_message.id;
|
|
||||||
let message = blocking(&context.pool(), move |conn| {
|
|
||||||
PrivateMessageView::read(conn, private_message_id)
|
|
||||||
})
|
|
||||||
.await??;
|
|
||||||
|
|
||||||
let res = PrivateMessageResponse { message };
|
|
||||||
|
|
||||||
let recipient_id = res.message.recipient_id;
|
|
||||||
|
|
||||||
context.chat_server().do_send(SendUserRoomMessage {
|
|
||||||
op: UserOperation::EditPrivateMessage,
|
|
||||||
response: res,
|
|
||||||
recipient_id,
|
|
||||||
websocket_id: None,
|
|
||||||
});
|
|
||||||
|
|
||||||
Ok(HttpResponse::Ok().finish())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn receive_delete_private_message(
|
|
||||||
context: &LemmyContext,
|
|
||||||
activity: AnyBase,
|
|
||||||
actor: &dyn ActorType,
|
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
|
||||||
let delete = Delete::from_any_base(activity)?.context(location_info!())?;
|
|
||||||
verify_activity_domains_valid(&delete, actor.actor_id()?, true)?;
|
|
||||||
|
|
||||||
let private_message_id = delete
|
|
||||||
.object()
|
.object()
|
||||||
.to_owned()
|
.to_owned()
|
||||||
.single_xsd_any_uri()
|
.single_xsd_any_uri()
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
let private_message = blocking(context.pool(), move |conn| {
|
|
||||||
PrivateMessage::read_from_apub_id(conn, private_message_id.as_str())
|
|
||||||
})
|
|
||||||
.await??;
|
|
||||||
let deleted_private_message = blocking(context.pool(), move |conn| {
|
|
||||||
PrivateMessage::update_deleted(conn, private_message.id, true)
|
|
||||||
})
|
|
||||||
.await??;
|
|
||||||
|
|
||||||
let message = blocking(&context.pool(), move |conn| {
|
match find_community_or_private_message_by_id(context, object_uri).await? {
|
||||||
PrivateMessageView::read(&conn, deleted_private_message.id)
|
Community(c) => receive_delete_community(context, c).await,
|
||||||
})
|
PrivateMessage(p) => receive_delete_private_message(context, delete, p, request_counter).await,
|
||||||
.await??;
|
}
|
||||||
|
|
||||||
let res = PrivateMessageResponse { message };
|
|
||||||
let recipient_id = res.message.recipient_id;
|
|
||||||
context.chat_server().do_send(SendUserRoomMessage {
|
|
||||||
op: UserOperation::EditPrivateMessage,
|
|
||||||
response: res,
|
|
||||||
recipient_id,
|
|
||||||
websocket_id: None,
|
|
||||||
});
|
|
||||||
|
|
||||||
Ok(HttpResponse::Ok().finish())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn receive_undo_delete_private_message(
|
async fn receive_undo(
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
activity: AnyBase,
|
any_base: AnyBase,
|
||||||
actor: &dyn ActorType,
|
expected_domain: &Url,
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
request_counter: &mut i32,
|
||||||
let undo = Undo::from_any_base(activity)?.context(location_info!())?;
|
) -> Result<(), LemmyError> {
|
||||||
verify_activity_domains_valid(&undo, actor.actor_id()?, true)?;
|
use CommunityOrPrivateMessage::*;
|
||||||
let object = undo.object().to_owned().one().context(location_info!())?;
|
let undo = Undo::from_any_base(any_base)?.context(location_info!())?;
|
||||||
let delete = Delete::from_any_base(object)?.context(location_info!())?;
|
verify_activity_domains_valid(&undo, expected_domain, true)?;
|
||||||
verify_activity_domains_valid(&delete, actor.actor_id()?, true)?;
|
|
||||||
|
|
||||||
let private_message_id = delete
|
let inner_activity = undo.object().to_owned().one().context(location_info!())?;
|
||||||
|
let kind = inner_activity.kind_str();
|
||||||
|
match kind {
|
||||||
|
Some("Delete") => {
|
||||||
|
let delete = Delete::from_any_base(inner_activity)?.context(location_info!())?;
|
||||||
|
verify_activity_domains_valid(&delete, expected_domain, true)?;
|
||||||
|
let object_uri = delete
|
||||||
.object()
|
.object()
|
||||||
.to_owned()
|
.to_owned()
|
||||||
.single_xsd_any_uri()
|
.single_xsd_any_uri()
|
||||||
.context(location_info!())?;
|
.context(location_info!())?;
|
||||||
let private_message = blocking(context.pool(), move |conn| {
|
match find_community_or_private_message_by_id(context, object_uri).await? {
|
||||||
PrivateMessage::read_from_apub_id(conn, private_message_id.as_str())
|
Community(c) => receive_undo_delete_community(context, undo, c, expected_domain).await,
|
||||||
})
|
PrivateMessage(p) => {
|
||||||
.await??;
|
receive_undo_delete_private_message(context, undo, expected_domain, p, request_counter)
|
||||||
let deleted_private_message = blocking(context.pool(), move |conn| {
|
.await
|
||||||
PrivateMessage::update_deleted(conn, private_message.id, false)
|
}
|
||||||
})
|
}
|
||||||
.await??;
|
}
|
||||||
|
Some("Remove") => receive_undo_remove_community(context, undo, expected_domain).await,
|
||||||
let message = blocking(&context.pool(), move |conn| {
|
_ => receive_unhandled_activity(undo),
|
||||||
PrivateMessageView::read(&conn, deleted_private_message.id)
|
}
|
||||||
})
|
}
|
||||||
.await??;
|
enum CommunityOrPrivateMessage {
|
||||||
|
Community(Community),
|
||||||
let res = PrivateMessageResponse { message };
|
PrivateMessage(PrivateMessage),
|
||||||
let recipient_id = res.message.recipient_id;
|
}
|
||||||
context.chat_server().do_send(SendUserRoomMessage {
|
|
||||||
op: UserOperation::EditPrivateMessage,
|
async fn find_community_or_private_message_by_id(
|
||||||
response: res,
|
context: &LemmyContext,
|
||||||
recipient_id,
|
apub_id: Url,
|
||||||
websocket_id: None,
|
) -> Result<CommunityOrPrivateMessage, LemmyError> {
|
||||||
});
|
let ap_id = apub_id.to_string();
|
||||||
|
let community = blocking(context.pool(), move |conn| {
|
||||||
Ok(HttpResponse::Ok().finish())
|
Community::read_from_actor_id(conn, &ap_id)
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
if let Ok(c) = community {
|
||||||
|
return Ok(CommunityOrPrivateMessage::Community(c));
|
||||||
|
}
|
||||||
|
|
||||||
|
let ap_id = apub_id.to_string();
|
||||||
|
let private_message = blocking(context.pool(), move |conn| {
|
||||||
|
PrivateMessage::read_from_apub_id(conn, &ap_id)
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
if let Ok(p) = private_message {
|
||||||
|
return Ok(CommunityOrPrivateMessage::PrivateMessage(p));
|
||||||
|
}
|
||||||
|
|
||||||
|
return Err(NotFound.into());
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue