diff --git a/server/src/apub/activity_queue.rs b/server/src/apub/activity_queue.rs index 864d9df51..bc5faaa39 100644 --- a/server/src/apub/activity_queue.rs +++ b/server/src/apub/activity_queue.rs @@ -67,21 +67,18 @@ struct SendActivityTask { } impl ActixJob for SendActivityTask { - type State = (); + type State = MyState; type Future = Pin>>>; const NAME: &'static str = "SendActivityTask"; const MAX_RETRIES: MaxRetries = MaxRetries::Count(10); const BACKOFF: Backoff = Backoff::Exponential(2); - fn run(self, _: Self::State) -> Self::Future { + fn run(self, state: Self::State) -> Self::Future { Box::pin(async move { for to_url in &self.to { - // TODO: should pass this in somehow instead of creating a new client every time - // i suppose this can be done through a state - let client = Client::default(); - - let request = client + let request = state + .client .post(to_url.as_str()) .header("Content-Type", "application/json"); @@ -121,15 +118,16 @@ pub fn create_activity_queue() -> QueueHandle { let queue_handle = create_server(Storage::new()); // Configure and start our workers - WorkerConfig::new(|| {}) - .register::() - .start(queue_handle.clone()); + WorkerConfig::new(|| MyState { + client: Client::default(), + }) + .register::() + .start(queue_handle.clone()); - // Queue our jobs - //queue_handle.queue::(MyJob::new(1, 2))?; - //queue_handle.queue::(MyJob::new(3, 4))?; - //queue_handle.queue::(MyJob::new(5, 6))?; - - // Block on Actix queue_handle } + +#[derive(Clone)] +struct MyState { + pub client: Client, +} diff --git a/server/src/lib.rs b/server/src/lib.rs index e0e641c90..12ee5c46c 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -37,12 +37,12 @@ use crate::{ use actix::Addr; use actix_web::{client::Client, dev::ConnectionInfo}; use anyhow::anyhow; +use background_jobs::QueueHandle; use lemmy_utils::{get_apub_protocol_string, settings::Settings}; use log::error; use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC}; use serde::Deserialize; use std::process::Command; -use background_jobs::QueueHandle; pub type DbPool = diesel::r2d2::Pool>; pub type ConnectionId = usize; diff --git a/server/src/main.rs b/server/src/main.rs index 3e77fb61f..c8cea78a1 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -20,6 +20,7 @@ use diesel::{ }; use lemmy_db::get_database_url_from_env; use lemmy_server::{ + apub::activity_queue::create_activity_queue, blocking, code_migrations::run_advanced_migrations, rate_limit::{rate_limiter::RateLimiter, RateLimit}, @@ -31,7 +32,6 @@ use lemmy_server::{ use lemmy_utils::{settings::Settings, CACHE_CONTROL_REGEX}; use std::sync::Arc; use tokio::sync::Mutex; -use lemmy_server::apub::activity_queue::create_activity_queue; lazy_static! { // static ref CACHE_CONTROL_VALUE: String = format!("public, max-age={}", 365 * 24 * 60 * 60); @@ -85,12 +85,8 @@ async fn main() -> Result<(), LemmyError> { activity_queue.clone(), ) .start(); - let context = LemmyContext::create( - pool.clone(), - chat_server, - Client::default(), - activity_queue - ); + let context = + LemmyContext::create(pool.clone(), chat_server, Client::default(), activity_queue); let settings = Settings::get(); let rate_limiter = rate_limiter.clone(); App::new() diff --git a/server/src/websocket/server.rs b/server/src/websocket/server.rs index a20681fdf..c76b248ef 100644 --- a/server/src/websocket/server.rs +++ b/server/src/websocket/server.rs @@ -17,9 +17,9 @@ use crate::{ }; use actix_web::{client::Client, web}; use anyhow::Context as acontext; +use background_jobs::QueueHandle; use lemmy_db::naive_now; use lemmy_utils::location_info; -use background_jobs::QueueHandle; /// Chat server sends this messages to session #[derive(Message)]