rewrite with proper error handling and less boilerplate

This commit is contained in:
Felix Ableitner 2020-08-19 16:26:20 +02:00
parent 512eeb0432
commit f7f64363ea
9 changed files with 143 additions and 177 deletions

View file

@ -1,33 +1,38 @@
use crate::{ use crate::{
apub::{activity_sender::SendUserActivity, community::do_announce, insert_activity}, apub::{activity_sender::send_activity, community::do_announce, insert_activity},
LemmyContext, LemmyContext,
LemmyError, LemmyError,
}; };
use activitystreams::base::AnyBase; use activitystreams::{
base::{Extends, ExtendsExt},
object::AsObject,
};
use lemmy_db::{community::Community, user::User_}; use lemmy_db::{community::Community, user::User_};
use lemmy_utils::{get_apub_protocol_string, settings::Settings}; use lemmy_utils::{get_apub_protocol_string, settings::Settings};
use serde::{export::fmt::Debug, Serialize};
use url::{ParseError, Url}; use url::{ParseError, Url};
use uuid::Uuid; use uuid::Uuid;
pub async fn send_activity_to_community( pub async fn send_activity_to_community<T, Kind>(
creator: &User_, creator: &User_,
community: &Community, community: &Community,
to: Vec<Url>, to: Vec<Url>,
activity: AnyBase, activity: T,
context: &LemmyContext, context: &LemmyContext,
) -> Result<(), LemmyError> { ) -> Result<(), LemmyError>
where
T: AsObject<Kind> + Extends<Kind> + Serialize + Debug + Send + Clone + 'static,
Kind: Serialize,
<T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
{
// TODO: looks like call this sometimes with activity, and sometimes with any_base
insert_activity(creator.id, activity.clone(), true, context.pool()).await?; insert_activity(creator.id, activity.clone(), true, context.pool()).await?;
// if this is a local community, we need to do an announce from the community instead // if this is a local community, we need to do an announce from the community instead
if community.local { if community.local {
do_announce(activity, &community, creator, context).await?; do_announce(activity.into_any_base()?, &community, creator, context).await?;
} else { } else {
let message = SendUserActivity { send_activity(context.activity_sender(), activity, creator, to)?;
activity,
actor: creator.to_owned(),
to,
};
context.activity_sender().do_send(message);
} }
Ok(()) Ok(())

View file

@ -2,29 +2,59 @@ use crate::{
apub::{check_is_apub_id_valid, extensions::signatures::sign, ActorType}, apub::{check_is_apub_id_valid, extensions::signatures::sign, ActorType},
LemmyError, LemmyError,
}; };
use activitystreams::base::AnyBase; use activitystreams::{
base::{Extends, ExtendsExt},
object::AsObject,
};
use actix::prelude::*; use actix::prelude::*;
use anyhow::Context;
use awc::Client; use awc::Client;
use lemmy_db::{community::Community, user::User_}; use lemmy_utils::{location_info, settings::Settings};
use lemmy_utils::settings::Settings; use log::{debug, warn};
use log::debug; use serde::Serialize;
use url::Url; use url::Url;
// We cant use ActorType here, because it doesnt implement Sized pub fn send_activity<T, Kind>(
#[derive(Message)] activity_sender: &Addr<ActivitySender>,
#[rtype(result = "()")] activity: T,
pub struct SendUserActivity { actor: &dyn ActorType,
pub activity: AnyBase, to: Vec<Url>,
pub actor: User_, ) -> Result<(), LemmyError>
pub to: Vec<Url>, where
T: AsObject<Kind>,
T: Extends<Kind>,
Kind: Serialize,
<T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
{
if !Settings::get().federation.enabled {
return Ok(());
}
let activity = activity.into_any_base()?;
let serialised_activity = serde_json::to_string(&activity)?;
for to_url in &to {
check_is_apub_id_valid(&to_url)?;
}
let message = SendActivity {
activity: serialised_activity,
to,
actor_id: actor.actor_id()?,
private_key: actor.private_key().context(location_info!())?,
};
activity_sender.do_send(message);
Ok(())
} }
#[derive(Message)] #[derive(Message)]
#[rtype(result = "()")] #[rtype(result = "()")]
pub struct SendCommunityActivity { struct SendActivity {
pub activity: AnyBase, activity: String,
pub actor: Community, to: Vec<Url>,
pub to: Vec<Url>, actor_id: Url,
private_key: String,
} }
pub struct ActivitySender { pub struct ActivitySender {
@ -38,51 +68,55 @@ impl ActivitySender {
} }
impl Actor for ActivitySender { impl Actor for ActivitySender {
type Context = Context<Self>; type Context = actix::Context<Self>;
} }
impl Handler<SendUserActivity> for ActivitySender { impl Handler<SendActivity> for ActivitySender {
type Result = (); type Result = ();
fn handle(&mut self, msg: SendUserActivity, _ctx: &mut Context<Self>) -> Self::Result { fn handle(&mut self, msg: SendActivity, _ctx: &mut actix::Context<Self>) -> Self::Result {
send_activity(msg.activity, &msg.actor, msg.to, &self.client);
}
}
impl Handler<SendCommunityActivity> for ActivitySender {
type Result = ();
fn handle(&mut self, msg: SendCommunityActivity, _ctx: &mut Context<Self>) -> Self::Result {
send_activity(msg.activity, &msg.actor, msg.to, &self.client);
}
}
fn send_activity(activity: AnyBase, actor: &dyn ActorType, to: Vec<Url>, client: &Client) {
if !Settings::get().federation.enabled {
return;
}
let serialised_activity = serde_json::to_string(&activity).unwrap();
debug!( debug!(
"Sending activitypub activity {} to {:?}", "Sending activitypub activity {} to {:?}",
&serialised_activity, &to &msg.activity, &msg.to
); );
for to_url in &to { Box::pin(async move {
check_is_apub_id_valid(&to_url).unwrap(); for to_url in &msg.to {
let request = self
let request = client .client
.post(to_url.as_str()) .post(to_url.as_str())
.header("Content-Type", "application/json"); .header("Content-Type", "application/json");
let serialised_activity = serialised_activity.clone(); let signed = sign(
Box::pin(async move { request,
// TODO: need to remove the unwrap, but ? causes compile errors msg.activity.clone(),
&msg.actor_id,
msg.private_key.to_owned(),
)
.await;
let signed = match signed {
Ok(s) => s,
Err(e) => {
warn!(
"Failed to sign activity {} from {}: {}",
&msg.activity, &msg.actor_id, e
);
return;
}
};
// TODO: if the sending fails, it should retry with exponential backoff // TODO: if the sending fails, it should retry with exponential backoff
let signed = sign(request, actor, serialised_activity).await.unwrap(); match signed.send().await {
let res = signed.send().await; Ok(_) => {}
debug!("Result for activity send: {:?}", res); Err(e) => {
Ok::<(), LemmyError>(()) warn!(
"Failed to send activity {} to {}: {}",
&msg.activity, &to_url, e
);
}
}
}
}); });
} }
} }

View file

@ -224,14 +224,7 @@ impl ApubObjectType for Comment {
// Set the mention tags // Set the mention tags
.set_many_tags(maa.get_tags()?); .set_many_tags(maa.get_tags()?);
send_activity_to_community( send_activity_to_community(&creator, &community, maa.inboxes, create, context).await?;
&creator,
&community,
maa.inboxes,
create.into_any_base()?,
context,
)
.await?;
Ok(()) Ok(())
} }
@ -259,14 +252,7 @@ impl ApubObjectType for Comment {
// Set the mention tags // Set the mention tags
.set_many_tags(maa.get_tags()?); .set_many_tags(maa.get_tags()?);
send_activity_to_community( send_activity_to_community(&creator, &community, maa.inboxes, update, context).await?;
&creator,
&community,
maa.inboxes,
update.into_any_base()?,
context,
)
.await?;
Ok(()) Ok(())
} }
@ -293,7 +279,7 @@ impl ApubObjectType for Comment {
&creator, &creator,
&community, &community,
vec![community.get_shared_inbox_url()?], vec![community.get_shared_inbox_url()?],
delete.into_any_base()?, delete,
context, context,
) )
.await?; .await?;
@ -336,7 +322,7 @@ impl ApubObjectType for Comment {
&creator, &creator,
&community, &community,
vec![community.get_shared_inbox_url()?], vec![community.get_shared_inbox_url()?],
undo.into_any_base()?, undo,
context, context,
) )
.await?; .await?;
@ -366,7 +352,7 @@ impl ApubObjectType for Comment {
&mod_, &mod_,
&community, &community,
vec![community.get_shared_inbox_url()?], vec![community.get_shared_inbox_url()?],
remove.into_any_base()?, remove,
context, context,
) )
.await?; .await?;
@ -405,7 +391,7 @@ impl ApubObjectType for Comment {
&mod_, &mod_,
&community, &community,
vec![community.get_shared_inbox_url()?], vec![community.get_shared_inbox_url()?],
undo.into_any_base()?, undo,
context, context,
) )
.await?; .await?;
@ -438,7 +424,7 @@ impl ApubLikeableType for Comment {
&creator, &creator,
&community, &community,
vec![community.get_shared_inbox_url()?], vec![community.get_shared_inbox_url()?],
like.into_any_base()?, like,
context, context,
) )
.await?; .await?;
@ -468,7 +454,7 @@ impl ApubLikeableType for Comment {
&creator, &creator,
&community, &community,
vec![community.get_shared_inbox_url()?], vec![community.get_shared_inbox_url()?],
dislike.into_any_base()?, dislike,
context, context,
) )
.await?; .await?;
@ -510,7 +496,7 @@ impl ApubLikeableType for Comment {
&creator, &creator,
&community, &community,
vec![community.get_shared_inbox_url()?], vec![community.get_shared_inbox_url()?],
undo.into_any_base()?, undo,
context, context,
) )
.await?; .await?;

