From 3d2e19f3c3c080fdac4c7e46abac909bcbdd7e48 Mon Sep 17 00:00:00 2001 From: Felix Ableitner Date: Tue, 6 Jul 2021 04:46:30 +0200 Subject: [PATCH] wip: try to fix activity routing --- Cargo.lock | 7 - api_tests/package.json | 2 +- api_tests/prepare-drone-federation-test.sh | 1 + api_tests/src/post.spec.ts | 3 +- crates/apub/Cargo.toml | 1 - crates/apub/src/activity_queue.rs | 6 +- crates/apub/src/fetcher/mod.rs | 48 ++----- crates/apub/src/lib.rs | 2 - crates/apub_lib/src/lib.rs | 43 +++++- .../src/activities/following/accept.rs | 39 ++++-- crates/apub_receive/src/activities/mod.rs | 1 + .../src/activities/post/create.rs | 37 ++++-- .../apub_receive/src/activities/post/like.rs | 28 ++-- .../src/activities/post/undo_like.rs | 4 +- crates/apub_receive/src/http/community.rs | 3 +- crates/apub_receive/src/http/inbox_enums.rs | 3 + crates/apub_receive/src/http/mod.rs | 125 +++++++++++++----- crates/apub_receive/src/http/person.rs | 3 +- 18 files changed, 226 insertions(+), 130 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8a578393f..23b86fab7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1876,7 +1876,6 @@ dependencies = [ "strum_macros 0.20.1", "thiserror", "tokio 0.3.7", - "trait_enum", "url", "uuid", ] @@ -3690,12 +3689,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "trait_enum" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "130dd741d3c71f76d031e58caffff3624eaaa2db9bd8c4b05406a71885300fc7" - [[package]] name = "trust-dns-proto" version = "0.19.6" diff --git a/api_tests/package.json b/api_tests/package.json index 2a88a0983..9b9ee1327 100644 --- a/api_tests/package.json +++ b/api_tests/package.json @@ -9,7 +9,7 @@ "scripts": { "lint": "tsc --noEmit && eslint --report-unused-disable-directives --ext .js,.ts,.tsx src", "fix": "prettier --write src && eslint --fix src", - "api-test": "jest src/ -i --verbose" + "api-test": "jest src/post.spec.ts -i --verbose" }, "devDependencies": { "@types/jest": "^26.0.23", diff --git a/api_tests/prepare-drone-federation-test.sh b/api_tests/prepare-drone-federation-test.sh index ae9c1293a..c2c7805aa 100755 --- a/api_tests/prepare-drone-federation-test.sh +++ b/api_tests/prepare-drone-federation-test.sh @@ -3,6 +3,7 @@ set -e export LEMMY_TEST_SEND_SYNC=1 export RUST_BACKTRACE=1 +export RUST_LOG="warn,lemmy_server=debug,lemmy_api=debug,lemmy_api_common=debug,lemmy_api_crud=debug,lemmy_apub=debug,lemmy_apub_receive=debug,lemmy_db_queries=debug,lemmy_db_schema=debug,lemmy_db_views=debug,lemmy_db_views_actor=debug,lemmy_db_views_moderator=debug,lemmy_routes=debug,lemmy_utils=debug,lemmy_websocket=debug" for INSTANCE in lemmy_alpha lemmy_beta lemmy_gamma lemmy_delta lemmy_epsilon; do psql "${LEMMY_DATABASE_URL}/lemmy" -c "DROP DATABASE IF EXISTS $INSTANCE" diff --git a/api_tests/src/post.spec.ts b/api_tests/src/post.spec.ts index 36e5ac3fa..243595921 100644 --- a/api_tests/src/post.spec.ts +++ b/api_tests/src/post.spec.ts @@ -88,7 +88,7 @@ test('Create a post', async () => { let searchEpsilon = await searchPost(epsilon, postRes.post_view.post); expect(searchEpsilon.posts[0]).toBeUndefined(); }); - +/* test('Create a post in a non-existent community', async () => { let postRes = await createPost(alpha, -2); expect(postRes).toStrictEqual({ error: 'couldnt_create_post' }); @@ -363,3 +363,4 @@ test('Enforce community ban for federated user', async () => { let betaPost = searchBeta.posts[0]; expect(betaPost).toBeDefined(); }); +*/ \ No newline at end of file diff --git a/crates/apub/Cargo.toml b/crates/apub/Cargo.toml index 6a2064cd1..35601db1e 100644 --- a/crates/apub/Cargo.toml +++ b/crates/apub/Cargo.toml @@ -50,4 +50,3 @@ thiserror = "1.0.26" background-jobs = "0.8.0" reqwest = { version = "0.10.10", features = ["json"] } backtrace = "0.3.56" -trait_enum = "0.5.0" diff --git a/crates/apub/src/activity_queue.rs b/crates/apub/src/activity_queue.rs index 7016e0ca5..22b88d148 100644 --- a/crates/apub/src/activity_queue.rs +++ b/crates/apub/src/activity_queue.rs @@ -49,7 +49,7 @@ where if check_is_apub_id_valid(&inbox, false).is_ok() { debug!( "Sending activity {:?} to {}", - &activity.id_unchecked(), + &activity.id_unchecked().map(ToString::to_string), &inbox ); send_activity_internal(context, activity, creator, vec![inbox], true, true).await?; @@ -88,7 +88,7 @@ where .collect(); debug!( "Sending activity {:?} to followers of {}", - &activity.id_unchecked().map(|i| i.to_string()), + &activity.id_unchecked().map(ToString::to_string), &community.actor_id ); @@ -127,7 +127,7 @@ where check_is_apub_id_valid(&inbox, false)?; debug!( "Sending activity {:?} to community {}", - &activity.id_unchecked(), + &activity.id_unchecked().map(ToString::to_string), &community.actor_id ); // dont send to object_actor here, as that is responsibility of the community itself diff --git a/crates/apub/src/fetcher/mod.rs b/crates/apub/src/fetcher/mod.rs index 5d574e43a..ec12c22f2 100644 --- a/crates/apub/src/fetcher/mod.rs +++ b/crates/apub/src/fetcher/mod.rs @@ -14,10 +14,7 @@ use crate::{ }; use chrono::NaiveDateTime; use http::StatusCode; -use lemmy_db_schema::{ - naive_now, - source::{community::Community, person::Person}, -}; +use lemmy_db_schema::naive_now; use lemmy_utils::LemmyError; use lemmy_websocket::LemmyContext; use serde::Deserialize; @@ -40,47 +37,26 @@ where false } -trait_enum! { -pub enum Actor: ActorType { +// TODO: remove this +pub enum Actor { Person, Community, } -} -/* -impl ActorType for Actor { - fn is_local(&self) -> bool { - self. - self.is_local() - } - fn actor_id(&self) -> Url { - self.actor_id() - } - fn name(&self) -> String { - self.name() - } - fn public_key(&self) -> Option { - self.public_key() - } - fn private_key(&self) -> Option { - self.private_key() - } - fn get_shared_inbox_or_inbox_url(&self) -> Url { - self.get_shared_inbox_or_inbox_url() - } -} - */ +/// Get a remote actor from its apub ID (either a person or a community). Thin wrapper around +/// `get_or_fetch_and_upsert_person()` and `get_or_fetch_and_upsert_community()`. +/// +/// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database. +/// Otherwise it is fetched from the remote instance, stored and returned. pub async fn get_or_fetch_and_upsert_actor( apub_id: &Url, context: &LemmyContext, recursion_counter: &mut i32, -) -> Result { +) -> Result, LemmyError> { let community = get_or_fetch_and_upsert_community(apub_id, context, recursion_counter).await; - let actor: Actor = match community { - Ok(c) => Actor::Community(c), - Err(_) => { - Actor::Person(get_or_fetch_and_upsert_person(apub_id, context, recursion_counter).await?) - } + let actor: Box = match community { + Ok(c) => Box::new(c), + Err(_) => Box::new(get_or_fetch_and_upsert_person(apub_id, context, recursion_counter).await?), }; Ok(actor) } diff --git a/crates/apub/src/lib.rs b/crates/apub/src/lib.rs index bb4a80160..6bb976754 100644 --- a/crates/apub/src/lib.rs +++ b/crates/apub/src/lib.rs @@ -1,7 +1,5 @@ #[macro_use] extern crate lazy_static; -#[macro_use] -extern crate trait_enum; pub mod activities; pub mod activity_queue; diff --git a/crates/apub_lib/src/lib.rs b/crates/apub_lib/src/lib.rs index 34a93f15e..00cd5dc8b 100644 --- a/crates/apub_lib/src/lib.rs +++ b/crates/apub_lib/src/lib.rs @@ -1,4 +1,9 @@ -use activitystreams::error::DomainError; +use activitystreams::{ + base::AnyBase, + error::DomainError, + primitives::OneOrMany, + unparsed::Unparsed, +}; use lemmy_utils::LemmyError; use lemmy_websocket::LemmyContext; use std::marker::PhantomData; @@ -31,6 +36,42 @@ pub enum PublicUrl { Public, } +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ActivityCommonFields { + #[serde(rename = "@context")] + pub context: OneOrMany, + id: Url, + pub actor: Url, + + // unparsed fields + #[serde(flatten)] + pub unparsed: Unparsed, +} + +impl ActivityCommonFields { + pub fn id_unchecked(&self) -> &Url { + &self.id + } +} + +#[async_trait::async_trait(?Send)] +pub trait ActivityHandlerNew { + // TODO: also need to check for instance/community blocks in here + async fn verify( + &self, + context: &LemmyContext, + request_counter: &mut i32, + ) -> Result<(), LemmyError>; + + async fn receive( + &self, + context: &LemmyContext, + request_counter: &mut i32, + ) -> Result<(), LemmyError>; + fn common(&self) -> &ActivityCommonFields; +} + #[async_trait::async_trait(?Send)] pub trait ActivityHandler { type Actor; diff --git a/crates/apub_receive/src/activities/following/accept.rs b/crates/apub_receive/src/activities/following/accept.rs index c172f5df3..7d7891132 100644 --- a/crates/apub_receive/src/activities/following/accept.rs +++ b/crates/apub_receive/src/activities/following/accept.rs @@ -1,10 +1,18 @@ use crate::activities::{following::follow::FollowCommunity, LemmyActivity}; use activitystreams::activity::kind::AcceptType; use lemmy_api_common::blocking; -use lemmy_apub::{check_is_apub_id_valid, fetcher::person::get_or_fetch_and_upsert_person}; -use lemmy_apub_lib::{verify_domains_match, ActivityHandler}; +use lemmy_apub::{ + check_is_apub_id_valid, + fetcher::{community::get_or_fetch_and_upsert_community, person::get_or_fetch_and_upsert_person}, +}; +use lemmy_apub_lib::{ + verify_domains_match, + ActivityCommonFields, + ActivityHandler, + ActivityHandlerNew, +}; use lemmy_db_queries::Followable; -use lemmy_db_schema::source::community::{Community, CommunityFollower}; +use lemmy_db_schema::source::community::CommunityFollower; use lemmy_utils::LemmyError; use lemmy_websocket::LemmyContext; use url::Url; @@ -16,32 +24,37 @@ pub struct AcceptFollowCommunity { object: LemmyActivity, #[serde(rename = "type")] kind: AcceptType, + #[serde(flatten)] + common: ActivityCommonFields, } /// Handle accepted follows #[async_trait::async_trait(?Send)] -impl ActivityHandler for LemmyActivity { - type Actor = Community; - - async fn verify(&self, context: &LemmyContext) -> Result<(), LemmyError> { - verify_domains_match(&self.actor, self.id_unchecked())?; - check_is_apub_id_valid(&self.actor, false)?; - self.inner.object.verify(context).await +impl ActivityHandlerNew for AcceptFollowCommunity { + async fn verify(&self, context: &LemmyContext, _: &mut i32) -> Result<(), LemmyError> { + verify_domains_match(&self.common.actor, &self.common.id_unchecked())?; + check_is_apub_id_valid(&self.common.actor, false)?; + self.object.verify(context).await } async fn receive( &self, - actor: Self::Actor, context: &LemmyContext, request_counter: &mut i32, ) -> Result<(), LemmyError> { - let person = get_or_fetch_and_upsert_person(&self.inner.to, context, request_counter).await?; + let actor = + get_or_fetch_and_upsert_community(&self.common.actor, context, request_counter).await?; + let to = get_or_fetch_and_upsert_person(&self.to, context, request_counter).await?; // This will throw an error if no follow was requested blocking(context.pool(), move |conn| { - CommunityFollower::follow_accepted(conn, actor.id, person.id) + CommunityFollower::follow_accepted(conn, actor.id, to.id) }) .await??; Ok(()) } + + fn common(&self) -> &ActivityCommonFields { + &self.common + } } diff --git a/crates/apub_receive/src/activities/mod.rs b/crates/apub_receive/src/activities/mod.rs index fbddb0bbd..873a13ba9 100644 --- a/crates/apub_receive/src/activities/mod.rs +++ b/crates/apub_receive/src/activities/mod.rs @@ -14,6 +14,7 @@ pub mod following; pub mod post; pub mod private_message; +// TODO: remove this #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[serde(rename_all = "camelCase")] pub struct LemmyActivity { diff --git a/crates/apub_receive/src/activities/post/create.rs b/crates/apub_receive/src/activities/post/create.rs index d74bd4884..ffe9fccb1 100644 --- a/crates/apub_receive/src/activities/post/create.rs +++ b/crates/apub_receive/src/activities/post/create.rs @@ -1,8 +1,14 @@ -use crate::activities::{post::send_websocket_message, LemmyActivity}; +use crate::activities::post::send_websocket_message; use activitystreams::{activity::kind::CreateType, base::BaseExt}; -use lemmy_apub::{check_is_apub_id_valid, objects::FromApub, ActorType, PageExt}; -use lemmy_apub_lib::{verify_domains_match, ActivityHandler, PublicUrl}; -use lemmy_db_schema::source::{person::Person, post::Post}; +use lemmy_apub::{ + check_is_apub_id_valid, + fetcher::person::get_or_fetch_and_upsert_person, + objects::FromApub, + ActorType, + PageExt, +}; +use lemmy_apub_lib::{verify_domains_match, ActivityCommonFields, ActivityHandlerNew, PublicUrl}; +use lemmy_db_schema::source::post::Post; use lemmy_utils::LemmyError; use lemmy_websocket::{LemmyContext, UserOperationCrud}; use url::Url; @@ -15,26 +21,27 @@ pub struct CreatePost { cc: Vec, #[serde(rename = "type")] kind: CreateType, + #[serde(flatten)] + common: ActivityCommonFields, } #[async_trait::async_trait(?Send)] -impl ActivityHandler for LemmyActivity { - type Actor = Person; - - async fn verify(&self, _context: &LemmyContext) -> Result<(), LemmyError> { - verify_domains_match(self.id_unchecked(), &self.actor)?; - self.inner.object.id(self.actor.as_str())?; - check_is_apub_id_valid(&self.actor, false) +impl ActivityHandlerNew for CreatePost { + async fn verify(&self, _context: &LemmyContext, _: &mut i32) -> Result<(), LemmyError> { + verify_domains_match(self.common.id_unchecked(), &self.common.actor)?; + self.object.id(self.common.actor.as_str())?; + check_is_apub_id_valid(&self.common.actor, false) } async fn receive( &self, - actor: Self::Actor, context: &LemmyContext, request_counter: &mut i32, ) -> Result<(), LemmyError> { + let actor = + get_or_fetch_and_upsert_person(&self.common.actor, context, request_counter).await?; let post = Post::from_apub( - &self.inner.object, + &self.object, context, actor.actor_id(), request_counter, @@ -44,4 +51,8 @@ impl ActivityHandler for LemmyActivity { send_websocket_message(post.id, UserOperationCrud::CreatePost, context).await } + + fn common(&self) -> &ActivityCommonFields { + &self.common + } } diff --git a/crates/apub_receive/src/activities/post/like.rs b/crates/apub_receive/src/activities/post/like.rs index 9cd16ec73..c932160d9 100644 --- a/crates/apub_receive/src/activities/post/like.rs +++ b/crates/apub_receive/src/activities/post/like.rs @@ -1,8 +1,7 @@ -use crate::activities::{post::like_or_dislike_post, LemmyActivity}; +use crate::activities::post::like_or_dislike_post; use activitystreams::activity::kind::LikeType; -use lemmy_apub::check_is_apub_id_valid; -use lemmy_apub_lib::{verify_domains_match, ActivityHandler, PublicUrl}; -use lemmy_db_schema::source::person::Person; +use lemmy_apub::{check_is_apub_id_valid, fetcher::person::get_or_fetch_and_upsert_person}; +use lemmy_apub_lib::{verify_domains_match, ActivityCommonFields, ActivityHandlerNew, PublicUrl}; use lemmy_utils::LemmyError; use lemmy_websocket::LemmyContext; use url::Url; @@ -15,23 +14,28 @@ pub struct LikePost { cc: [Url; 1], #[serde(rename = "type")] kind: LikeType, + #[serde(flatten)] + common: ActivityCommonFields, } #[async_trait::async_trait(?Send)] -impl ActivityHandler for LemmyActivity { - type Actor = Person; - - async fn verify(&self, _context: &LemmyContext) -> Result<(), LemmyError> { - verify_domains_match(&self.actor, self.id_unchecked())?; - check_is_apub_id_valid(&self.actor, false) +impl ActivityHandlerNew for LikePost { + async fn verify(&self, _context: &LemmyContext, _: &mut i32) -> Result<(), LemmyError> { + verify_domains_match(&self.common.actor, self.common.id_unchecked())?; + check_is_apub_id_valid(&self.common.actor, false) } async fn receive( &self, - actor: Self::Actor, context: &LemmyContext, request_counter: &mut i32, ) -> Result<(), LemmyError> { - like_or_dislike_post(1, &actor, &self.inner.object, context, request_counter).await + let actor = + get_or_fetch_and_upsert_person(&self.common.actor, context, request_counter).await?; + like_or_dislike_post(1, &actor, &self.object, context, request_counter).await + } + + fn common(&self) -> &ActivityCommonFields { + &self.common } } diff --git a/crates/apub_receive/src/activities/post/undo_like.rs b/crates/apub_receive/src/activities/post/undo_like.rs index bc05cf6ca..e1caa7159 100644 --- a/crates/apub_receive/src/activities/post/undo_like.rs +++ b/crates/apub_receive/src/activities/post/undo_like.rs @@ -26,8 +26,8 @@ impl ActivityHandler for LemmyActivity { async fn verify(&self, context: &LemmyContext) -> Result<(), LemmyError> { verify_domains_match(&self.actor, self.id_unchecked())?; verify_domains_match(&self.actor, &self.inner.object.inner.object)?; - check_is_apub_id_valid(&self.actor, false)?; - self.inner.object.verify(context).await + check_is_apub_id_valid(&self.actor, false) + //self.inner.object.verify(context).await } async fn receive( diff --git a/crates/apub_receive/src/http/community.rs b/crates/apub_receive/src/http/community.rs index 0e6e42efa..1b68b59f5 100644 --- a/crates/apub_receive/src/http/community.rs +++ b/crates/apub_receive/src/http/community.rs @@ -61,7 +61,8 @@ pub async fn community_inbox( path: web::Path, context: web::Data, ) -> Result { - receive_activity(request, input.into_inner(), Some(path.0), context).await + //receive_activity(request, input.into_inner(), Some(path.0), context).await + todo!() } /// Returns an empty followers collection, only populating the size (for privacy). diff --git a/crates/apub_receive/src/http/inbox_enums.rs b/crates/apub_receive/src/http/inbox_enums.rs index 6aab33244..8f0854ddd 100644 --- a/crates/apub_receive/src/http/inbox_enums.rs +++ b/crates/apub_receive/src/http/inbox_enums.rs @@ -49,6 +49,7 @@ use lemmy_websocket::LemmyContext; use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(untagged)] pub enum PersonInboxActivities { AcceptFollowCommunity(AcceptFollowCommunity), CreatePrivateMessage(CreatePrivateMessage), @@ -59,6 +60,7 @@ pub enum PersonInboxActivities { } #[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(untagged)] pub enum GroupInboxActivities { FollowCommunity(FollowCommunity), UndoFollowCommunity(UndoFollowCommunity), @@ -94,6 +96,7 @@ pub enum GroupInboxActivities { } #[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(untagged)] pub enum SharedInboxActivities { // received by person AcceptFollowCommunity(AcceptFollowCommunity), diff --git a/crates/apub_receive/src/http/mod.rs b/crates/apub_receive/src/http/mod.rs index 5d40e58d3..fe79992da 100644 --- a/crates/apub_receive/src/http/mod.rs +++ b/crates/apub_receive/src/http/mod.rs @@ -1,24 +1,28 @@ -use actix_web::{body::Body, web, HttpRequest, HttpResponse}; -use http::StatusCode; -use serde::{Deserialize, Serialize}; -use url::Url; - -use crate::{activities::LemmyActivity, http::inbox_enums::SharedInboxActivities}; +use crate::activities::{ + following::accept::AcceptFollowCommunity, + post::{create::CreatePost, like::LikePost}, +}; +use actix_web::{body::Body, web, web::Bytes, HttpRequest, HttpResponse}; use anyhow::{anyhow, Context}; +use futures::StreamExt; +use http::StatusCode; use lemmy_api_common::blocking; use lemmy_apub::{ check_is_apub_id_valid, extensions::signatures::verify_signature, - fetcher::{get_or_fetch_and_upsert_actor, Actor}, + fetcher::get_or_fetch_and_upsert_actor, insert_activity, APUB_JSON_CONTENT_TYPE, }; -use lemmy_apub_lib::ActivityHandler; +use lemmy_apub_lib::{ActivityCommonFields, ActivityHandlerNew}; use lemmy_db_queries::{source::activity::Activity_, DbPool}; use lemmy_db_schema::source::activity::Activity; use lemmy_utils::{location_info, settings::structs::Settings, LemmyError}; use lemmy_websocket::LemmyContext; -use std::fmt::Debug; +use log::debug; +use serde::{Deserialize, Serialize}; +use std::{fmt::Debug, io::Read}; +use url::Url; pub mod comment; pub mod community; @@ -26,51 +30,99 @@ pub mod inbox_enums; pub mod person; pub mod post; -pub async fn shared_inbox( - request: HttpRequest, - input: web::Json>, - context: web::Data, -) -> Result { - receive_activity(request, input.into_inner(), None, context).await +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[serde(untagged)] +enum Ac { + CreatePost(CreatePost), + LikePost(LikePost), + AcceptFollowCommunity(AcceptFollowCommunity), } -async fn receive_activity( +// TODO: write a derive trait which creates this +#[async_trait::async_trait(?Send)] +impl ActivityHandlerNew for Ac { + async fn verify( + &self, + context: &LemmyContext, + request_counter: &mut i32, + ) -> Result<(), LemmyError> { + match self { + Ac::CreatePost(a) => a.verify(context, request_counter).await, + Ac::LikePost(a) => a.verify(context, request_counter).await, + Ac::AcceptFollowCommunity(a) => a.verify(context, request_counter).await, + } + } + + async fn receive( + &self, + context: &LemmyContext, + request_counter: &mut i32, + ) -> Result<(), LemmyError> { + match self { + Ac::CreatePost(a) => a.receive(context, request_counter).await, + Ac::LikePost(a) => a.receive(context, request_counter).await, + Ac::AcceptFollowCommunity(a) => a.receive(context, request_counter).await, + } + } + + fn common(&self) -> &ActivityCommonFields { + match self { + Ac::CreatePost(a) => a.common(), + Ac::LikePost(a) => a.common(), + Ac::AcceptFollowCommunity(a) => a.common(), + } + } +} + +pub async fn shared_inbox( request: HttpRequest, - activity: LemmyActivity, + mut body: web::Payload, + context: web::Data, +) -> Result { + let mut bytes = web::BytesMut::new(); + while let Some(item) = body.next().await { + bytes.extend_from_slice(&item?); + } + let mut unparsed: String = String::new(); + Bytes::from(bytes).as_ref().read_to_string(&mut unparsed)?; + receive_activity::(request, &unparsed, None, context).await +} + +async fn receive_activity<'a, T>( + request: HttpRequest, + activity: &'a str, expected_name: Option, context: web::Data, ) -> Result where - T: ActivityHandler - + Clone - + Serialize - + std::fmt::Debug - + Send - + 'static, + T: ActivityHandlerNew + Clone + Deserialize<'a> + Serialize + std::fmt::Debug + Send + 'static, { + debug!("Received activity {}", activity); + let activity = serde_json::from_str::(activity)?; + let activity_data = activity.common(); // TODO: which order to check things? // Do nothing if we received the same activity before - if is_activity_already_known(context.pool(), activity.id_unchecked()).await? { + if is_activity_already_known(context.pool(), activity_data.id_unchecked()).await? { return Ok(HttpResponse::Ok().finish()); } assert_activity_not_local(&activity)?; - check_is_apub_id_valid(&activity.actor, false)?; - activity.inner.verify(&context).await?; + check_is_apub_id_valid(&activity_data.actor, false)?; let request_counter = &mut 0; - let actor: Actor = - get_or_fetch_and_upsert_actor(&activity.actor, &context, request_counter).await?; + let actor = + get_or_fetch_and_upsert_actor(&activity_data.actor, &context, request_counter).await?; if let Some(expected) = expected_name { if expected != actor.name() { return Ok(HttpResponse::BadRequest().finish()); } } verify_signature(&request, &actor.public_key().context(location_info!())?)?; + activity.verify(&context, request_counter).await?; // Log the activity, so we avoid receiving and parsing it twice. Note that this could still happen // if we receive the same activity twice in very quick succession. insert_activity( - activity.id_unchecked(), + activity_data.id_unchecked(), activity.clone(), false, true, @@ -78,10 +130,7 @@ where ) .await?; - activity - .inner - .receive(actor, &context, request_counter) - .await?; + activity.receive(&context, request_counter).await?; Ok(HttpResponse::Ok().finish()) } @@ -153,10 +202,14 @@ pub(crate) async fn is_activity_already_known( } } -pub(in crate::http) fn assert_activity_not_local( - activity: &LemmyActivity, +fn assert_activity_not_local( + activity: &T, ) -> Result<(), LemmyError> { - let activity_domain = activity.id_unchecked().domain().context(location_info!())?; + let activity_domain = activity + .common() + .id_unchecked() + .domain() + .context(location_info!())?; if activity_domain == Settings::get().hostname() { return Err( diff --git a/crates/apub_receive/src/http/person.rs b/crates/apub_receive/src/http/person.rs index 0890c1b7a..808f093ed 100644 --- a/crates/apub_receive/src/http/person.rs +++ b/crates/apub_receive/src/http/person.rs @@ -53,7 +53,8 @@ pub async fn person_inbox( path: web::Path, context: web::Data, ) -> Result { - receive_activity(request, input.into_inner(), Some(path.0), context).await + //receive_activity(request, input.into_inner(), Some(path.0), context).await + todo!() } pub(crate) async fn get_apub_person_outbox(