background-jobs 0.11

This commit is contained in:
Aode (lion) 2021-11-22 19:26:55 -06:00
parent 3e062a9959
commit c67dbdc2da
15 changed files with 51 additions and 38 deletions

19
Cargo.lock generated
View file

@ -423,9 +423,9 @@ dependencies = [
[[package]] [[package]]
name = "background-jobs" name = "background-jobs"
version = "0.9.1" version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb7df0fd6abf9d55139d4c9e569c0a8cd271ec265862c41bd215b46b36c52397" checksum = "77f4508c6c5b5cfc6c18d43d0ba6ecda339710206854da9e1c9ac9dfb7e3eb6f"
dependencies = [ dependencies = [
"background-jobs-actix", "background-jobs-actix",
"background-jobs-core", "background-jobs-core",
@ -433,9 +433,9 @@ dependencies = [
[[package]] [[package]]
name = "background-jobs-actix" name = "background-jobs-actix"
version = "0.9.6" version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38aebb545b0fac45046421993890eb49cc04896a93b85bbfb1b9017decc413f9" checksum = "5dabf6a2204fe034db7910a38f8e2d183fe24eb92abd4c0aaca59f8cacf4e48b"
dependencies = [ dependencies = [
"actix-rt", "actix-rt",
"anyhow", "anyhow",
@ -443,31 +443,32 @@ dependencies = [
"async-trait", "async-trait",
"background-jobs-core", "background-jobs-core",
"chrono", "chrono",
"log",
"num_cpus", "num_cpus",
"serde", "serde",
"serde_json", "serde_json",
"thiserror", "thiserror",
"tokio", "tokio",
"tracing",
"tracing-futures",
"uuid", "uuid",
] ]
[[package]] [[package]]
name = "background-jobs-core" name = "background-jobs-core"
version = "0.9.5" version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee8604ff89c62ca8eefc1ea2c3f359a53b7930e640fb22bf7890eab13b4640d2" checksum = "174d36b170699ecc13b7513bda9eff6f12cc889eae5d16b792daa3f7b21be452"
dependencies = [ dependencies = [
"actix-rt", "actix-rt",
"anyhow", "anyhow",
"async-mutex", "async-mutex",
"async-trait", "async-trait",
"chrono", "chrono",
"log",
"serde", "serde",
"serde_json", "serde_json",
"thiserror", "thiserror",
"tokio", "tracing",
"tracing-futures",
"uuid", "uuid",
] ]

View file

@ -49,5 +49,5 @@ async-trait = "0.1.51"
captcha = "0.0.8" captcha = "0.0.8"
anyhow = "1.0.44" anyhow = "1.0.44"
thiserror = "1.0.29" thiserror = "1.0.29"
background-jobs = "0.9.1" background-jobs = "0.11.0"
reqwest = { version = "0.11.4", features = ["json"] } reqwest = { version = "0.11.4", features = ["json"] }

View file

@ -43,6 +43,6 @@ sha2 = "0.9.8"
async-trait = "0.1.51" async-trait = "0.1.51"
anyhow = "1.0.44" anyhow = "1.0.44"
thiserror = "1.0.29" thiserror = "1.0.29"
background-jobs = "0.9.1" background-jobs = "0.11.0"
reqwest = { version = "0.11.4", features = ["json"] } reqwest = { version = "0.11.4", features = ["json"] }
webmention = "0.4.0" webmention = "0.4.0"

View file

@ -47,7 +47,7 @@ sha2 = "0.9.8"
async-trait = "0.1.51" async-trait = "0.1.51"
anyhow = "1.0.44" anyhow = "1.0.44"
thiserror = "1.0.29" thiserror = "1.0.29"
background-jobs = "0.9.1" background-jobs = "0.11.0"
reqwest = { version = "0.11.4", features = ["json"] } reqwest = { version = "0.11.4", features = ["json"] }
html2md = "0.2.13" html2md = "0.2.13"
once_cell = "1.8.0" once_cell = "1.8.0"

View file

@ -136,6 +136,7 @@ mod tests {
person::tests::parse_lemmy_person, person::tests::parse_lemmy_person,
tests::{file_to_json_object, init_context}, tests::{file_to_json_object, init_context},
}; };
use lemmy_apub_lib::activity_queue::create_activity_queue;
use lemmy_db_schema::{ use lemmy_db_schema::{
source::{ source::{
community::Community, community::Community,
@ -148,7 +149,8 @@ mod tests {
#[actix_rt::test] #[actix_rt::test]
#[serial] #[serial]
async fn test_parse_lemmy_community_moderators() { async fn test_parse_lemmy_community_moderators() {
let context = init_context(); let manager = create_activity_queue();
let context = init_context(manager.queue_handle().clone());
let community = parse_lemmy_community(&context).await; let community = parse_lemmy_community(&context).await;
let community_id = community.id; let community_id = community.id;

View file

@ -214,6 +214,7 @@ pub(crate) mod tests {
tests::{file_to_json_object, init_context}, tests::{file_to_json_object, init_context},
}; };
use assert_json_diff::assert_json_include; use assert_json_diff::assert_json_include;
use lemmy_apub_lib::activity_queue::create_activity_queue;
use serial_test::serial; use serial_test::serial;
async fn prepare_comment_test( async fn prepare_comment_test(
@ -241,7 +242,8 @@ pub(crate) mod tests {
#[actix_rt::test] #[actix_rt::test]
#[serial] #[serial]
pub(crate) async fn test_parse_lemmy_comment() { pub(crate) async fn test_parse_lemmy_comment() {
let context = init_context(); let manager = create_activity_queue();
let context = init_context(manager.queue_handle().clone());
let url = Url::parse("https://enterprise.lemmy.ml/comment/38741").unwrap(); let url = Url::parse("https://enterprise.lemmy.ml/comment/38741").unwrap();
let data = prepare_comment_test(&url, &context).await; let data = prepare_comment_test(&url, &context).await;
@ -270,7 +272,8 @@ pub(crate) mod tests {
#[actix_rt::test] #[actix_rt::test]
#[serial] #[serial]
async fn test_parse_pleroma_comment() { async fn test_parse_pleroma_comment() {
let context = init_context(); let manager = create_activity_queue();
let context = init_context(manager.queue_handle().clone());
let url = Url::parse("https://enterprise.lemmy.ml/comment/38741").unwrap(); let url = Url::parse("https://enterprise.lemmy.ml/comment/38741").unwrap();
let data = prepare_comment_test(&url, &context).await; let data = prepare_comment_test(&url, &context).await;

View file

@ -214,6 +214,7 @@ impl ApubCommunity {
pub(crate) mod tests { pub(crate) mod tests {
use super::*; use super::*;
use crate::objects::tests::{file_to_json_object, init_context}; use crate::objects::tests::{file_to_json_object, init_context};
use lemmy_apub_lib::activity_queue::create_activity_queue;
use lemmy_db_schema::traits::Crud; use lemmy_db_schema::traits::Crud;
use serial_test::serial; use serial_test::serial;
@ -240,7 +241,8 @@ pub(crate) mod tests {
#[actix_rt::test] #[actix_rt::test]
#[serial] #[serial]
async fn test_parse_lemmy_community() { async fn test_parse_lemmy_community() {
let context = init_context(); let manager = create_activity_queue();
let context = init_context(manager.queue_handle().clone());
let community = parse_lemmy_community(&context).await; let community = parse_lemmy_community(&context).await;
assert_eq!(community.title, "Ten Forward"); assert_eq!(community.title, "Ten Forward");

View file

@ -21,11 +21,11 @@ pub(crate) fn get_summary_from_string_or_source(
#[cfg(test)] #[cfg(test)]
pub(crate) mod tests { pub(crate) mod tests {
use actix::Actor; use actix::Actor;
use background_jobs::QueueHandle;
use diesel::{ use diesel::{
r2d2::{ConnectionManager, Pool}, r2d2::{ConnectionManager, Pool},
PgConnection, PgConnection,
}; };
use lemmy_apub_lib::activity_queue::create_activity_queue;
use lemmy_db_schema::{ use lemmy_db_schema::{
establish_unpooled_connection, establish_unpooled_connection,
get_database_url_from_env, get_database_url_from_env,
@ -45,7 +45,7 @@ pub(crate) mod tests {
// TODO: would be nice if we didnt have to use a full context for tests. // TODO: would be nice if we didnt have to use a full context for tests.
// or at least write a helper function so this code is shared with main.rs // or at least write a helper function so this code is shared with main.rs
pub(crate) fn init_context() -> LemmyContext { pub(crate) fn init_context(activity_queue: QueueHandle) -> LemmyContext {
// call this to run migrations // call this to run migrations
establish_unpooled_connection(); establish_unpooled_connection();
let settings = Settings::init().unwrap(); let settings = Settings::init().unwrap();
@ -57,7 +57,6 @@ pub(crate) mod tests {
.user_agent(build_user_agent(&settings)) .user_agent(build_user_agent(&settings))
.build() .build()
.unwrap(); .unwrap();
let activity_queue = create_activity_queue();
let secret = Secret { let secret = Secret {
id: 0, id: 0,
jwt_secret: "".to_string(), jwt_secret: "".to_string(),

View file

@ -198,6 +198,7 @@ impl ActorType for ApubPerson {
pub(crate) mod tests { pub(crate) mod tests {
use super::*; use super::*;
use crate::objects::tests::{file_to_json_object, init_context}; use crate::objects::tests::{file_to_json_object, init_context};
use lemmy_apub_lib::activity_queue::create_activity_queue;
use lemmy_db_schema::traits::Crud; use lemmy_db_schema::traits::Crud;
use serial_test::serial; use serial_test::serial;
@ -218,7 +219,8 @@ pub(crate) mod tests {
#[actix_rt::test] #[actix_rt::test]
#[serial] #[serial]
async fn test_parse_lemmy_person() { async fn test_parse_lemmy_person() {
let context = init_context(); let manager = create_activity_queue();
let context = init_context(manager.queue_handle().clone());
let person = parse_lemmy_person(&context).await; let person = parse_lemmy_person(&context).await;
assert_eq!(person.display_name, Some("Jean-Luc Picard".to_string())); assert_eq!(person.display_name, Some("Jean-Luc Picard".to_string()));
@ -231,7 +233,8 @@ pub(crate) mod tests {
#[actix_rt::test] #[actix_rt::test]
#[serial] #[serial]
async fn test_parse_pleroma_person() { async fn test_parse_pleroma_person() {
let context = init_context(); let manager = create_activity_queue();
let context = init_context(manager.queue_handle().clone());
let json = file_to_json_object("assets/pleroma/objects/person.json"); let json = file_to_json_object("assets/pleroma/objects/person.json");
let url = Url::parse("https://queer.hacktivis.me/users/lanodan").unwrap(); let url = Url::parse("https://queer.hacktivis.me/users/lanodan").unwrap();
let mut request_counter = 0; let mut request_counter = 0;

View file

@ -205,12 +205,14 @@ mod tests {
post::ApubPost, post::ApubPost,
tests::{file_to_json_object, init_context}, tests::{file_to_json_object, init_context},
}; };
use lemmy_apub_lib::activity_queue::create_activity_queue;
use serial_test::serial; use serial_test::serial;
#[actix_rt::test] #[actix_rt::test]
#[serial] #[serial]
async fn test_parse_lemmy_post() { async fn test_parse_lemmy_post() {
let context = init_context(); let manager = create_activity_queue();
let context = init_context(manager.queue_handle().clone());
let community = parse_lemmy_community(&context).await; let community = parse_lemmy_community(&context).await;
let person = parse_lemmy_person(&context).await; let person = parse_lemmy_person(&context).await;

View file

@ -162,6 +162,7 @@ mod tests {
tests::{file_to_json_object, init_context}, tests::{file_to_json_object, init_context},
}; };
use assert_json_diff::assert_json_include; use assert_json_diff::assert_json_include;
use lemmy_apub_lib::activity_queue::create_activity_queue;
use serial_test::serial; use serial_test::serial;
async fn prepare_comment_test(url: &Url, context: &LemmyContext) -> (ApubPerson, ApubPerson) { async fn prepare_comment_test(url: &Url, context: &LemmyContext) -> (ApubPerson, ApubPerson) {
@ -191,7 +192,8 @@ mod tests {
#[actix_rt::test] #[actix_rt::test]
#[serial] #[serial]
async fn test_parse_lemmy_pm() { async fn test_parse_lemmy_pm() {
let context = init_context(); let manager = create_activity_queue();
let context = init_context(manager.queue_handle().clone());
let url = Url::parse("https://enterprise.lemmy.ml/private_message/1621").unwrap(); let url = Url::parse("https://enterprise.lemmy.ml/private_message/1621").unwrap();
let data = prepare_comment_test(&url, &context).await; let data = prepare_comment_test(&url, &context).await;
let json: ChatMessage = file_to_json_object("assets/lemmy/objects/chat_message.json"); let json: ChatMessage = file_to_json_object("assets/lemmy/objects/chat_message.json");
@ -218,7 +220,8 @@ mod tests {
#[actix_rt::test] #[actix_rt::test]
#[serial] #[serial]
async fn test_parse_pleroma_pm() { async fn test_parse_pleroma_pm() {
let context = init_context(); let manager = create_activity_queue();
let context = init_context(manager.queue_handle().clone());
let url = Url::parse("https://enterprise.lemmy.ml/private_message/1621").unwrap(); let url = Url::parse("https://enterprise.lemmy.ml/private_message/1621").unwrap();
let data = prepare_comment_test(&url, &context).await; let data = prepare_comment_test(&url, &context).await;
let pleroma_url = Url::parse("https://queer.hacktivis.me/objects/2").unwrap(); let pleroma_url = Url::parse("https://queer.hacktivis.me/objects/2").unwrap();

View file

@ -26,5 +26,5 @@ sha2 = "0.9.8"
actix-web = { version = "4.0.0-beta.9", default-features = false } actix-web = { version = "4.0.0-beta.9", default-features = false }
http-signature-normalization-actix = { version = "0.5.0-beta.10", default-features = false, features = ["server", "sha-2"] } http-signature-normalization-actix = { version = "0.5.0-beta.10", default-features = false, features = ["server", "sha-2"] }
http-signature-normalization-reqwest = { version = "0.2.0", default-features = false, features = ["sha-2"] } http-signature-normalization-reqwest = { version = "0.2.0", default-features = false, features = ["sha-2"] }
background-jobs = "0.9.1" background-jobs = "0.11.0"
diesel = "1.4.8" diesel = "1.4.8"

View file

@ -1,10 +1,10 @@
use crate::{signatures::sign_and_send, traits::ActorType}; use crate::{signatures::sign_and_send, traits::ActorType};
use anyhow::{anyhow, Context, Error}; use anyhow::{anyhow, Context, Error};
use background_jobs::{ use background_jobs::{
create_server,
memory_storage::Storage, memory_storage::Storage,
ActixJob, ActixJob,
Backoff, Backoff,
Manager,
MaxRetries, MaxRetries,
QueueHandle, QueueHandle,
WorkerConfig, WorkerConfig,
@ -35,7 +35,7 @@ pub async fn send_activity(
if env::var("APUB_TESTING_SEND_SYNC").is_ok() { if env::var("APUB_TESTING_SEND_SYNC").is_ok() {
do_send(message, client).await?; do_send(message, client).await?;
} else { } else {
activity_queue.queue::<SendActivityTask>(message)?; activity_queue.queue::<SendActivityTask>(message).await?;
} }
} }
@ -101,19 +101,13 @@ async fn do_send(task: SendActivityTask, client: &Client) -> Result<(), Error> {
Ok(()) Ok(())
} }
pub fn create_activity_queue() -> QueueHandle { pub fn create_activity_queue() -> Manager {
// Start the application server. This guards access to to the jobs store
let queue_handle = create_server(Storage::new());
let arbiter = actix_web::rt::Arbiter::new();
// Configure and start our workers // Configure and start our workers
WorkerConfig::new(|| MyState { WorkerConfig::new_managed(Storage::new(), |_| MyState {
client: Client::default(), client: Client::default(),
}) })
.register::<SendActivityTask>() .register::<SendActivityTask>()
.start_in_arbiter(&arbiter, queue_handle.clone()); .start()
queue_handle
} }
#[derive(Clone)] #[derive(Clone)]

View file

@ -26,7 +26,7 @@ serde_json = { version = "1.0.68", features = ["preserve_order"] }
actix = "0.12.0" actix = "0.12.0"
anyhow = "1.0.44" anyhow = "1.0.44"
diesel = "1.4.8" diesel = "1.4.8"
background-jobs = "0.9.1" background-jobs = "0.11.0"
tokio = "1.12.0" tokio = "1.12.0"
strum = "0.21.0" strum = "0.21.0"
strum_macros = "0.21.1" strum_macros = "0.21.1"

View file

@ -87,7 +87,9 @@ async fn main() -> Result<(), LemmyError> {
.user_agent(build_user_agent(&settings)) .user_agent(build_user_agent(&settings))
.build()?; .build()?;
let activity_queue = create_activity_queue(); let queue_manager = create_activity_queue();
let activity_queue = queue_manager.queue_handle().clone();
let chat_server = ChatServer::startup( let chat_server = ChatServer::startup(
pool.clone(), pool.clone(),
@ -128,5 +130,7 @@ async fn main() -> Result<(), LemmyError> {
.run() .run()
.await?; .await?;
drop(queue_manager);
Ok(()) Ok(())
} }