View file

@ -2,7 +2,7 @@ use crate::{
api::{check_slurs, check_slurs_opt}, api::{check_slurs, check_slurs_opt},
apub::{ apub::{
activities::generate_activity_id, activities::generate_activity_id,
activity_sender::{SendCommunityActivity, SendUserActivity}, activity_sender::send_activity,
check_actor_domain, check_actor_domain,
create_apub_response, create_apub_response,
create_apub_tombstone_response, create_apub_tombstone_response,
@ -156,12 +156,7 @@ impl ActorType for Community {
insert_activity(self.creator_id, accept.clone(), true, context.pool()).await?; insert_activity(self.creator_id, accept.clone(), true, context.pool()).await?;
let message = SendCommunityActivity { send_activity(context.activity_sender(), accept, self, vec![to])?;
activity: accept.into_any_base()?,
actor: self.to_owned(),
to: vec![to],
};
context.activity_sender().do_send(message);
Ok(()) Ok(())
} }
@ -182,12 +177,7 @@ impl ActorType for Community {
// Note: For an accept, since it was automatic, no one pushed a button, // Note: For an accept, since it was automatic, no one pushed a button,
// the community was the actor. // the community was the actor.
// But for delete, the creator is the actor, and does the signing // But for delete, the creator is the actor, and does the signing
let message = SendUserActivity { send_activity(context.activity_sender(), delete, creator, inboxes)?;
activity: delete.into_any_base()?,
actor: creator.to_owned(),
to: inboxes,
};
context.activity_sender().do_send(message);
Ok(()) Ok(())
} }
@ -219,12 +209,7 @@ impl ActorType for Community {
// Note: For an accept, since it was automatic, no one pushed a button, // Note: For an accept, since it was automatic, no one pushed a button,
// the community was the actor. // the community was the actor.
// But for delete, the creator is the actor, and does the signing // But for delete, the creator is the actor, and does the signing
let message = SendUserActivity { send_activity(context.activity_sender(), undo, creator, inboxes)?;
activity: undo.into_any_base()?,
actor: creator.to_owned(),
to: inboxes,
};
context.activity_sender().do_send(message);
Ok(()) Ok(())
} }
@ -245,12 +230,7 @@ impl ActorType for Community {
// Note: For an accept, since it was automatic, no one pushed a button, // Note: For an accept, since it was automatic, no one pushed a button,
// the community was the actor. // the community was the actor.
// But for delete, the creator is the actor, and does the signing // But for delete, the creator is the actor, and does the signing
let message = SendUserActivity { send_activity(context.activity_sender(), remove, mod_, inboxes)?;
activity: remove.into_any_base()?,
actor: mod_.to_owned(),
to: inboxes,
};
context.activity_sender().do_send(message);
Ok(()) Ok(())
} }
@ -279,12 +259,7 @@ impl ActorType for Community {
// Note: For an accept, since it was automatic, no one pushed a button, // Note: For an accept, since it was automatic, no one pushed a button,
// the community was the actor. // the community was the actor.
// But for remove , the creator is the actor, and does the signing // But for remove , the creator is the actor, and does the signing
let message = SendUserActivity { send_activity(context.activity_sender(), undo, mod_, inboxes)?;
activity: undo.into_any_base()?,
actor: mod_.to_owned(),
to: inboxes,
};
context.activity_sender().do_send(message);
Ok(()) Ok(())
} }
@ -537,12 +512,7 @@ pub async fn do_announce(
let community_shared_inbox = community.get_shared_inbox_url()?; let community_shared_inbox = community.get_shared_inbox_url()?;
to.retain(|x| x != &community_shared_inbox); to.retain(|x| x != &community_shared_inbox);
let message = SendCommunityActivity { send_activity(context.activity_sender(), announce, community, to)?;
activity: announce.into_any_base()?,
actor: community.to_owned(),
to,
};
context.activity_sender().do_send(message);
Ok(()) Ok(())
} }

