More federation compat (#1894)

* Make HTTP signatures compatible with Pleroma

* Send Announce/Page, Announce/Note for Pleroma compatibility

* remove unused code
This commit is contained in:
Nutomic 2021-11-11 19:49:15 +00:00 committed by GitHub
parent dbc92d6ee0
commit 1b9414f292
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 149 additions and 80 deletions

View file

@ -26,13 +26,12 @@ pub(crate) trait GetCommunity {
} }
impl AnnounceActivity { impl AnnounceActivity {
pub async fn send( fn new(
object: AnnouncableActivities, object: AnnouncableActivities,
community: &ApubCommunity, community: &ApubCommunity,
additional_inboxes: Vec<Url>,
context: &LemmyContext, context: &LemmyContext,
) -> Result<(), LemmyError> { ) -> Result<AnnounceActivity, LemmyError> {
let announce = AnnounceActivity { Ok(AnnounceActivity {
actor: ObjectId::new(community.actor_id()), actor: ObjectId::new(community.actor_id()),
to: vec![public()], to: vec![public()],
object, object,
@ -43,11 +42,49 @@ impl AnnounceActivity {
&context.settings().get_protocol_and_hostname(), &context.settings().get_protocol_and_hostname(),
)?, )?,
unparsed: Default::default(), unparsed: Default::default(),
}; })
}
pub async fn send(
object: AnnouncableActivities,
community: &ApubCommunity,
additional_inboxes: Vec<Url>,
context: &LemmyContext,
) -> Result<(), LemmyError> {
let announce = AnnounceActivity::new(object.clone(), community, context)?;
let inboxes = community let inboxes = community
.get_follower_inboxes(additional_inboxes, context) .get_follower_inboxes(additional_inboxes.clone(), context)
.await?; .await?;
send_lemmy_activity(context, &announce, &announce.id, community, inboxes, false).await send_lemmy_activity(
context,
&announce,
&announce.id,
community,
inboxes.clone(),
false,
)
.await?;
// Pleroma (and likely Mastodon) can't handle activities like Announce/Create/Page, so for
// compatibility, we also send Announce/Page and Announce/Note (for new and updated
// posts/comments).
use AnnouncableActivities::*;
let object = match object {
CreateOrUpdatePost(c) => Page(c.object),
CreateOrUpdateComment(c) => Note(c.object),
_ => return Ok(()),
};
let announce_compat = AnnounceActivity::new(object, community, context)?;
send_lemmy_activity(
context,
&announce_compat,
&announce_compat.id,
community,
inboxes,
false,
)
.await?;
Ok(())
} }
} }
@ -77,14 +114,7 @@ impl ActivityHandler for AnnounceActivity {
if is_activity_already_known(context.pool(), &object_data.id).await? { if is_activity_already_known(context.pool(), &object_data.id).await? {
return Ok(()); return Ok(());
} }
insert_activity( insert_activity(&object_data.id, &self.object, false, true, context.pool()).await?;
&object_data.id,
self.object.clone(),
false,
true,
context.pool(),
)
.await?;
self.object.receive(context, request_counter).await self.object.receive(context, request_counter).await
} }
} }

View file

