reuse http client
This commit is contained in:
parent
74af8b8014
commit
71fd1e8508
4 changed files with 19 additions and 25 deletions
|
@ -67,21 +67,18 @@ struct SendActivityTask {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ActixJob for SendActivityTask {
|
impl ActixJob for SendActivityTask {
|
||||||
type State = ();
|
type State = MyState;
|
||||||
type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
|
type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
|
||||||
const NAME: &'static str = "SendActivityTask";
|
const NAME: &'static str = "SendActivityTask";
|
||||||
|
|
||||||
const MAX_RETRIES: MaxRetries = MaxRetries::Count(10);
|
const MAX_RETRIES: MaxRetries = MaxRetries::Count(10);
|
||||||
const BACKOFF: Backoff = Backoff::Exponential(2);
|
const BACKOFF: Backoff = Backoff::Exponential(2);
|
||||||
|
|
||||||
fn run(self, _: Self::State) -> Self::Future {
|
fn run(self, state: Self::State) -> Self::Future {
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
for to_url in &self.to {
|
for to_url in &self.to {
|
||||||
// TODO: should pass this in somehow instead of creating a new client every time
|
let request = state
|
||||||
// i suppose this can be done through a state
|
.client
|
||||||
let client = Client::default();
|
|
||||||
|
|
||||||
let request = client
|
|
||||||
.post(to_url.as_str())
|
.post(to_url.as_str())
|
||||||
.header("Content-Type", "application/json");
|
.header("Content-Type", "application/json");
|
||||||
|
|
||||||
|
@ -121,15 +118,16 @@ pub fn create_activity_queue() -> QueueHandle {
|
||||||
let queue_handle = create_server(Storage::new());
|
let queue_handle = create_server(Storage::new());
|
||||||
|
|
||||||
// Configure and start our workers
|
// Configure and start our workers
|
||||||
WorkerConfig::new(|| {})
|
WorkerConfig::new(|| MyState {
|
||||||
|
client: Client::default(),
|
||||||
|
})
|
||||||
.register::<SendActivityTask>()
|
.register::<SendActivityTask>()
|
||||||
.start(queue_handle.clone());
|
.start(queue_handle.clone());
|
||||||
|
|
||||||
// Queue our jobs
|
|
||||||
//queue_handle.queue::<MyProcessor>(MyJob::new(1, 2))?;
|
|
||||||
//queue_handle.queue::<MyProcessor>(MyJob::new(3, 4))?;
|
|
||||||
//queue_handle.queue::<MyProcessor>(MyJob::new(5, 6))?;
|
|
||||||
|
|
||||||
// Block on Actix
|
|
||||||
queue_handle
|
queue_handle
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
struct MyState {
|
||||||
|
pub client: Client,
|
||||||
|
}
|
||||||
|
|
|
@ -37,12 +37,12 @@ use crate::{
|
||||||
use actix::Addr;
|
use actix::Addr;
|
||||||
use actix_web::{client::Client, dev::ConnectionInfo};
|
use actix_web::{client::Client, dev::ConnectionInfo};
|
||||||
use anyhow::anyhow;
|
use anyhow::anyhow;
|
||||||
|
use background_jobs::QueueHandle;
|
||||||
use lemmy_utils::{get_apub_protocol_string, settings::Settings};
|
use lemmy_utils::{get_apub_protocol_string, settings::Settings};
|
||||||
use log::error;
|
use log::error;
|
||||||
use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC};
|
use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC};
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use std::process::Command;
|
use std::process::Command;
|
||||||
use background_jobs::QueueHandle;
|
|
||||||
|
|
||||||
pub type DbPool = diesel::r2d2::Pool<diesel::r2d2::ConnectionManager<diesel::PgConnection>>;
|
pub type DbPool = diesel::r2d2::Pool<diesel::r2d2::ConnectionManager<diesel::PgConnection>>;
|
||||||
pub type ConnectionId = usize;
|
pub type ConnectionId = usize;
|
||||||
|
|
|
@ -20,6 +20,7 @@ use diesel::{
|
||||||
};
|
};
|
||||||
use lemmy_db::get_database_url_from_env;
|
use lemmy_db::get_database_url_from_env;
|
||||||
use lemmy_server::{
|
use lemmy_server::{
|
||||||
|
apub::activity_queue::create_activity_queue,
|
||||||
blocking,
|
blocking,
|
||||||
code_migrations::run_advanced_migrations,
|
code_migrations::run_advanced_migrations,
|
||||||
rate_limit::{rate_limiter::RateLimiter, RateLimit},
|
rate_limit::{rate_limiter::RateLimiter, RateLimit},
|
||||||
|
@ -31,7 +32,6 @@ use lemmy_server::{
|
||||||
use lemmy_utils::{settings::Settings, CACHE_CONTROL_REGEX};
|
use lemmy_utils::{settings::Settings, CACHE_CONTROL_REGEX};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
use lemmy_server::apub::activity_queue::create_activity_queue;
|
|
||||||
|
|
||||||
lazy_static! {
|
lazy_static! {
|
||||||
// static ref CACHE_CONTROL_VALUE: String = format!("public, max-age={}", 365 * 24 * 60 * 60);
|
// static ref CACHE_CONTROL_VALUE: String = format!("public, max-age={}", 365 * 24 * 60 * 60);
|
||||||
|
@ -85,12 +85,8 @@ async fn main() -> Result<(), LemmyError> {
|
||||||
activity_queue.clone(),
|
activity_queue.clone(),
|
||||||
)
|
)
|
||||||
.start();
|
.start();
|
||||||
let context = LemmyContext::create(
|
let context =
|
||||||
pool.clone(),
|
LemmyContext::create(pool.clone(), chat_server, Client::default(), activity_queue);
|
||||||
chat_server,
|
|
||||||
Client::default(),
|
|
||||||
activity_queue
|
|
||||||
);
|
|
||||||
let settings = Settings::get();
|
let settings = Settings::get();
|
||||||
let rate_limiter = rate_limiter.clone();
|
let rate_limiter = rate_limiter.clone();
|
||||||
App::new()
|
App::new()
|
||||||
|
|
|
@ -17,9 +17,9 @@ use crate::{
|
||||||
};
|
};
|
||||||
use actix_web::{client::Client, web};
|
use actix_web::{client::Client, web};
|
||||||
use anyhow::Context as acontext;
|
use anyhow::Context as acontext;
|
||||||
|
use background_jobs::QueueHandle;
|
||||||
use lemmy_db::naive_now;
|
use lemmy_db::naive_now;
|
||||||
use lemmy_utils::location_info;
|
use lemmy_utils::location_info;
|
||||||
use background_jobs::QueueHandle;
|
|
||||||
|
|
||||||
/// Chat server sends this messages to session
|
/// Chat server sends this messages to session
|
||||||
#[derive(Message)]
|
#[derive(Message)]
|
||||||
|
|
Loading…
Reference in a new issue