View file

@ -16,6 +16,7 @@ use openssl::{
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
use url::Url;
lazy_static! { lazy_static! {
static ref HTTP_SIG_CONFIG: Config = Config::new(); static ref HTTP_SIG_CONFIG: Config = Config::new();
@ -24,11 +25,11 @@ lazy_static! {
/// Signs request headers with the given keypair. /// Signs request headers with the given keypair.
pub async fn sign( pub async fn sign(
request: ClientRequest, request: ClientRequest,
actor: &dyn ActorType,
activity: String, activity: String,
actor_id: &Url,
private_key: String,
) -> Result<DigestClient<String>, LemmyError> { ) -> Result<DigestClient<String>, LemmyError> {
let signing_key_id = format!("{}#main-key", actor.actor_id()?); let signing_key_id = format!("{}#main-key", actor_id);
let private_key = actor.private_key().context(location_info!())?;
let digest_client = request let digest_client = request
.signature_with_digest( .signature_with_digest(

View file

@ -1,5 +1,5 @@
pub mod activity_sender;
pub mod activities; pub mod activities;
pub mod activity_sender;
pub mod comment; pub mod comment;
pub mod community; pub mod community;
pub mod extensions; pub mod extensions;

View file

@ -318,7 +318,7 @@ impl ApubObjectType for Post {
creator, creator,
&community, &community,
vec![community.get_shared_inbox_url()?], vec![community.get_shared_inbox_url()?],
create.into_any_base()?, create,
context, context,
) )
.await?; .await?;
@ -346,7 +346,7 @@ impl ApubObjectType for Post {
creator, creator,
&community, &community,
vec![community.get_shared_inbox_url()?], vec![community.get_shared_inbox_url()?],
update.into_any_base()?, update,
context, context,
) )
.await?; .await?;
@ -373,7 +373,7 @@ impl ApubObjectType for Post {
creator, creator,
&community, &community,
vec![community.get_shared_inbox_url()?], vec![community.get_shared_inbox_url()?],
delete.into_any_base()?, delete,
context, context,
) )
.await?; .await?;
@ -412,7 +412,7 @@ impl ApubObjectType for Post {
creator, creator,
&community, &community,
vec![community.get_shared_inbox_url()?], vec![community.get_shared_inbox_url()?],
undo.into_any_base()?, undo,
context, context,
) )
.await?; .await?;
@ -439,7 +439,7 @@ impl ApubObjectType for Post {
mod_, mod_,
&community, &community,
vec![community.get_shared_inbox_url()?], vec![community.get_shared_inbox_url()?],
remove.into_any_base()?, remove,
context, context,
) )
.await?; .await?;
@ -474,7 +474,7 @@ impl ApubObjectType for Post {
mod_, mod_,
&community, &community,
vec![community.get_shared_inbox_url()?], vec![community.get_shared_inbox_url()?],
undo.into_any_base()?, undo,
context, context,
) )
.await?; .await?;
@ -504,7 +504,7 @@ impl ApubLikeableType for Post {
&creator, &creator,
&community, &community,
vec![community.get_shared_inbox_url()?], vec![community.get_shared_inbox_url()?],
like.into_any_base()?, like,
context, context,
) )
.await?; .await?;
@ -531,7 +531,7 @@ impl ApubLikeableType for Post {
&creator, &creator,
&community, &community,
vec![community.get_shared_inbox_url()?], vec![community.get_shared_inbox_url()?],
dislike.into_any_base()?, dislike,
context, context,
) )
.await?; .await?;
@ -570,7 +570,7 @@ impl ApubLikeableType for Post {
&creator, &creator,
&community, &community,
vec![community.get_shared_inbox_url()?], vec![community.get_shared_inbox_url()?],
undo.into_any_base()?, undo,
context, context,
) )
.await?; .await?;