@ -28,7 +28,7 @@ pub(crate) async fn send_to_community<T: ActorType>(
) -> Result<(), LemmyError> { ) -> Result<(), LemmyError> {
// if this is a local community, we need to do an announce from the community instead // if this is a local community, we need to do an announce from the community instead
if community.local { if community.local {
insert_activity(activity_id, activity.clone(), true, false, context.pool()).await?; insert_activity(activity_id, &activity, true, false, context.pool()).await?;
AnnounceActivity::send(activity, community, additional_inboxes, context).await?; AnnounceActivity::send(activity, community, additional_inboxes, context).await?;
} else { } else {
let mut inboxes = additional_inboxes; let mut inboxes = additional_inboxes;

View file

@ -173,7 +173,7 @@ async fn send_lemmy_activity<T: Serialize>(
insert_activity( insert_activity(
activity_id, activity_id,
serialised_activity.clone(), &serialised_activity,
true, true,
sensitive, sensitive,
context.pool(), context.pool(),

View file

@ -59,6 +59,7 @@ impl CreateOrUpdatePost {
}) })
.await?? .await??
.into(); .into();
let create_or_update = CreateOrUpdatePost::new(post, actor, &community, kind, context).await?; let create_or_update = CreateOrUpdatePost::new(post, actor, &community, kind, context).await?;
let id = create_or_update.id.clone(); let id = create_or_update.id.clone();
let activity = AnnouncableActivities::CreateOrUpdatePost(create_or_update); let activity = AnnouncableActivities::CreateOrUpdatePost(create_or_update);

View file

@ -1,7 +1,8 @@
use crate::{ use crate::{
activities::community::announce::GetCommunity, activities::community::announce::GetCommunity,
objects::community::ApubCommunity, objects::community::ApubCommunity,
protocol::activities::{ protocol::{
activities::{
community::{ community::{
add_mod::AddMod, add_mod::AddMod,
announce::AnnounceActivity, announce::AnnounceActivity,
@ -25,6 +26,8 @@ use crate::{
}, },
voting::{undo_vote::UndoVote, vote::Vote}, voting::{undo_vote::UndoVote, vote::Vote},
}, },
objects::{note::Note, page::Page},
},
}; };
use lemmy_apub_lib::traits::ActivityHandler; use lemmy_apub_lib::traits::ActivityHandler;
use lemmy_utils::LemmyError; use lemmy_utils::LemmyError;
@ -79,6 +82,10 @@ pub enum AnnouncableActivities {
UndoBlockUserFromCommunity(UndoBlockUserFromCommunity), UndoBlockUserFromCommunity(UndoBlockUserFromCommunity),
AddMod(AddMod), AddMod(AddMod),
RemoveMod(RemoveMod), RemoveMod(RemoveMod),
// For compatibility with Pleroma/Mastodon (send only)
Page(Page),
// For compatibility with Pleroma/Mastodon (send only)
Note(Note),
} }
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]
@ -101,6 +108,8 @@ impl GetCommunity for AnnouncableActivities {
UndoBlockUserFromCommunity(a) => a.get_community(context, request_counter).await?, UndoBlockUserFromCommunity(a) => a.get_community(context, request_counter).await?,
AddMod(a) => a.get_community(context, request_counter).await?, AddMod(a) => a.get_community(context, request_counter).await?,
RemoveMod(a) => a.get_community(context, request_counter).await?, RemoveMod(a) => a.get_community(context, request_counter).await?,
Page(_) => unimplemented!(),
Note(_) => unimplemented!(),
}; };
Ok(community) Ok(community)
} }

View file

@ -79,7 +79,7 @@ pub(in crate::http) async fn receive_group_inbox(
context: &LemmyContext, context: &LemmyContext,
) -> Result<HttpResponse, LemmyError> { ) -> Result<HttpResponse, LemmyError> {
let actor_id = ObjectId::new(activity_data.actor.clone()); let actor_id = ObjectId::new(activity_data.actor.clone());
let res = receive_activity(request, activity.clone(), activity_data, context).await; let res = receive_activity(request, activity.clone(), activity_data, context).await?;
if let GroupInboxActivities::AnnouncableActivities(announcable) = activity { if let GroupInboxActivities::AnnouncableActivities(announcable) = activity {
let community = announcable.get_community(context, &mut 0).await?; let community = announcable.get_community(context, &mut 0).await?;
@ -89,7 +89,7 @@ pub(in crate::http) async fn receive_group_inbox(
} }
} }
res Ok(res)
} }
/// Returns an empty followers collection, only populating the size (for privacy). /// Returns an empty followers collection, only populating the size (for privacy).

View file

@ -109,14 +109,7 @@ where
// Log the activity, so we avoid receiving and parsing it twice. Note that this could still happen // Log the activity, so we avoid receiving and parsing it twice. Note that this could still happen
// if we receive the same activity twice in very quick succession. // if we receive the same activity twice in very quick succession.
insert_activity( insert_activity(&activity_data.id, &activity, false, true, context.pool()).await?;
&activity_data.id,
activity.clone(),
false,
true,
context.pool(),
)
.await?;
info!("Receiving activity {}", activity_data.id.to_string()); info!("Receiving activity {}", activity_data.id.to_string());
activity activity

View file

