Rewrite user_inbox and community_inbox in the same way as shared_inbox
This commit is contained in:
parent
d845ea66fe
commit
d0fc8f38e4
6 changed files with 119 additions and 135 deletions
|
@ -507,36 +507,36 @@ joinable!(user_mention -> comment (comment_id));
|
|||
joinable!(user_mention -> user_ (recipient_id));
|
||||
|
||||
allow_tables_to_appear_in_same_query!(
|
||||
activity,
|
||||
category,
|
||||
comment,
|
||||
comment_aggregates_fast,
|
||||
comment_like,
|
||||
comment_saved,
|
||||
community,
|
||||
community_aggregates_fast,
|
||||
community_follower,
|
||||
community_moderator,
|
||||
community_user_ban,
|
||||
mod_add,
|
||||
mod_add_community,
|
||||
mod_ban,
|
||||
mod_ban_from_community,
|
||||
mod_lock_post,
|
||||
mod_remove_comment,
|
||||
mod_remove_community,
|
||||
mod_remove_post,
|
||||
mod_sticky_post,
|
||||
password_reset_request,
|
||||
post,
|
||||
post_aggregates_fast,
|
||||
post_like,
|
||||
post_read,
|
||||
post_saved,
|
||||
private_message,
|
||||
site,
|
||||
user_,
|
||||
user_ban,
|
||||
user_fast,
|
||||
user_mention,
|
||||
activity,
|
||||
category,
|
||||
comment,
|
||||
comment_aggregates_fast,
|
||||
comment_like,
|
||||
comment_saved,
|
||||
community,
|
||||
community_aggregates_fast,
|
||||
community_follower,
|
||||
community_moderator,
|
||||
community_user_ban,
|
||||
mod_add,
|
||||
mod_add_community,
|
||||
mod_ban,
|
||||
mod_ban_from_community,
|
||||
mod_lock_post,
|
||||
mod_remove_comment,
|
||||
mod_remove_community,
|
||||
mod_remove_post,
|
||||
mod_sticky_post,
|
||||
password_reset_request,
|
||||
post,
|
||||
post_aggregates_fast,
|
||||
post_like,
|
||||
post_read,
|
||||
post_saved,
|
||||
private_message,
|
||||
site,
|
||||
user_,
|
||||
user_ban,
|
||||
user_fast,
|
||||
user_mention,
|
||||
);
|
||||
|
|
|
@ -39,7 +39,6 @@ pub async fn receive_create(
|
|||
chat_server: ChatServerParam,
|
||||
) -> Result<HttpResponse, LemmyError> {
|
||||
let create = Create::from_any_base(activity)?.unwrap();
|
||||
dbg!(create.object().as_single_kind_str());
|
||||
match create.object().as_single_kind_str() {
|
||||
Some("Page") => receive_create_post(create, client, pool, chat_server).await,
|
||||
Some("Note") => receive_create_comment(create, client, pool, chat_server).await,
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
use crate::{
|
||||
apub::{
|
||||
check_is_apub_id_valid,
|
||||
extensions::signatures::verify,
|
||||
fetcher::{get_or_fetch_and_upsert_community, get_or_fetch_and_upsert_user},
|
||||
fetcher::get_or_fetch_and_upsert_user,
|
||||
insert_activity,
|
||||
ActorType,
|
||||
},
|
||||
|
@ -10,7 +11,8 @@ use crate::{
|
|||
LemmyError,
|
||||
};
|
||||
use activitystreams::{
|
||||
activity::{Follow, Undo},
|
||||
activity::{ActorAndObject, Follow, Undo},
|
||||
base::AnyBase,
|
||||
prelude::*,
|
||||
};
|
||||
use actix_web::{client::Client, web, HttpRequest, HttpResponse};
|
||||
|
@ -21,37 +23,28 @@ use lemmy_db::{
|
|||
Followable,
|
||||
};
|
||||
use log::debug;
|
||||
use serde::Deserialize;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fmt::Debug;
|
||||
|
||||
#[serde(untagged)]
|
||||
#[derive(Deserialize, Debug)]
|
||||
pub enum CommunityAcceptedObjects {
|
||||
Follow(Follow),
|
||||
Undo(Undo),
|
||||
#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Deserialize, Serialize)]
|
||||
#[serde(rename_all = "PascalCase")]
|
||||
pub enum ValidTypes {
|
||||
Follow,
|
||||
Undo,
|
||||
}
|
||||
|
||||
impl CommunityAcceptedObjects {
|
||||
fn follow(&self) -> Result<Follow, LemmyError> {
|
||||
match self {
|
||||
CommunityAcceptedObjects::Follow(f) => Ok(f.to_owned()),
|
||||
CommunityAcceptedObjects::Undo(u) => {
|
||||
Ok(Follow::from_any_base(u.object().as_one().unwrap().to_owned())?.unwrap())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
pub type AcceptedActivities = ActorAndObject<ValidTypes>;
|
||||
|
||||
/// Handler for all incoming activities to community inboxes.
|
||||
pub async fn community_inbox(
|
||||
request: HttpRequest,
|
||||
input: web::Json<CommunityAcceptedObjects>,
|
||||
input: web::Json<AcceptedActivities>,
|
||||
path: web::Path<String>,
|
||||
db: DbPoolParam,
|
||||
client: web::Data<Client>,
|
||||
_chat_server: ChatServerParam,
|
||||
) -> Result<HttpResponse, LemmyError> {
|
||||
let input = input.into_inner();
|
||||
let activity = input.into_inner();
|
||||
|
||||
let path = path.into_inner();
|
||||
let community = blocking(&db, move |conn| Community::read_from_name(&conn, &path)).await??;
|
||||
|
@ -67,34 +60,35 @@ pub async fn community_inbox(
|
|||
}
|
||||
debug!(
|
||||
"Community {} received activity {:?}",
|
||||
&community.name, &input
|
||||
&community.name, &activity
|
||||
);
|
||||
let follow = input.follow()?;
|
||||
let user_uri = follow.actor()?.as_single_xsd_any_uri().unwrap();
|
||||
let community_uri = follow.object().as_single_xsd_any_uri().unwrap();
|
||||
let user_uri = activity.actor()?.as_single_xsd_any_uri().unwrap();
|
||||
check_is_apub_id_valid(user_uri)?;
|
||||
|
||||
let user = get_or_fetch_and_upsert_user(&user_uri, &client, &db).await?;
|
||||
let community = get_or_fetch_and_upsert_community(community_uri, &client, &db).await?;
|
||||
|
||||
verify(&request, &user)?;
|
||||
|
||||
match input {
|
||||
CommunityAcceptedObjects::Follow(f) => handle_follow(f, user, community, &client, db).await,
|
||||
CommunityAcceptedObjects::Undo(u) => handle_undo_follow(u, user, community, db).await,
|
||||
insert_activity(user.id, activity.clone(), false, &db).await?;
|
||||
|
||||
let any_base = activity.clone().into_any_base()?;
|
||||
let kind = activity.kind().unwrap();
|
||||
match kind {
|
||||
ValidTypes::Follow => handle_follow(any_base, user, community, &client, db).await,
|
||||
ValidTypes::Undo => handle_undo_follow(any_base, user, community, db).await,
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle a follow request from a remote user, adding it to the local database and returning an
|
||||
/// Accept activity.
|
||||
async fn handle_follow(
|
||||
follow: Follow,
|
||||
activity: AnyBase,
|
||||
user: User_,
|
||||
community: Community,
|
||||
client: &Client,
|
||||
db: DbPoolParam,
|
||||
) -> Result<HttpResponse, LemmyError> {
|
||||
insert_activity(user.id, follow.clone(), false, &db).await?;
|
||||
|
||||
let follow = Follow::from_any_base(activity)?.unwrap();
|
||||
let community_follower_form = CommunityFollowerForm {
|
||||
community_id: community.id,
|
||||
user_id: user.id,
|
||||
|
@ -112,12 +106,12 @@ async fn handle_follow(
|
|||
}
|
||||
|
||||
async fn handle_undo_follow(
|
||||
undo: Undo,
|
||||
activity: AnyBase,
|
||||
user: User_,
|
||||
community: Community,
|
||||
db: DbPoolParam,
|
||||
) -> Result<HttpResponse, LemmyError> {
|
||||
insert_activity(user.id, undo, false, &db).await?;
|
||||
let _undo = Undo::from_any_base(activity)?.unwrap();
|
||||
|
||||
let community_follower_form = CommunityFollowerForm {
|
||||
community_id: community.id,
|
||||
|
|
|
@ -33,11 +33,11 @@ use activitystreams::{
|
|||
use actix_web::{client::Client, web, HttpRequest, HttpResponse};
|
||||
use lemmy_db::user::User_;
|
||||
use log::debug;
|
||||
use serde::Serialize;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fmt::Debug;
|
||||
use url::Url;
|
||||
|
||||
#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, serde::Deserialize, serde::Serialize)]
|
||||
#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Deserialize, Serialize)]
|
||||
#[serde(rename_all = "PascalCase")]
|
||||
pub enum ValidTypes {
|
||||
Create,
|
||||
|
@ -81,7 +81,6 @@ pub async fn shared_inbox(
|
|||
|
||||
let any_base = activity.clone().into_any_base()?;
|
||||
let kind = activity.kind().unwrap();
|
||||
dbg!(kind);
|
||||
match kind {
|
||||
ValidTypes::Announce => receive_announce(any_base, &client, &pool, chat_server).await,
|
||||
ValidTypes::Create => receive_create(any_base, &client, &pool, chat_server).await,
|
||||
|
|
|
@ -1,8 +1,9 @@
|
|||
use crate::{
|
||||
api::user::PrivateMessageResponse,
|
||||
apub::{
|
||||
check_is_apub_id_valid,
|
||||
extensions::signatures::verify,
|
||||
fetcher::{get_or_fetch_and_upsert_community, get_or_fetch_and_upsert_user},
|
||||
fetcher::{get_or_fetch_and_upsert_actor, get_or_fetch_and_upsert_community},
|
||||
insert_activity,
|
||||
FromApub,
|
||||
},
|
||||
|
@ -13,7 +14,8 @@ use crate::{
|
|||
LemmyError,
|
||||
};
|
||||
use activitystreams::{
|
||||
activity::{Accept, Create, Delete, Undo, Update},
|
||||
activity::{Accept, ActorAndObject, Create, Delete, Undo, Update},
|
||||
base::AnyBase,
|
||||
object::Note,
|
||||
prelude::*,
|
||||
};
|
||||
|
@ -28,68 +30,76 @@ use lemmy_db::{
|
|||
Followable,
|
||||
};
|
||||
use log::debug;
|
||||
use serde::Deserialize;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fmt::Debug;
|
||||
|
||||
#[serde(untagged)]
|
||||
#[derive(Deserialize, Debug)]
|
||||
pub enum UserAcceptedObjects {
|
||||
Accept(Box<Accept>),
|
||||
Create(Box<Create>),
|
||||
Update(Box<Update>),
|
||||
Delete(Box<Delete>),
|
||||
Undo(Box<Undo>),
|
||||
#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Deserialize, Serialize)]
|
||||
#[serde(rename_all = "PascalCase")]
|
||||
pub enum ValidTypes {
|
||||
Accept,
|
||||
Create,
|
||||
Update,
|
||||
Delete,
|
||||
Undo,
|
||||
}
|
||||
|
||||
pub type AcceptedActivities = ActorAndObject<ValidTypes>;
|
||||
|
||||
/// Handler for all incoming activities to user inboxes.
|
||||
pub async fn user_inbox(
|
||||
request: HttpRequest,
|
||||
input: web::Json<UserAcceptedObjects>,
|
||||
input: web::Json<AcceptedActivities>,
|
||||
path: web::Path<String>,
|
||||
client: web::Data<Client>,
|
||||
db: DbPoolParam,
|
||||
pool: DbPoolParam,
|
||||
chat_server: ChatServerParam,
|
||||
) -> Result<HttpResponse, LemmyError> {
|
||||
// TODO: would be nice if we could do the signature check here, but we cant access the actor property
|
||||
let input = input.into_inner();
|
||||
let activity = input.into_inner();
|
||||
let username = path.into_inner();
|
||||
debug!("User {} received activity: {:?}", &username, &input);
|
||||
debug!("User {} received activity: {:?}", &username, &activity);
|
||||
|
||||
match input {
|
||||
UserAcceptedObjects::Accept(a) => receive_accept(*a, &request, &username, &client, &db).await,
|
||||
UserAcceptedObjects::Create(c) => {
|
||||
receive_create_private_message(*c, &request, &client, &db, chat_server).await
|
||||
let actor_uri = activity.actor()?.as_single_xsd_any_uri().unwrap();
|
||||
|
||||
check_is_apub_id_valid(actor_uri)?;
|
||||
|
||||
let actor = get_or_fetch_and_upsert_actor(actor_uri, &client, &pool).await?;
|
||||
verify(&request, actor.as_ref())?;
|
||||
|
||||
insert_activity(actor.user_id(), activity.clone(), false, &pool).await?;
|
||||
|
||||
let any_base = activity.clone().into_any_base()?;
|
||||
let kind = activity.kind().unwrap();
|
||||
match kind {
|
||||
ValidTypes::Accept => receive_accept(any_base, username, &client, &pool).await,
|
||||
ValidTypes::Create => {
|
||||
receive_create_private_message(any_base, &client, &pool, chat_server).await
|
||||
}
|
||||
UserAcceptedObjects::Update(u) => {
|
||||
receive_update_private_message(*u, &request, &client, &db, chat_server).await
|
||||
ValidTypes::Update => {
|
||||
receive_update_private_message(any_base, &client, &pool, chat_server).await
|
||||
}
|
||||
UserAcceptedObjects::Delete(d) => {
|
||||
receive_delete_private_message(*d, &request, &client, &db, chat_server).await
|
||||
ValidTypes::Delete => {
|
||||
receive_delete_private_message(any_base, &client, &pool, chat_server).await
|
||||
}
|
||||
UserAcceptedObjects::Undo(u) => {
|
||||
receive_undo_delete_private_message(*u, &request, &client, &db, chat_server).await
|
||||
ValidTypes::Undo => {
|
||||
receive_undo_delete_private_message(any_base, &client, &pool, chat_server).await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle accepted follows.
|
||||
async fn receive_accept(
|
||||
accept: Accept,
|
||||
request: &HttpRequest,
|
||||
username: &str,
|
||||
activity: AnyBase,
|
||||
username: String,
|
||||
client: &Client,
|
||||
pool: &DbPool,
|
||||
) -> Result<HttpResponse, LemmyError> {
|
||||
let accept = Accept::from_any_base(activity)?.unwrap();
|
||||
let community_uri = accept.actor()?.to_owned().single_xsd_any_uri().unwrap();
|
||||
|
||||
let community = get_or_fetch_and_upsert_community(&community_uri, client, pool).await?;
|
||||
verify(request, &community)?;
|
||||
|
||||
let username = username.to_owned();
|
||||
let user = blocking(pool, move |conn| User_::read_from_name(conn, &username)).await??;
|
||||
|
||||
insert_activity(community.creator_id, accept, false, pool).await?;
|
||||
|
||||
// Now you need to add this to the community follower
|
||||
let community_follower_form = CommunityFollowerForm {
|
||||
community_id: community.id,
|
||||
|
@ -107,20 +117,14 @@ async fn receive_accept(
|
|||
}
|
||||
|
||||
async fn receive_create_private_message(
|
||||
create: Create,
|
||||
request: &HttpRequest,
|
||||
activity: AnyBase,
|
||||
client: &Client,
|
||||
pool: &DbPool,
|
||||
chat_server: ChatServerParam,
|
||||
) -> Result<HttpResponse, LemmyError> {
|
||||
let user_uri = &create.actor()?.to_owned().single_xsd_any_uri().unwrap();
|
||||
let create = Create::from_any_base(activity)?.unwrap();
|
||||
let note = Note::from_any_base(create.object().as_one().unwrap().to_owned())?.unwrap();
|
||||
|
||||
let user = get_or_fetch_and_upsert_user(user_uri, client, pool).await?;
|
||||
verify(request, &user)?;
|
||||
|
||||
insert_activity(user.id, create, false, pool).await?;
|
||||
|
||||
let private_message = PrivateMessageForm::from_apub(¬e, client, pool).await?;
|
||||
|
||||
let inserted_private_message = blocking(pool, move |conn| {
|
||||
|
@ -148,20 +152,14 @@ async fn receive_create_private_message(
|
|||
}
|
||||
|
||||
async fn receive_update_private_message(
|
||||
update: Update,
|
||||
request: &HttpRequest,
|
||||
activity: AnyBase,
|
||||
client: &Client,
|
||||
pool: &DbPool,
|
||||
chat_server: ChatServerParam,
|
||||
) -> Result<HttpResponse, LemmyError> {
|
||||
let user_uri = &update.actor()?.to_owned().single_xsd_any_uri().unwrap();
|
||||
let update = Update::from_any_base(activity)?.unwrap();
|
||||
let note = Note::from_any_base(update.object().as_one().unwrap().to_owned())?.unwrap();
|
||||
|
||||
let user = get_or_fetch_and_upsert_user(&user_uri, client, pool).await?;
|
||||
verify(request, &user)?;
|
||||
|
||||
insert_activity(user.id, update, false, pool).await?;
|
||||
|
||||
let private_message_form = PrivateMessageForm::from_apub(¬e, client, pool).await?;
|
||||
|
||||
let private_message_ap_id = private_message_form.ap_id.clone();
|
||||
|
@ -197,20 +195,14 @@ async fn receive_update_private_message(
|
|||
}
|
||||
|
||||
async fn receive_delete_private_message(
|
||||
delete: Delete,
|
||||
request: &HttpRequest,
|
||||
activity: AnyBase,
|
||||
client: &Client,
|
||||
pool: &DbPool,
|
||||
chat_server: ChatServerParam,
|
||||
) -> Result<HttpResponse, LemmyError> {
|
||||
let user_uri = &delete.actor()?.to_owned().single_xsd_any_uri().unwrap();
|
||||
let delete = Delete::from_any_base(activity)?.unwrap();
|
||||
let note = Note::from_any_base(delete.object().as_one().unwrap().to_owned())?.unwrap();
|
||||
|
||||
let user = get_or_fetch_and_upsert_user(&user_uri, client, pool).await?;
|
||||
verify(request, &user)?;
|
||||
|
||||
insert_activity(user.id, delete, false, pool).await?;
|
||||
|
||||
let private_message_form = PrivateMessageForm::from_apub(¬e, client, pool).await?;
|
||||
|
||||
let private_message_ap_id = private_message_form.ap_id;
|
||||
|
@ -258,20 +250,14 @@ async fn receive_delete_private_message(
|
|||
}
|
||||
|
||||
async fn receive_undo_delete_private_message(
|
||||
undo: Undo,
|
||||
request: &HttpRequest,
|
||||
activity: AnyBase,
|
||||
client: &Client,
|
||||
pool: &DbPool,
|
||||
chat_server: ChatServerParam,
|
||||
) -> Result<HttpResponse, LemmyError> {
|
||||
let undo = Undo::from_any_base(activity)?.unwrap();
|
||||
let delete = Delete::from_any_base(undo.object().as_one().unwrap().to_owned())?.unwrap();
|
||||
let note = Note::from_any_base(delete.object().as_one().unwrap().to_owned())?.unwrap();
|
||||
let user_uri = &delete.actor()?.to_owned().single_xsd_any_uri().unwrap();
|
||||
|
||||
let user = get_or_fetch_and_upsert_user(&user_uri, client, pool).await?;
|
||||
verify(request, &user)?;
|
||||
|
||||
insert_activity(user.id, delete, false, pool).await?;
|
||||
|
||||
let private_message = PrivateMessageForm::from_apub(¬e, client, pool).await?;
|
||||
|
||||
|
|
|
@ -68,12 +68,18 @@ fn check_is_apub_id_valid(apub_id: &Url) -> Result<(), LemmyError> {
|
|||
return Err(anyhow!("invalid apub id scheme: {:?}", apub_id.scheme()).into());
|
||||
}
|
||||
|
||||
let allowed_instances: Vec<String> = Settings::get()
|
||||
let mut allowed_instances: Vec<String> = Settings::get()
|
||||
.federation
|
||||
.allowed_instances
|
||||
.split(',')
|
||||
.map(|d| d.to_string())
|
||||
.collect();
|
||||
// need to allow this explicitly because apub activities might contain objects from our local
|
||||
// instance. replace is needed to remove the port in our federation test setup.
|
||||
let settings = Settings::get();
|
||||
let local_instance = settings.hostname.split(':').collect::<Vec<&str>>();
|
||||
allowed_instances.push(local_instance.first().unwrap().to_string());
|
||||
|
||||
match apub_id.domain() {
|
||||
Some(d) => {
|
||||
let contains = allowed_instances.contains(&d.to_owned());
|
||||
|
|
Loading…
Reference in a new issue