View file

@ -1,7 +1,7 @@
use crate::{ use crate::{
apub::{ apub::{
activities::generate_activity_id, activities::generate_activity_id,
activity_sender::SendUserActivity, activity_sender::send_activity,
check_actor_domain, check_actor_domain,
check_is_apub_id_valid, check_is_apub_id_valid,
create_tombstone, create_tombstone,
@ -135,12 +135,7 @@ impl ApubObjectType for PrivateMessage {
insert_activity(creator.id, create.clone(), true, context.pool()).await?; insert_activity(creator.id, create.clone(), true, context.pool()).await?;
let message = SendUserActivity { send_activity(context.activity_sender(), create, creator, vec![to])?;
activity: create.into_any_base()?,
actor: creator.to_owned(),
to: vec![to],
};
context.activity_sender().do_send(message);
Ok(()) Ok(())
} }
@ -160,12 +155,7 @@ impl ApubObjectType for PrivateMessage {
insert_activity(creator.id, update.clone(), true, context.pool()).await?; insert_activity(creator.id, update.clone(), true, context.pool()).await?;
let message = SendUserActivity { send_activity(context.activity_sender(), update, creator, vec![to])?;
activity: update.into_any_base()?,
actor: creator.to_owned(),
to: vec![to],
};
context.activity_sender().do_send(message);
Ok(()) Ok(())
} }
@ -184,12 +174,7 @@ impl ApubObjectType for PrivateMessage {
insert_activity(creator.id, delete.clone(), true, context.pool()).await?; insert_activity(creator.id, delete.clone(), true, context.pool()).await?;
let message = SendUserActivity { send_activity(context.activity_sender(), delete, creator, vec![to])?;
activity: delete.into_any_base()?,
actor: creator.to_owned(),
to: vec![to],
};
context.activity_sender().do_send(message);
Ok(()) Ok(())
} }
@ -219,12 +204,7 @@ impl ApubObjectType for PrivateMessage {
insert_activity(creator.id, undo.clone(), true, context.pool()).await?; insert_activity(creator.id, undo.clone(), true, context.pool()).await?;
let message = SendUserActivity { send_activity(context.activity_sender(), undo, creator, vec![to])?;
activity: undo.into_any_base()?,
actor: creator.to_owned(),
to: vec![to],
};
context.activity_sender().do_send(message);
Ok(()) Ok(())
} }