@ -179,7 +179,7 @@ pub async fn get_actor_id_from_name(
/// persistent. /// persistent.
async fn insert_activity<T>( async fn insert_activity<T>(
ap_id: &Url, ap_id: &Url,
activity: T, activity: &T,
local: bool, local: bool,
sensitive: bool, sensitive: bool,
pool: &DbPool, pool: &DbPool,
@ -187,9 +187,10 @@ async fn insert_activity<T>(
where where
T: Serialize + std::fmt::Debug + Send + 'static, T: Serialize + std::fmt::Debug + Send + 'static,
{ {
let data = serde_json::to_value(activity)?;
let ap_id = ap_id.to_owned().into(); let ap_id = ap_id.to_owned().into();
blocking(pool, move |conn| { blocking(pool, move |conn| {
Activity::insert(conn, ap_id, &activity, local, sensitive) Activity::insert(conn, ap_id, data, local, sensitive)
}) })
.await??; .await??;
Ok(()) Ok(())

View file

@ -4,9 +4,15 @@ use crate::{
protocol::Source, protocol::Source,
}; };
use activitystreams::{object::kind::NoteType, unparsed::Unparsed}; use activitystreams::{object::kind::NoteType, unparsed::Unparsed};
use anyhow::anyhow;
use chrono::{DateTime, FixedOffset}; use chrono::{DateTime, FixedOffset};
use lemmy_api_common::blocking; use lemmy_api_common::blocking;
use lemmy_apub_lib::{object_id::ObjectId, values::MediaTypeHtml}; use lemmy_apub_lib::{
data::Data,
object_id::ObjectId,
traits::ActivityHandler,
values::MediaTypeHtml,
};
use lemmy_db_schema::{newtypes::CommentId, source::post::Post, traits::Crud}; use lemmy_db_schema::{newtypes::CommentId, source::post::Post, traits::Crud};
use lemmy_utils::LemmyError; use lemmy_utils::LemmyError;
use lemmy_websocket::LemmyContext; use lemmy_websocket::LemmyContext;
@ -81,3 +87,15 @@ impl Note {
} }
} }
} }
// For Pleroma/Mastodon compat. Unimplemented because its only used for sending.
#[async_trait::async_trait(?Send)]
impl ActivityHandler for Note {
type DataType = LemmyContext;
async fn verify(&self, _: &Data<Self::DataType>, _: &mut i32) -> Result<(), LemmyError> {
Err(anyhow!("Announce/Page can only be sent, not received").into())
}
async fn receive(self, _: &Data<Self::DataType>, _: &mut i32) -> Result<(), LemmyError> {
unimplemented!()
}
}

View file

@ -5,7 +5,12 @@ use crate::{
use activitystreams::{object::kind::PageType, unparsed::Unparsed}; use activitystreams::{object::kind::PageType, unparsed::Unparsed};
use anyhow::anyhow; use anyhow::anyhow;
use chrono::{DateTime, FixedOffset}; use chrono::{DateTime, FixedOffset};
use lemmy_apub_lib::{object_id::ObjectId, values::MediaTypeHtml}; use lemmy_apub_lib::{
data::Data,
object_id::ObjectId,
traits::ActivityHandler,
values::MediaTypeHtml,
};
use lemmy_utils::LemmyError; use lemmy_utils::LemmyError;
use lemmy_websocket::LemmyContext; use lemmy_websocket::LemmyContext;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -73,3 +78,15 @@ impl Page {
} }
} }
} }
// For Pleroma/Mastodon compat. Unimplemented because its only used for sending.
#[async_trait::async_trait(?Send)]
impl ActivityHandler for Page {
type DataType = LemmyContext;
async fn verify(&self, _: &Data<Self::DataType>, _: &mut i32) -> Result<(), LemmyError> {
Err(anyhow!("Announce/Page can only be sent, not received").into())
}
async fn receive(self, _: &Data<Self::DataType>, _: &mut i32) -> Result<(), LemmyError> {
unimplemented!()
}
}

View file

