attempt to use background-jobs crate

This commit is contained in:
Felix Ableitner 2020-08-20 18:52:48 +02:00
parent f7f64363ea
commit 1b8eb9b970
12 changed files with 207 additions and 157 deletions

51
server/Cargo.lock generated vendored
View file

@ -490,6 +490,56 @@ dependencies = [
"serde_urlencoded",
]
[[package]]
name = "background-jobs"
version = "0.8.0-alpha.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb38c4a5de33324650e9023829b0f4129eb5418b29f5dfe69a52100ff5bc50d7"
dependencies = [
"background-jobs-actix",
"background-jobs-core",
]
[[package]]
name = "background-jobs-actix"
version = "0.8.0-alpha.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d012b9293806c777f806b537e04b5eec34ecd6eaf876c52792017695ce53262f"
dependencies = [
"actix-rt",
"anyhow",
"async-trait",
"background-jobs-core",
"chrono",
"log",
"num_cpus",
"rand 0.7.3",
"serde 1.0.114",
"serde_json",
"thiserror",
"tokio",
"uuid 0.8.1",
]
[[package]]
name = "background-jobs-core"
version = "0.8.0-alpha.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd5efe91c019d7780d5a2fc2f92a15e1f95b84a761428e1d1972b7428634ebc7"
dependencies = [
"actix-rt",
"anyhow",
"async-trait",
"chrono",
"futures",
"log",
"serde 1.0.114",
"serde_json",
"thiserror",
"tokio",
"uuid 0.8.1",
]
[[package]]
name = "backtrace"
version = "0.3.50"
@ -1723,6 +1773,7 @@ dependencies = [
"anyhow",
"async-trait",
"awc",
"background-jobs",
"base64 0.12.3",
"bcrypt",
"captcha",

1
server/Cargo.toml vendored
View file

@ -53,3 +53,4 @@ async-trait = "0.1.36"
captcha = "0.0.7"
anyhow = "1.0.32"
thiserror = "1.0.20"
background-jobs = " 0.8.0-alpha.2"

View file

@ -1,5 +1,5 @@
use crate::{
apub::{activity_sender::send_activity, community::do_announce, insert_activity},
apub::{activity_queue::send_activity, community::do_announce, insert_activity},
LemmyContext,
LemmyError,
};
@ -32,7 +32,7 @@ where
if community.local {
do_announce(activity.into_any_base()?, &community, creator, context).await?;
} else {
send_activity(context.activity_sender(), activity, creator, to)?;
send_activity(context.activity_queue(), activity, creator, to)?;
}
Ok(())

View file

@ -0,0 +1,120 @@
use crate::{
apub::{check_is_apub_id_valid, extensions::signatures::sign, ActorType},
LemmyError,
};
use activitystreams::{
base::{Extends, ExtendsExt},
object::AsObject,
};
use actix::prelude::*;
use anyhow::Context;
use awc::Client;
use lemmy_utils::{location_info, settings::Settings};
use log::{debug, warn};
use serde::Serialize;
use url::Url;
use background_jobs::{Backoff, MaxRetries, WorkerConfig, QueueHandle, Job, create_server};
use background_jobs::memory_storage::Storage;
use serde::Deserialize;
use anyhow::Error;
use futures::future::{Ready, ok};
use std::pin::Pin;
use std::future::Future;
pub fn send_activity<T, Kind>(
activity_sender: &QueueHandle,
activity: T,
actor: &dyn ActorType,
to: Vec<Url>,
) -> Result<(), LemmyError>
where
T: AsObject<Kind>,
T: Extends<Kind>,
Kind: Serialize,
<T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
{
if !Settings::get().federation.enabled {
return Ok(());
}
let activity = activity.into_any_base()?;
let serialised_activity = serde_json::to_string(&activity)?;
for to_url in &to {
check_is_apub_id_valid(&to_url)?;
}
// TODO: it would make sense to create a separate task for each destination server
let message = SendActivityTask {
activity: serialised_activity,
to,
actor_id: actor.actor_id()?,
private_key: actor.private_key().context(location_info!())?,
};
activity_sender.queue::<SendActivityTask>(message)?;
Ok(())
}
#[derive(Clone, Debug, Deserialize, Serialize)]
struct SendActivityTask {
activity: String,
to: Vec<Url>,
actor_id: Url,
private_key: String,
}
impl Job for SendActivityTask {
type State = ();
type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
const NAME: &'static str = "SendActivityTask";
const MAX_RETRIES: MaxRetries = MaxRetries::Count(10);
const BACKOFF: Backoff = Backoff::Exponential(2);
fn run(self, _: Self::State) -> Self::Future {
Box::pin(async move {
for to_url in &self.to {
// TODO: should pass this in somehow instead of creating a new client every time
// i suppose this can be done through a state
let client = Client::default();
let request = client
.post(to_url.as_str())
.header("Content-Type", "application/json");
// TODO: i believe we have to do the signing in here because it is only valid for a few seconds
let signed = sign(
request,
self.activity.clone(),
&self.actor_id,
self.private_key.to_owned(),
)
.await?;
signed.send().await?;
}
Ok(())
})
}
}
pub fn create_activity_queue() -> QueueHandle {
// Start the application server. This guards access to to the jobs store
let queue_handle = create_server(Storage::new());
// Configure and start our workers
WorkerConfig::new(||{})
.register::<SendActivityTask>()
.start(queue_handle.clone());
// Queue our jobs
//queue_handle.queue::<MyProcessor>(MyJob::new(1, 2))?;
//queue_handle.queue::<MyProcessor>(MyJob::new(3, 4))?;
//queue_handle.queue::<MyProcessor>(MyJob::new(5, 6))?;
// Block on Actix
queue_handle
}

View file

@ -1,122 +0,0 @@
use crate::{
apub::{check_is_apub_id_valid, extensions::signatures::sign, ActorType},
LemmyError,
};
use activitystreams::{
base::{Extends, ExtendsExt},
object::AsObject,
};
use actix::prelude::*;
use anyhow::Context;
use awc::Client;
use lemmy_utils::{location_info, settings::Settings};
use log::{debug, warn};
use serde::Serialize;
use url::Url;
pub fn send_activity<T, Kind>(
activity_sender: &Addr<ActivitySender>,
activity: T,
actor: &dyn ActorType,
to: Vec<Url>,
) -> Result<(), LemmyError>
where
T: AsObject<Kind>,
T: Extends<Kind>,
Kind: Serialize,
<T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
{
if !Settings::get().federation.enabled {
return Ok(());
}
let activity = activity.into_any_base()?;
let serialised_activity = serde_json::to_string(&activity)?;
for to_url in &to {
check_is_apub_id_valid(&to_url)?;
}
let message = SendActivity {
activity: serialised_activity,
to,
actor_id: actor.actor_id()?,
private_key: actor.private_key().context(location_info!())?,
};
activity_sender.do_send(message);
Ok(())
}
#[derive(Message)]
#[rtype(result = "()")]
struct SendActivity {
activity: String,
to: Vec<Url>,
actor_id: Url,
private_key: String,
}
pub struct ActivitySender {
client: Client,
}
impl ActivitySender {
pub fn startup(client: Client) -> ActivitySender {
ActivitySender { client }
}
}
impl Actor for ActivitySender {
type Context = actix::Context<Self>;
}
impl Handler<SendActivity> for ActivitySender {
type Result = ();
fn handle(&mut self, msg: SendActivity, _ctx: &mut actix::Context<Self>) -> Self::Result {
debug!(
"Sending activitypub activity {} to {:?}",
&msg.activity, &msg.to
);
Box::pin(async move {
for to_url in &msg.to {
let request = self
.client
.post(to_url.as_str())
.header("Content-Type", "application/json");
let signed = sign(
request,
msg.activity.clone(),
&msg.actor_id,
msg.private_key.to_owned(),
)
.await;
let signed = match signed {
Ok(s) => s,
Err(e) => {
warn!(
"Failed to sign activity {} from {}: {}",
&msg.activity, &msg.actor_id, e
);
return;
}
};
// TODO: if the sending fails, it should retry with exponential backoff
match signed.send().await {
Ok(_) => {}
Err(e) => {
warn!(
"Failed to send activity {} to {}: {}",
&msg.activity, &to_url, e
);
}
}
}
});
}
}

View file

@ -2,7 +2,7 @@ use crate::{
api::{check_slurs, check_slurs_opt},
apub::{
activities::generate_activity_id,
activity_sender::send_activity,
activity_queue::send_activity,
check_actor_domain,
create_apub_response,
create_apub_tombstone_response,
@ -156,7 +156,7 @@ impl ActorType for Community {
insert_activity(self.creator_id, accept.clone(), true, context.pool()).await?;
send_activity(context.activity_sender(), accept, self, vec![to])?;
send_activity(context.activity_queue(), accept, self, vec![to])?;
Ok(())
}
@ -177,7 +177,7 @@ impl ActorType for Community {
// Note: For an accept, since it was automatic, no one pushed a button,
// the community was the actor.
// But for delete, the creator is the actor, and does the signing
send_activity(context.activity_sender(), delete, creator, inboxes)?;
send_activity(context.activity_queue(), delete, creator, inboxes)?;
Ok(())
}
@ -209,7 +209,7 @@ impl ActorType for Community {
// Note: For an accept, since it was automatic, no one pushed a button,
// the community was the actor.
// But for delete, the creator is the actor, and does the signing
send_activity(context.activity_sender(), undo, creator, inboxes)?;
send_activity(context.activity_queue(), undo, creator, inboxes)?;
Ok(())
}
@ -230,7 +230,7 @@ impl ActorType for Community {
// Note: For an accept, since it was automatic, no one pushed a button,
// the community was the actor.
// But for delete, the creator is the actor, and does the signing
send_activity(context.activity_sender(), remove, mod_, inboxes)?;
send_activity(context.activity_queue(), remove, mod_, inboxes)?;
Ok(())
}
@ -259,7 +259,7 @@ impl ActorType for Community {
// Note: For an accept, since it was automatic, no one pushed a button,
// the community was the actor.
// But for remove , the creator is the actor, and does the signing
send_activity(context.activity_sender(), undo, mod_, inboxes)?;
send_activity(context.activity_queue(), undo, mod_, inboxes)?;
Ok(())
}
@ -512,7 +512,7 @@ pub async fn do_announce(
let community_shared_inbox = community.get_shared_inbox_url()?;
to.retain(|x| x != &community_shared_inbox);
send_activity(context.activity_sender(), announce, community, to)?;
send_activity(context.activity_queue(), announce, community, to)?;
Ok(())
}

View file

@ -1,5 +1,5 @@
pub mod activities;
pub mod activity_sender;
pub mod activity_queue;
pub mod comment;
pub mod community;
pub mod extensions;

View file

@ -1,7 +1,7 @@
use crate::{
apub::{
activities::generate_activity_id,
activity_sender::send_activity,
activity_queue::send_activity,
check_actor_domain,
check_is_apub_id_valid,
create_tombstone,
@ -135,7 +135,7 @@ impl ApubObjectType for PrivateMessage {
insert_activity(creator.id, create.clone(), true, context.pool()).await?;
send_activity(context.activity_sender(), create, creator, vec![to])?;
send_activity(context.activity_queue(), create, creator, vec![to])?;
Ok(())
}
@ -155,7 +155,7 @@ impl ApubObjectType for PrivateMessage {
insert_activity(creator.id, update.clone(), true, context.pool()).await?;
send_activity(context.activity_sender(), update, creator, vec![to])?;
send_activity(context.activity_queue(), update, creator, vec![to])?;
Ok(())
}
@ -174,7 +174,7 @@ impl ApubObjectType for PrivateMessage {
insert_activity(creator.id, delete.clone(), true, context.pool()).await?;
send_activity(context.activity_sender(), delete, creator, vec![to])?;
send_activity(context.activity_queue(), delete, creator, vec![to])?;
Ok(())
}
@ -204,7 +204,7 @@ impl ApubObjectType for PrivateMessage {
insert_activity(creator.id, undo.clone(), true, context.pool()).await?;
send_activity(context.activity_sender(), undo, creator, vec![to])?;
send_activity(context.activity_queue(), undo, creator, vec![to])?;
Ok(())
}

View file

@ -2,7 +2,7 @@ use crate::{
api::{check_slurs, check_slurs_opt},
apub::{
activities::generate_activity_id,
activity_sender::send_activity,
activity_queue::send_activity,
check_actor_domain,
create_apub_response,
fetcher::get_or_fetch_and_upsert_actor,
@ -128,7 +128,7 @@ impl ActorType for User_ {
insert_activity(self.id, follow.clone(), true, context.pool()).await?;
send_activity(context.activity_sender(), follow, self, vec![to])?;
send_activity(context.activity_queue(), follow, self, vec![to])?;
Ok(())
}
@ -153,7 +153,7 @@ impl ActorType for User_ {
insert_activity(self.id, undo.clone(), true, context.pool()).await?;
send_activity(context.activity_sender(), undo, self, vec![to])?;
send_activity(context.activity_queue(), undo, self, vec![to])?;
Ok(())
}

View file

@ -34,7 +34,6 @@ use crate::{
websocket::server::ChatServer,
};
use crate::apub::activity_sender::ActivitySender;
use actix::Addr;
use actix_web::{client::Client, dev::ConnectionInfo};
use anyhow::anyhow;
@ -43,6 +42,7 @@ use log::error;
use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC};
use serde::Deserialize;
use std::process::Command;
use background_jobs::QueueHandle;
pub type DbPool = diesel::r2d2::Pool<diesel::r2d2::ConnectionManager<diesel::PgConnection>>;
pub type ConnectionId = usize;
@ -77,7 +77,7 @@ pub struct LemmyContext {
pub pool: DbPool,
pub chat_server: Addr<ChatServer>,
pub client: Client,
pub activity_sender: Addr<ActivitySender>,
pub activity_queue: QueueHandle,
}
impl LemmyContext {
@ -85,13 +85,13 @@ impl LemmyContext {
pool: DbPool,
chat_server: Addr<ChatServer>,
client: Client,
activity_sender: Addr<ActivitySender>,
activity_queue: QueueHandle,
) -> LemmyContext {
LemmyContext {
pool,
chat_server,
client,
activity_sender,
activity_queue,
}
}
pub fn pool(&self) -> &DbPool {
@ -103,8 +103,8 @@ impl LemmyContext {
pub fn client(&self) -> &Client {
&self.client
}
pub fn activity_sender(&self) -> &Addr<ActivitySender> {
&self.activity_sender
pub fn activity_queue(&self) -> &QueueHandle {
&self.activity_queue
}
}
@ -114,7 +114,7 @@ impl Clone for LemmyContext {
pool: self.pool.clone(),
chat_server: self.chat_server.clone(),
client: self.client.clone(),
activity_sender: self.activity_sender.clone(),
activity_queue: self.activity_queue.clone(),
}
}
}

View file

@ -20,7 +20,6 @@ use diesel::{
};
use lemmy_db::get_database_url_from_env;
use lemmy_server::{
apub::activity_sender::ActivitySender,
blocking,
code_migrations::run_advanced_migrations,
rate_limit::{rate_limiter::RateLimiter, RateLimit},
@ -32,6 +31,7 @@ use lemmy_server::{
use lemmy_utils::{settings::Settings, CACHE_CONTROL_REGEX};
use std::sync::Arc;
use tokio::sync::Mutex;
use lemmy_server::apub::activity_queue::create_activity_queue;
lazy_static! {
// static ref CACHE_CONTROL_VALUE: String = format!("public, max-age={}", 365 * 24 * 60 * 60);
@ -77,19 +77,19 @@ async fn main() -> Result<(), LemmyError> {
// Create Http server with websocket support
HttpServer::new(move || {
let activity_sender = ActivitySender::startup(Client::default()).start();
let activity_queue = create_activity_queue();
let chat_server = ChatServer::startup(
pool.clone(),
rate_limiter.clone(),
Client::default(),
activity_sender.clone(),
activity_queue.clone(),
)
.start();
let context = LemmyContext::create(
pool.clone(),
chat_server,
Client::default(),
activity_sender,
activity_queue
);
let settings = Settings::get();
let rate_limiter = rate_limiter.clone();

View file

@ -5,7 +5,6 @@
use super::*;
use crate::{
api::{comment::*, community::*, post::*, site::*, user::*, *},
apub::activity_sender::ActivitySender,
rate_limit::RateLimit,
websocket::UserOperation,
CommunityId,
@ -20,6 +19,7 @@ use actix_web::{client::Client, web};
use anyhow::Context as acontext;
use lemmy_db::naive_now;
use lemmy_utils::location_info;
use background_jobs::QueueHandle;
/// Chat server sends this messages to session
#[derive(Message)]
@ -183,7 +183,7 @@ pub struct ChatServer {
/// An HTTP Client
client: Client,
activity_sender: Addr<ActivitySender>,
activity_queue: QueueHandle,
}
impl ChatServer {
@ -191,7 +191,7 @@ impl ChatServer {
pool: Pool<ConnectionManager<PgConnection>>,
rate_limiter: RateLimit,
client: Client,
activity_sender: Addr<ActivitySender>,
activity_queue: QueueHandle,
) -> ChatServer {
ChatServer {
sessions: HashMap::new(),
@ -203,7 +203,7 @@ impl ChatServer {
rate_limiter,
captchas: Vec::new(),
client,
activity_sender,
activity_queue,
}
}
@ -460,7 +460,7 @@ impl ChatServer {
};
let client = self.client.clone();
let activity_sender = self.activity_sender.clone();
let activity_queue = self.activity_queue.clone();
async move {
let msg = msg;
let json: Value = serde_json::from_str(&msg.msg)?;
@ -475,7 +475,7 @@ impl ChatServer {
pool,
chat_server: addr,
client,
activity_sender,
activity_queue,
};
let args = Args {
context: &context,