View file

@ -2,7 +2,7 @@ use crate::{
api::{check_slurs, check_slurs_opt}, api::{check_slurs, check_slurs_opt},
apub::{ apub::{
activities::generate_activity_id, activities::generate_activity_id,
activity_sender::SendUserActivity, activity_sender::send_activity,
check_actor_domain, check_actor_domain,
create_apub_response, create_apub_response,
fetcher::get_or_fetch_and_upsert_actor, fetcher::get_or_fetch_and_upsert_actor,
@ -128,12 +128,7 @@ impl ActorType for User_ {
insert_activity(self.id, follow.clone(), true, context.pool()).await?; insert_activity(self.id, follow.clone(), true, context.pool()).await?;
let message = SendUserActivity { send_activity(context.activity_sender(), follow, self, vec![to])?;
activity: follow.into_any_base()?,
actor: self.to_owned(),
to: vec![to],
};
context.activity_sender().do_send(message);
Ok(()) Ok(())
} }
@ -158,12 +153,7 @@ impl ActorType for User_ {
insert_activity(self.id, undo.clone(), true, context.pool()).await?; insert_activity(self.id, undo.clone(), true, context.pool()).await?;
let message = SendUserActivity { send_activity(context.activity_sender(), undo, self, vec![to])?;
activity: undo.into_any_base()?,
actor: self.to_owned(),
to: vec![to],
};
context.activity_sender().do_send(message);
Ok(()) Ok(())
} }