@ -1,4 +1,4 @@
use crate::{signatures::sign_and_send, traits::ActorType, APUB_JSON_CONTENT_TYPE}; use crate::{signatures::sign_and_send, traits::ActorType};
use anyhow::{anyhow, Context, Error}; use anyhow::{anyhow, Context, Error};
use background_jobs::{ use background_jobs::{
create_server, create_server,
@ -13,7 +13,7 @@ use lemmy_utils::{location_info, LemmyError};
use log::warn; use log::warn;
use reqwest::Client; use reqwest::Client;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{collections::BTreeMap, env, fmt::Debug, future::Future, pin::Pin}; use std::{env, fmt::Debug, future::Future, pin::Pin};
use url::Url; use url::Url;
pub async fn send_activity( pub async fn send_activity(
@ -64,11 +64,8 @@ impl ActixJob for SendActivityTask {
} }
async fn do_send(task: SendActivityTask, client: &Client) -> Result<(), Error> { async fn do_send(task: SendActivityTask, client: &Client) -> Result<(), Error> {
let mut headers = BTreeMap::<String, String>::new();
headers.insert("Content-Type".into(), APUB_JSON_CONTENT_TYPE.to_string());
let result = sign_and_send( let result = sign_and_send(
client, client,
headers,
&task.inbox, &task.inbox,
task.activity.clone(), task.activity.clone(),
&task.actor_id, &task.actor_id,

View file

@ -1,3 +1,5 @@
use crate::APUB_JSON_CONTENT_TYPE;
use activitystreams::chrono::Utc;
use actix_web::HttpRequest; use actix_web::HttpRequest;
use anyhow::anyhow; use anyhow::anyhow;
use http::{header::HeaderName, HeaderMap, HeaderValue}; use http::{header::HeaderName, HeaderMap, HeaderValue};
@ -13,19 +15,18 @@ use openssl::{
use reqwest::{Client, Response}; use reqwest::{Client, Response};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
use std::{collections::BTreeMap, str::FromStr}; use std::str::FromStr;
use url::Url; use url::Url;
lazy_static! { lazy_static! {
static ref CONFIG2: ConfigActix = ConfigActix::new(); static ref CONFIG2: ConfigActix = ConfigActix::new();
static ref HTTP_SIG_CONFIG: Config = Config::new(); static ref HTTP_SIG_CONFIG: Config = Config::new().mastodon_compat();
} }
/// Creates an HTTP post request to `inbox_url`, with the given `client` and `headers`, and /// Creates an HTTP post request to `inbox_url`, with the given `client` and `headers`, and
/// `activity` as request body. The request is signed with `private_key` and then sent. /// `activity` as request body. The request is signed with `private_key` and then sent.
pub async fn sign_and_send( pub async fn sign_and_send(
client: &Client, client: &Client,
headers: BTreeMap<String, String>,
inbox_url: &Url, inbox_url: &Url,
activity: String, activity: String,
actor_id: &Url, actor_id: &Url,
@ -33,16 +34,24 @@ pub async fn sign_and_send(
) -> Result<Response, LemmyError> { ) -> Result<Response, LemmyError> {
let signing_key_id = format!("{}#main-key", actor_id); let signing_key_id = format!("{}#main-key", actor_id);
let mut header_map = HeaderMap::new(); let mut headers = HeaderMap::new();
for h in headers { let mut host = inbox_url.domain().expect("read inbox domain").to_string();
header_map.insert( if let Some(port) = inbox_url.port() {
HeaderName::from_str(h.0.as_str())?, host = format!("{}:{}", host, port);
HeaderValue::from_str(h.1.as_str())?,
);
} }
headers.insert(
HeaderName::from_str("Content-Type")?,
HeaderValue::from_str(APUB_JSON_CONTENT_TYPE)?,
);
headers.insert(HeaderName::from_str("Host")?, HeaderValue::from_str(&host)?);
headers.insert(
HeaderName::from_str("Date")?,
HeaderValue::from_str(&Utc::now().to_rfc2822())?,
);
let response = client let response = client
.post(&inbox_url.to_string()) .post(&inbox_url.to_string())
.headers(header_map) .headers(headers)
.signature_with_digest( .signature_with_digest(
HTTP_SIG_CONFIG.clone(), HTTP_SIG_CONFIG.clone(),
signing_key_id, signing_key_id,

View file

@ -1,10 +1,7 @@
use crate::{newtypes::DbUrl, source::activity::*, traits::Crud}; use crate::{newtypes::DbUrl, source::activity::*, traits::Crud};
use diesel::{dsl::*, result::Error, *}; use diesel::{dsl::*, result::Error, *};
use serde::Serialize; use serde_json::Value;
use std::{ use std::io::{Error as IoError, ErrorKind};
fmt::Debug,
io::{Error as IoError, ErrorKind},
};
impl Crud for Activity { impl Crud for Activity {
type Form = ActivityForm; type Form = ActivityForm;
@ -38,19 +35,16 @@ impl Crud for Activity {
} }
impl Activity { impl Activity {
pub fn insert<T>( pub fn insert(
conn: &PgConnection, conn: &PgConnection,
ap_id: DbUrl, ap_id: DbUrl,
data: &T, data: Value,
local: bool, local: bool,
sensitive: bool, sensitive: bool,
) -> Result<Activity, IoError> ) -> Result<Activity, IoError> {
where
T: Serialize + Debug,
{
let activity_form = ActivityForm { let activity_form = ActivityForm {
ap_id, ap_id,
data: serde_json::to_value(&data)?, data,
local: Some(local), local: Some(local),
sensitive, sensitive,
updated: None, updated: None,