implement ActivitySender actor #89

Merged
dessalines merged 28 commits from activity-sender into main 2020-08-31 13:48:03 +00:00
10 changed files with 218 additions and 84 deletions
Showing only changes of commit 35649032c0 - Show all commits

View File

@ -1,20 +1,11 @@
use crate::{
apub::{
check_is_apub_id_valid,
community::do_announce,
extensions::signatures::sign,
insert_activity,
ActorType,
},
request::retry_custom,
apub::{activity_sender::SendUserActivity, community::do_announce, insert_activity},
LemmyContext,
LemmyError,
};
use activitystreams::base::AnyBase;
use actix_web::client::Client;
use lemmy_db::{community::Community, user::User_};
use lemmy_utils::{get_apub_protocol_string, settings::Settings};
use log::debug;
use url::{ParseError, Url};
use uuid::Uuid;
@ -31,42 +22,12 @@ pub async fn send_activity_to_community(
if community.local {
do_announce(activity, &community, creator, context).await?;
} else {
send_activity(context.client(), &activity, creator, to).await?;
}
Ok(())
}
/// Send an activity to a list of recipients, using the correct headers etc.
pub async fn send_activity(
client: &Client,
activity: &AnyBase,
actor: &dyn ActorType,
to: Vec<Url>,
) -> Result<(), LemmyError> {
if !Settings::get().federation.enabled {
return Ok(());
}
let activity = serde_json::to_string(&activity)?;
debug!("Sending activitypub activity {} to {:?}", activity, to);
for to_url in to {
check_is_apub_id_valid(&to_url)?;
let res = retry_custom(|| async {
let request = client
.post(to_url.as_str())
.header("Content-Type", "application/json");
match sign(request, actor, activity.clone()).await {
Ok(signed) => Ok(signed.send().await),
Err(e) => Err(e),
}
})
.await?;
debug!("Result for activity send: {:?}", res);
let message = SendUserActivity {
activity,
actor: creator.to_owned(),
to,
};
context.activity_sender().send(message).await??;

send is only when you want the result, do_send is when you don't want to wait, which is what we want for these.

https://actix.rs/book/actix/sec-3-address.html#message

`send` is only when you want the result, `do_send` is when you don't want to wait, which is what we want for these. https://actix.rs/book/actix/sec-3-address.html#message

Okay done, although I dont like the fact that this method will fail silently if there is any error. Hopefully that wont happen in our setup.

Okay done, although I dont like the fact that this method will fail silently if there is any error. Hopefully that wont happen in our setup.
}
Ok(())

View File

@ -0,0 +1,95 @@
use crate::{
apub::{check_is_apub_id_valid, extensions::signatures::sign, ActorType},
LemmyError,
};
use activitystreams::base::AnyBase;
use actix::prelude::*;
use awc::Client;
use lemmy_db::{community::Community, user::User_};
use lemmy_utils::settings::Settings;
use log::debug;
use url::Url;
// We cant use ActorType here, because it doesnt implement Sized
#[derive(Message)]
#[rtype(result = "Result<(), LemmyError>")]
nutomic marked this conversation as resolved Outdated

This can just be #[rtype(result = "()")], because we're not gonna wait for the result.

This can just be `#[rtype(result = "()")]`, because we're not gonna wait for the result.
pub struct SendUserActivity {
pub activity: AnyBase,
pub actor: User_,
pub to: Vec<Url>,
}
#[derive(Message)]
#[rtype(result = "Result<(), LemmyError>")]
nutomic marked this conversation as resolved Outdated

Same.

Same.
pub struct SendCommunityActivity {
pub activity: AnyBase,
pub actor: Community,
pub to: Vec<Url>,
}
pub struct ActivitySender {
client: Client,
}
impl ActivitySender {
pub fn startup(client: Client) -> ActivitySender {
ActivitySender { client }
}
}
impl Actor for ActivitySender {
type Context = Context<Self>;
}
impl Handler<SendUserActivity> for ActivitySender {
type Result = Result<(), LemmyError>;
fn handle(&mut self, msg: SendUserActivity, _ctx: &mut Context<Self>) -> Self::Result {
send_activity(msg.activity, &msg.actor, msg.to, &self.client)
}
}
impl Handler<SendCommunityActivity> for ActivitySender {
type Result = Result<(), LemmyError>;
nutomic marked this conversation as resolved Outdated

type Result = ();

type Result = ();
fn handle(&mut self, msg: SendCommunityActivity, _ctx: &mut Context<Self>) -> Self::Result {
send_activity(msg.activity, &msg.actor, msg.to, &self.client)
}
}
fn send_activity(
activity: AnyBase,
actor: &dyn ActorType,
to: Vec<Url>,
client: &Client,
) -> Result<(), LemmyError> {
if !Settings::get().federation.enabled {
return Ok(());
}
let serialised_activity = serde_json::to_string(&activity)?;
debug!(
"Sending activitypub activity {} to {:?}",
&serialised_activity, &to
);
for to_url in &to {
check_is_apub_id_valid(&to_url)?;
let request = client
.post(to_url.as_str())
.header("Content-Type", "application/json");
let serialised_activity = serialised_activity.clone();
Box::pin(async move {

Hrm, not sure why this is required.

Hrm, not sure why this is required.

Because I'm calling some async functions in there and need to await them.

Because I'm calling some async functions in there and need to await them.
// TODO: need to remove the unwrap, but ? causes compile errors
// TODO: if the sending fails, it should retry with exponential backoff
let signed = sign(request, actor, serialised_activity).await.unwrap();
let res = signed.send().await;
debug!("Result for activity send: {:?}", res);
Ok::<(), LemmyError>(())
});
}
Ok(())
}

View File

@ -1,7 +1,8 @@
use crate::{
api::{check_slurs, check_slurs_opt},
apub::{
activities::{generate_activity_id, send_activity},
activities::generate_activity_id,
activity_sender::{SendCommunityActivity, SendUserActivity},
check_actor_domain,
create_apub_response,
create_apub_tombstone_response,
@ -155,7 +156,12 @@ impl ActorType for Community {
insert_activity(self.creator_id, accept.clone(), true, context.pool()).await?;
send_activity(context.client(), &accept.into_any_base()?, self, vec![to]).await?;
let message = SendCommunityActivity {
activity: accept.into_any_base()?,
actor: self.to_owned(),
to: vec![to],
};
context.activity_sender().send(message).await??;
Ok(())
}
@ -176,7 +182,12 @@ impl ActorType for Community {
// Note: For an accept, since it was automatic, no one pushed a button,
// the community was the actor.
// But for delete, the creator is the actor, and does the signing
send_activity(context.client(), &delete.into_any_base()?, creator, inboxes).await?;
let message = SendUserActivity {
activity: delete.into_any_base()?,
actor: creator.to_owned(),
to: inboxes,
};
context.activity_sender().send(message).await??;
Ok(())
}
@ -208,7 +219,12 @@ impl ActorType for Community {
// Note: For an accept, since it was automatic, no one pushed a button,
// the community was the actor.
// But for delete, the creator is the actor, and does the signing
send_activity(context.client(), &undo.into_any_base()?, creator, inboxes).await?;
let message = SendUserActivity {
activity: undo.into_any_base()?,
actor: creator.to_owned(),
to: inboxes,
};
context.activity_sender().send(message).await??;
Ok(())
}
@ -229,7 +245,12 @@ impl ActorType for Community {
// Note: For an accept, since it was automatic, no one pushed a button,
// the community was the actor.
// But for delete, the creator is the actor, and does the signing
send_activity(context.client(), &remove.into_any_base()?, mod_, inboxes).await?;
let message = SendUserActivity {
activity: remove.into_any_base()?,
actor: mod_.to_owned(),
to: inboxes,
};
context.activity_sender().send(message).await??;
Ok(())
}
@ -258,7 +279,12 @@ impl ActorType for Community {
// Note: For an accept, since it was automatic, no one pushed a button,
// the community was the actor.
// But for remove , the creator is the actor, and does the signing
send_activity(context.client(), &undo.into_any_base()?, mod_, inboxes).await?;
let message = SendUserActivity {
activity: undo.into_any_base()?,
actor: mod_.to_owned(),
to: inboxes,
};
context.activity_sender().send(message).await??;
Ok(())
}
@ -511,7 +537,12 @@ pub async fn do_announce(
let community_shared_inbox = community.get_shared_inbox_url()?;
to.retain(|x| x != &community_shared_inbox);
send_activity(context.client(), &announce.into_any_base()?, community, to).await?;
let message = SendCommunityActivity {
activity: announce.into_any_base()?,
actor: community.to_owned(),
to,
};
context.activity_sender().send(message).await??;
Ok(())
}

View File

@ -1,3 +1,4 @@
pub mod activity_sender;
pub mod activities;
pub mod comment;
pub mod community;

View File

@ -1,6 +1,7 @@
use crate::{
apub::{
activities::{generate_activity_id, send_activity},
activities::generate_activity_id,
activity_sender::SendUserActivity,
check_actor_domain,
check_is_apub_id_valid,
create_tombstone,
@ -134,13 +135,12 @@ impl ApubObjectType for PrivateMessage {
insert_activity(creator.id, create.clone(), true, context.pool()).await?;
send_activity(
context.client(),
&create.into_any_base()?,
creator,
vec![to],
)
.await?;
let message = SendUserActivity {
activity: create.into_any_base()?,
actor: creator.to_owned(),
to: vec![to],
};
context.activity_sender().send(message).await??;
Ok(())
}
@ -160,13 +160,12 @@ impl ApubObjectType for PrivateMessage {
insert_activity(creator.id, update.clone(), true, context.pool()).await?;
send_activity(
context.client(),
&update.into_any_base()?,
creator,
vec![to],
)
.await?;
let message = SendUserActivity {
activity: update.into_any_base()?,
actor: creator.to_owned(),
to: vec![to],
};
context.activity_sender().send(message).await??;
Ok(())
}
@ -185,13 +184,12 @@ impl ApubObjectType for PrivateMessage {
insert_activity(creator.id, delete.clone(), true, context.pool()).await?;
send_activity(
context.client(),
&delete.into_any_base()?,
creator,
vec![to],
)
.await?;
let message = SendUserActivity {
activity: delete.into_any_base()?,
actor: creator.to_owned(),
to: vec![to],
};
context.activity_sender().send(message).await??;
Ok(())
}
@ -221,7 +219,12 @@ impl ApubObjectType for PrivateMessage {
insert_activity(creator.id, undo.clone(), true, context.pool()).await?;
send_activity(context.client(), &undo.into_any_base()?, creator, vec![to]).await?;
let message = SendUserActivity {
activity: undo.into_any_base()?,
actor: creator.to_owned(),
to: vec![to],
};
context.activity_sender().send(message).await??;
Ok(())
}

View File

@ -1,7 +1,8 @@
use crate::{
api::{check_slurs, check_slurs_opt},
apub::{
activities::{generate_activity_id, send_activity},
activities::generate_activity_id,
activity_sender::SendUserActivity,
check_actor_domain,
create_apub_response,
fetcher::get_or_fetch_and_upsert_actor,
@ -127,7 +128,12 @@ impl ActorType for User_ {
insert_activity(self.id, follow.clone(), true, context.pool()).await?;
send_activity(context.client(), &follow.into_any_base()?, self, vec![to]).await?;
let message = SendUserActivity {
activity: follow.into_any_base()?,
actor: self.to_owned(),
to: vec![to],
};
context.activity_sender().send(message).await??;
Ok(())
}
@ -152,7 +158,12 @@ impl ActorType for User_ {
insert_activity(self.id, undo.clone(), true, context.pool()).await?;
send_activity(context.client(), &undo.into_any_base()?, self, vec![to]).await?;
let message = SendUserActivity {
activity: undo.into_any_base()?,
actor: self.to_owned(),
to: vec![to],
};
context.activity_sender().send(message).await??;
Ok(())
}

View File

@ -33,6 +33,8 @@ use crate::{
request::{retry, RecvError},
websocket::server::ChatServer,
};
use crate::apub::activity_sender::ActivitySender;
use actix::Addr;
use actix_web::{client::Client, dev::ConnectionInfo};
use anyhow::anyhow;
@ -75,14 +77,21 @@ pub struct LemmyContext {
pub pool: DbPool,
pub chat_server: Addr<ChatServer>,
pub client: Client,
pub activity_sender: Addr<ActivitySender>,
}
impl LemmyContext {
pub fn create(pool: DbPool, chat_server: Addr<ChatServer>, client: Client) -> LemmyContext {
pub fn create(
pool: DbPool,
chat_server: Addr<ChatServer>,
client: Client,
activity_sender: Addr<ActivitySender>,
) -> LemmyContext {
LemmyContext {
pool,
chat_server,
client,
activity_sender,
}
}
pub fn pool(&self) -> &DbPool {
@ -94,6 +103,9 @@ impl LemmyContext {
pub fn client(&self) -> &Client {
&self.client
}
pub fn activity_sender(&self) -> &Addr<ActivitySender> {
&self.activity_sender
}
}
impl Clone for LemmyContext {
@ -102,6 +114,7 @@ impl Clone for LemmyContext {
pool: self.pool.clone(),
chat_server: self.chat_server.clone(),
client: self.client.clone(),
activity_sender: self.activity_sender.clone(),
}
}
}

View File

@ -20,6 +20,7 @@ use diesel::{
};
use lemmy_db::get_database_url_from_env;
use lemmy_server::{
apub::activity_sender::ActivitySender,
blocking,
code_migrations::run_advanced_migrations,
rate_limit::{rate_limiter::RateLimiter, RateLimit},
@ -76,9 +77,20 @@ async fn main() -> Result<(), LemmyError> {
// Create Http server with websocket support
HttpServer::new(move || {
let chat_server =
ChatServer::startup(pool.clone(), rate_limiter.clone(), Client::default()).start();
let context = LemmyContext::create(pool.clone(), chat_server, Client::default());
let activity_sender = ActivitySender::startup(Client::default()).start();
let chat_server = ChatServer::startup(
pool.clone(),
rate_limiter.clone(),
Client::default(),
activity_sender.clone(),
)
.start();
let context = LemmyContext::create(
pool.clone(),
chat_server,
Client::default(),
activity_sender,
);
let settings = Settings::get();
let rate_limiter = rate_limiter.clone();
App::new()

View File

@ -19,7 +19,7 @@ where
retry_custom(|| async { Ok((f)().await) }).await
}
pub async fn retry_custom<F, Fut, T>(f: F) -> Result<T, LemmyError>
async fn retry_custom<F, Fut, T>(f: F) -> Result<T, LemmyError>
where
F: Fn() -> Fut,
Fut: Future<Output = Result<Result<T, actix_web::client::SendRequestError>, LemmyError>>,

View File

@ -5,6 +5,7 @@
use super::*;
use crate::{
api::{comment::*, community::*, post::*, site::*, user::*, *},
apub::activity_sender::ActivitySender,
rate_limit::RateLimit,
websocket::UserOperation,
CommunityId,
@ -181,6 +182,8 @@ pub struct ChatServer {
/// An HTTP Client
client: Client,
activity_sender: Addr<ActivitySender>,
}
impl ChatServer {
@ -188,6 +191,7 @@ impl ChatServer {
pool: Pool<ConnectionManager<PgConnection>>,
rate_limiter: RateLimit,
client: Client,
activity_sender: Addr<ActivitySender>,
) -> ChatServer {
ChatServer {
sessions: HashMap::new(),
@ -199,6 +203,7 @@ impl ChatServer {
rate_limiter,
captchas: Vec::new(),
client,
activity_sender,
}
}
@ -455,6 +460,7 @@ impl ChatServer {
};
let client = self.client.clone();
let activity_sender = self.activity_sender.clone();
async move {
let msg = msg;
let json: Value = serde_json::from_str(&msg.msg)?;
@ -469,6 +475,7 @@ impl ChatServer {
pool,
chat_server: addr,
client,
activity_sender,
};
let args = Args {
context: &context,