diff --git a/server/Cargo.lock b/server/Cargo.lock index 9781adaf12..815ebc0a81 100644 --- a/server/Cargo.lock +++ b/server/Cargo.lock @@ -490,6 +490,56 @@ dependencies = [ "serde_urlencoded", ] +[[package]] +name = "background-jobs" +version = "0.8.0-alpha.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb38c4a5de33324650e9023829b0f4129eb5418b29f5dfe69a52100ff5bc50d7" +dependencies = [ + "background-jobs-actix", + "background-jobs-core", +] + +[[package]] +name = "background-jobs-actix" +version = "0.8.0-alpha.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d012b9293806c777f806b537e04b5eec34ecd6eaf876c52792017695ce53262f" +dependencies = [ + "actix-rt", + "anyhow", + "async-trait", + "background-jobs-core", + "chrono", + "log", + "num_cpus", + "rand 0.7.3", + "serde 1.0.114", + "serde_json", + "thiserror", + "tokio", + "uuid 0.8.1", +] + +[[package]] +name = "background-jobs-core" +version = "0.8.0-alpha.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd5efe91c019d7780d5a2fc2f92a15e1f95b84a761428e1d1972b7428634ebc7" +dependencies = [ + "actix-rt", + "anyhow", + "async-trait", + "chrono", + "futures", + "log", + "serde 1.0.114", + "serde_json", + "thiserror", + "tokio", + "uuid 0.8.1", +] + [[package]] name = "backtrace" version = "0.3.50" @@ -1501,6 +1551,16 @@ dependencies = [ "itoa", ] +[[package]] +name = "http-body" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13d5ff830006f7646652e057693569bfe0d51760c0085a071769d142a205111b" +dependencies = [ + "bytes", + "http", +] + [[package]] name = "http-signature-normalization" version = "0.5.2" @@ -1544,6 +1604,43 @@ dependencies = [ "quick-error", ] +[[package]] +name = "hyper" +version = "0.13.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e68a8dd9716185d9e64ea473ea6ef63529252e3e27623295a0378a19665d5eb" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "httparse", + "itoa", + "pin-project", + "socket2", + "time 0.1.43", + "tokio", + "tower-service", + "tracing", + "want", +] + +[[package]] +name = "hyper-tls" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d979acc56dcb5b8dddba3917601745e877576475aa046df3226eabdecef78eed" +dependencies = [ + "bytes", + "hyper", + "native-tls", + "tokio", + "tokio-tls", +] + [[package]] name = "ident_case" version = "1.0.1" @@ -1618,9 +1715,15 @@ dependencies = [ "socket2", "widestring", "winapi 0.3.9", - "winreg", + "winreg 0.6.2", ] +[[package]] +name = "ipnet" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47be2f14c678be2fdcab04ab1171db51b2762ce6f0a8ee87c8dd4a04ed216135" + [[package]] name = "itertools" version = "0.9.0" @@ -1723,6 +1826,7 @@ dependencies = [ "anyhow", "async-trait", "awc", + "background-jobs", "base64 0.12.3", "bcrypt", "captcha", @@ -1743,6 +1847,7 @@ dependencies = [ "openssl", "percent-encoding", "rand 0.7.3", + "reqwest", "rss", "serde 1.0.114", "serde_json", @@ -2669,6 +2774,42 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "reqwest" +version = "0.10.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9eaa17ac5d7b838b7503d118fa16ad88f440498bf9ffe5424e621f93190d61e" +dependencies = [ + "base64 0.12.3", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "http", + "http-body", + "hyper", + "hyper-tls", + "ipnet", + "js-sys", + "lazy_static", + "log", + "mime", + "mime_guess", + "native-tls", + "percent-encoding", + "pin-project-lite", + "serde 1.0.114", + "serde_json", + "serde_urlencoded", + "tokio", + "tokio-tls", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "winreg 0.7.0", +] + [[package]] name = "resolv-conf" version = "0.6.3" @@ -3261,6 +3402,16 @@ dependencies = [ "webpki", ] +[[package]] +name = "tokio-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a70f4fcd7b3b24fb194f837560168208f669ca8cb70d0c4b862944452396343" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-util" version = "0.2.0" @@ -3290,6 +3441,12 @@ dependencies = [ "tokio", ] +[[package]] +name = "tower-service" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e987b6bf443f4b5b3b6f38704195592cca41c5bb7aedd3c3693c7081f8289860" + [[package]] name = "tracing" version = "0.1.18" @@ -3350,6 +3507,12 @@ dependencies = [ "trust-dns-proto", ] +[[package]] +name = "try-lock" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" + [[package]] name = "twoway" version = "0.2.1" @@ -3528,6 +3691,16 @@ version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed" +[[package]] +name = "want" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0" +dependencies = [ + "log", + "try-lock", +] + [[package]] name = "wasi" version = "0.9.0+wasi-snapshot-preview1" @@ -3541,6 +3714,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0563a9a4b071746dd5aedbc3a28c6fe9be4586fb3fbadb67c400d4f53c6b16c" dependencies = [ "cfg-if", + "serde 1.0.114", + "serde_json", "wasm-bindgen-macro", ] @@ -3559,6 +3734,18 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95f8d235a77f880bcef268d379810ea6c0af2eacfa90b1ad5af731776e0c4699" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.67" @@ -3675,6 +3862,15 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "winreg" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0120db82e8a1e0b9fb3345a539c478767c0048d842860994d96113d5b667bd69" +dependencies = [ + "winapi 0.3.9", +] + [[package]] name = "winutil" version = "0.1.1" diff --git a/server/Cargo.toml b/server/Cargo.toml index dba0fee6fe..c5bf9c888c 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -53,3 +53,5 @@ async-trait = "0.1.36" captcha = "0.0.7" anyhow = "1.0.32" thiserror = "1.0.20" +background-jobs = " 0.8.0-alpha.2" +reqwest = { version = "0.10", features = ["json"] } diff --git a/server/lemmy_db/src/activity.rs b/server/lemmy_db/src/activity.rs index 177e6b7cd6..c28eda45c0 100644 --- a/server/lemmy_db/src/activity.rs +++ b/server/lemmy_db/src/activity.rs @@ -113,7 +113,7 @@ mod tests { lang: "browser".into(), show_avatars: true, send_notifications_to_email: false, - actor_id: "changeme_862362".into(), + actor_id: None, bio: None, local: true, private_key: None, diff --git a/server/lemmy_db/src/comment.rs b/server/lemmy_db/src/comment.rs index b2e22aa625..c462db1bb4 100644 --- a/server/lemmy_db/src/comment.rs +++ b/server/lemmy_db/src/comment.rs @@ -39,13 +39,13 @@ pub struct CommentForm { pub published: Option, pub updated: Option, pub deleted: Option, - pub ap_id: String, + pub ap_id: Option, pub local: bool, } impl CommentForm { pub fn get_ap_id(&self) -> Result { - Url::parse(&self.ap_id) + Url::parse(&self.ap_id.as_ref().unwrap_or(&"not_a_url".to_string())) } } @@ -163,12 +163,13 @@ impl Comment { } pub fn upsert(conn: &PgConnection, comment_form: &CommentForm) -> Result { - let existing = Self::read_from_apub_id(conn, &comment_form.ap_id); - match existing { - Err(NotFound {}) => Ok(Self::create(conn, &comment_form)?), - Ok(p) => Ok(Self::update(conn, p.id, &comment_form)?), - Err(e) => Err(e), - } + use crate::schema::comment::dsl::*; + insert_into(comment) + .values(comment_form) + .on_conflict(ap_id) + .do_update() + .set(comment_form) + .get_result::(conn) } } @@ -272,7 +273,7 @@ mod tests { lang: "browser".into(), show_avatars: true, send_notifications_to_email: false, - actor_id: "changeme_283687".into(), + actor_id: None, bio: None, local: true, private_key: None, @@ -292,7 +293,7 @@ mod tests { deleted: None, updated: None, nsfw: false, - actor_id: "changeme_928738972".into(), + actor_id: None, local: true, private_key: None, public_key: None, @@ -320,7 +321,7 @@ mod tests { embed_description: None, embed_html: None, thumbnail_url: None, - ap_id: "http://fake.com".into(), + ap_id: None, local: true, published: None, }; @@ -337,7 +338,7 @@ mod tests { parent_id: None, published: None, updated: None, - ap_id: "http://fake.com".into(), + ap_id: None, local: true, }; @@ -354,7 +355,7 @@ mod tests { parent_id: None, published: inserted_comment.published, updated: None, - ap_id: "http://fake.com".into(), + ap_id: inserted_comment.ap_id.to_owned(), local: true, }; @@ -368,7 +369,7 @@ mod tests { read: None, published: None, updated: None, - ap_id: "http://fake.com".into(), + ap_id: None, local: true, }; diff --git a/server/lemmy_db/src/comment_view.rs b/server/lemmy_db/src/comment_view.rs index 5b14013770..1dcdf1934a 100644 --- a/server/lemmy_db/src/comment_view.rs +++ b/server/lemmy_db/src/comment_view.rs @@ -517,7 +517,7 @@ mod tests { lang: "browser".into(), show_avatars: true, send_notifications_to_email: false, - actor_id: "changeme_92873982".into(), + actor_id: None, bio: None, local: true, private_key: None, @@ -537,7 +537,7 @@ mod tests { deleted: None, updated: None, nsfw: false, - actor_id: "changeme_7625376".into(), + actor_id: None, local: true, private_key: None, public_key: None, @@ -565,7 +565,7 @@ mod tests { embed_description: None, embed_html: None, thumbnail_url: None, - ap_id: "http://fake.com".into(), + ap_id: None, local: true, published: None, }; @@ -582,7 +582,7 @@ mod tests { read: None, published: None, updated: None, - ap_id: "http://fake.com".into(), + ap_id: None, local: true, }; @@ -627,7 +627,7 @@ mod tests { my_vote: None, subscribed: None, saved: None, - ap_id: "http://fake.com".to_string(), + ap_id: inserted_comment.ap_id.to_owned(), local: true, community_actor_id: inserted_community.actor_id.to_owned(), community_local: true, @@ -665,7 +665,7 @@ mod tests { my_vote: Some(1), subscribed: Some(false), saved: Some(false), - ap_id: "http://fake.com".to_string(), + ap_id: inserted_comment.ap_id.to_owned(), local: true, community_actor_id: inserted_community.actor_id.to_owned(), community_local: true, diff --git a/server/lemmy_db/src/community.rs b/server/lemmy_db/src/community.rs index df5f129412..d033412c78 100644 --- a/server/lemmy_db/src/community.rs +++ b/server/lemmy_db/src/community.rs @@ -45,7 +45,7 @@ pub struct CommunityForm { pub updated: Option, pub deleted: Option, pub nsfw: bool, - pub actor_id: String, + pub actor_id: Option, pub local: bool, pub private_key: Option, pub public_key: Option, @@ -160,6 +160,16 @@ impl Community { .unwrap_or_default() .contains(&user_id) } + + pub fn upsert(conn: &PgConnection, community_form: &CommunityForm) -> Result { + use crate::schema::community::dsl::*; + insert_into(community) + .values(community_form) + .on_conflict(actor_id) + .do_update() + .set(community_form) + .get_result::(conn) + } } #[derive(Identifiable, Queryable, Associations, PartialEq, Debug)] @@ -320,7 +330,7 @@ mod tests { lang: "browser".into(), show_avatars: true, send_notifications_to_email: false, - actor_id: "changeme_8266238".into(), + actor_id: None, bio: None, local: true, private_key: None, @@ -340,7 +350,7 @@ mod tests { removed: None, deleted: None, updated: None, - actor_id: "changeme_7625376".into(), + actor_id: None, local: true, private_key: None, public_key: None, diff --git a/server/lemmy_db/src/moderator.rs b/server/lemmy_db/src/moderator.rs index 1a02d977ca..7d453d353f 100644 --- a/server/lemmy_db/src/moderator.rs +++ b/server/lemmy_db/src/moderator.rs @@ -426,7 +426,7 @@ mod tests { lang: "browser".into(), show_avatars: true, send_notifications_to_email: false, - actor_id: "changeme_829398".into(), + actor_id: None, bio: None, local: true, private_key: None, @@ -454,7 +454,7 @@ mod tests { lang: "browser".into(), show_avatars: true, send_notifications_to_email: false, - actor_id: "changeme_82982738".into(), + actor_id: None, bio: None, local: true, private_key: None, @@ -474,7 +474,7 @@ mod tests { deleted: None, updated: None, nsfw: false, - actor_id: "changeme_283687".into(), + actor_id: None, local: true, private_key: None, public_key: None, @@ -502,7 +502,7 @@ mod tests { embed_description: None, embed_html: None, thumbnail_url: None, - ap_id: "http://fake.com".into(), + ap_id: None, local: true, published: None, }; @@ -519,7 +519,7 @@ mod tests { parent_id: None, published: None, updated: None, - ap_id: "http://fake.com".into(), + ap_id: None, local: true, }; diff --git a/server/lemmy_db/src/password_reset_request.rs b/server/lemmy_db/src/password_reset_request.rs index 06615187ea..f248f0b49f 100644 --- a/server/lemmy_db/src/password_reset_request.rs +++ b/server/lemmy_db/src/password_reset_request.rs @@ -103,7 +103,7 @@ mod tests { lang: "browser".into(), show_avatars: true, send_notifications_to_email: false, - actor_id: "changeme_8292378".into(), + actor_id: None, bio: None, local: true, private_key: None, diff --git a/server/lemmy_db/src/post.rs b/server/lemmy_db/src/post.rs index a6df50bff6..177659eb99 100644 --- a/server/lemmy_db/src/post.rs +++ b/server/lemmy_db/src/post.rs @@ -53,13 +53,13 @@ pub struct PostForm { pub embed_description: Option, pub embed_html: Option, pub thumbnail_url: Option, - pub ap_id: String, + pub ap_id: Option, pub local: bool, } impl PostForm { pub fn get_ap_id(&self) -> Result { - Url::parse(&self.ap_id) + Url::parse(&self.ap_id.as_ref().unwrap_or(&"not_a_url".to_string())) } } @@ -180,12 +180,13 @@ impl Post { } pub fn upsert(conn: &PgConnection, post_form: &PostForm) -> Result { - let existing = Self::read_from_apub_id(conn, &post_form.ap_id); - match existing { - Err(NotFound {}) => Ok(Self::create(conn, &post_form)?), - Ok(p) => Ok(Self::update(conn, p.id, &post_form)?), - Err(e) => Err(e), - } + use crate::schema::post::dsl::*; + insert_into(post) + .values(post_form) + .on_conflict(ap_id) + .do_update() + .set(post_form) + .get_result::(conn) } } @@ -358,7 +359,7 @@ mod tests { lang: "browser".into(), show_avatars: true, send_notifications_to_email: false, - actor_id: "changeme_8292683678".into(), + actor_id: None, bio: None, local: true, private_key: None, @@ -378,7 +379,7 @@ mod tests { deleted: None, updated: None, nsfw: false, - actor_id: "changeme_8223262378".into(), + actor_id: None, local: true, private_key: None, public_key: None, @@ -406,7 +407,7 @@ mod tests { embed_description: None, embed_html: None, thumbnail_url: None, - ap_id: "http://fake.com".into(), + ap_id: None, local: true, published: None, }; @@ -431,7 +432,7 @@ mod tests { embed_description: None, embed_html: None, thumbnail_url: None, - ap_id: "http://fake.com".into(), + ap_id: inserted_post.ap_id.to_owned(), local: true, }; diff --git a/server/lemmy_db/src/post_view.rs b/server/lemmy_db/src/post_view.rs index 35bfc7ab39..d792538360 100644 --- a/server/lemmy_db/src/post_view.rs +++ b/server/lemmy_db/src/post_view.rs @@ -423,7 +423,7 @@ mod tests { lang: "browser".into(), show_avatars: true, send_notifications_to_email: false, - actor_id: "changeme_8282738268".into(), + actor_id: None, bio: None, local: true, private_key: None, @@ -443,7 +443,7 @@ mod tests { deleted: None, updated: None, nsfw: false, - actor_id: "changeme_2763".into(), + actor_id: None, local: true, private_key: None, public_key: None, @@ -471,7 +471,7 @@ mod tests { embed_description: None, embed_html: None, thumbnail_url: None, - ap_id: "http://fake.com".into(), + ap_id: None, local: true, published: None, }; @@ -555,7 +555,7 @@ mod tests { embed_description: None, embed_html: None, thumbnail_url: None, - ap_id: "http://fake.com".to_string(), + ap_id: inserted_post.ap_id.to_owned(), local: true, creator_actor_id: inserted_user.actor_id.to_owned(), creator_local: true, @@ -604,7 +604,7 @@ mod tests { embed_description: None, embed_html: None, thumbnail_url: None, - ap_id: "http://fake.com".to_string(), + ap_id: inserted_post.ap_id.to_owned(), local: true, creator_actor_id: inserted_user.actor_id.to_owned(), creator_local: true, diff --git a/server/lemmy_db/src/private_message.rs b/server/lemmy_db/src/private_message.rs index 007a962084..4361fa900d 100644 --- a/server/lemmy_db/src/private_message.rs +++ b/server/lemmy_db/src/private_message.rs @@ -27,7 +27,7 @@ pub struct PrivateMessageForm { pub read: Option, pub published: Option, pub updated: Option, - pub ap_id: String, + pub ap_id: Option, pub local: bool, } @@ -119,6 +119,17 @@ impl PrivateMessage { .set(read.eq(true)) .get_results::(conn) } + + // TODO use this + pub fn upsert(conn: &PgConnection, private_message_form: &PrivateMessageForm) -> Result { + use crate::schema::private_message::dsl::*; + insert_into(private_message) + .values(private_message_form) + .on_conflict(ap_id) + .do_update() + .set(private_message_form) + .get_result::(conn) + } } #[cfg(test)] @@ -153,7 +164,7 @@ mod tests { lang: "browser".into(), show_avatars: true, send_notifications_to_email: false, - actor_id: "changeme_6723878".into(), + actor_id: None, bio: None, local: true, private_key: None, @@ -181,7 +192,7 @@ mod tests { lang: "browser".into(), show_avatars: true, send_notifications_to_email: false, - actor_id: "changeme_287263876".into(), + actor_id: None, bio: None, local: true, private_key: None, @@ -199,7 +210,7 @@ mod tests { read: None, published: None, updated: None, - ap_id: "http://fake.com".into(), + ap_id: None, local: true, }; @@ -214,7 +225,7 @@ mod tests { read: false, updated: None, published: inserted_private_message.published, - ap_id: "http://fake.com".into(), + ap_id: inserted_private_message.ap_id.to_owned(), local: true, }; diff --git a/server/lemmy_db/src/schema.rs b/server/lemmy_db/src/schema.rs index c446edd9f2..a189dbced4 100644 --- a/server/lemmy_db/src/schema.rs +++ b/server/lemmy_db/src/schema.rs @@ -523,36 +523,36 @@ joinable!(user_mention -> comment (comment_id)); joinable!(user_mention -> user_ (recipient_id)); allow_tables_to_appear_in_same_query!( - activity, - category, - comment, - comment_aggregates_fast, - comment_like, - comment_saved, - community, - community_aggregates_fast, - community_follower, - community_moderator, - community_user_ban, - mod_add, - mod_add_community, - mod_ban, - mod_ban_from_community, - mod_lock_post, - mod_remove_comment, - mod_remove_community, - mod_remove_post, - mod_sticky_post, - password_reset_request, - post, - post_aggregates_fast, - post_like, - post_read, - post_saved, - private_message, - site, - user_, - user_ban, - user_fast, - user_mention, + activity, + category, + comment, + comment_aggregates_fast, + comment_like, + comment_saved, + community, + community_aggregates_fast, + community_follower, + community_moderator, + community_user_ban, + mod_add, + mod_add_community, + mod_ban, + mod_ban_from_community, + mod_lock_post, + mod_remove_comment, + mod_remove_community, + mod_remove_post, + mod_sticky_post, + password_reset_request, + post, + post_aggregates_fast, + post_like, + post_read, + post_saved, + private_message, + site, + user_, + user_ban, + user_fast, + user_mention, ); diff --git a/server/lemmy_db/src/user.rs b/server/lemmy_db/src/user.rs index 8416d38a1e..0a39bdb275 100644 --- a/server/lemmy_db/src/user.rs +++ b/server/lemmy_db/src/user.rs @@ -57,7 +57,7 @@ pub struct UserForm { pub show_avatars: bool, pub send_notifications_to_email: bool, pub matrix_user_id: Option, - pub actor_id: String, + pub actor_id: Option, pub bio: Option, pub local: bool, pub private_key: Option, @@ -152,6 +152,15 @@ impl User_ { pub fn get_profile_url(&self, hostname: &str) -> String { format!("https://{}/u/{}", hostname, self.name) } + + pub fn upsert(conn: &PgConnection, user_form: &UserForm) -> Result { + insert_into(user_) + .values(user_form) + .on_conflict(actor_id) + .do_update() + .set(user_form) + .get_result::(conn) + } } #[cfg(test)] @@ -180,7 +189,7 @@ mod tests { lang: "browser".into(), show_avatars: true, send_notifications_to_email: false, - actor_id: "changeme_9826382637".into(), + actor_id: None, bio: None, local: true, private_key: None, diff --git a/server/lemmy_db/src/user_mention.rs b/server/lemmy_db/src/user_mention.rs index 4c0fdc5ff8..a5985223e3 100644 --- a/server/lemmy_db/src/user_mention.rs +++ b/server/lemmy_db/src/user_mention.rs @@ -106,7 +106,7 @@ mod tests { lang: "browser".into(), show_avatars: true, send_notifications_to_email: false, - actor_id: "changeme_628763".into(), + actor_id: None, bio: None, local: true, private_key: None, @@ -134,7 +134,7 @@ mod tests { lang: "browser".into(), show_avatars: true, send_notifications_to_email: false, - actor_id: "changeme_927389278".into(), + actor_id: None, bio: None, local: true, private_key: None, @@ -154,7 +154,7 @@ mod tests { deleted: None, updated: None, nsfw: false, - actor_id: "changeme_876238".into(), + actor_id: None, local: true, private_key: None, public_key: None, @@ -182,7 +182,7 @@ mod tests { embed_description: None, embed_html: None, thumbnail_url: None, - ap_id: "http://fake.com".into(), + ap_id: None, local: true, published: None, }; @@ -199,7 +199,7 @@ mod tests { parent_id: None, published: None, updated: None, - ap_id: "http://fake.com".into(), + ap_id: None, local: true, }; diff --git a/server/migrations/2020-08-25-132005_add_unique_ap_ids/down.sql b/server/migrations/2020-08-25-132005_add_unique_ap_ids/down.sql new file mode 100644 index 0000000000..2b43b59c35 --- /dev/null +++ b/server/migrations/2020-08-25-132005_add_unique_ap_ids/down.sql @@ -0,0 +1,27 @@ +-- Drop the uniques +alter table private_message drop constraint idx_private_message_ap_id; +alter table post drop constraint idx_post_ap_id; +alter table comment drop constraint idx_comment_ap_id; +alter table user_ drop constraint idx_user_actor_id; +alter table community drop constraint idx_community_actor_id; + +alter table private_message alter column ap_id set not null; +alter table private_message alter column ap_id set default 'http://fake.com'; + +alter table post alter column ap_id set not null; +alter table post alter column ap_id set default 'http://fake.com'; + +alter table comment alter column ap_id set not null; +alter table comment alter column ap_id set default 'http://fake.com'; + +update private_message +set ap_id = 'http://fake.com' +where ap_id like 'changeme_%'; + +update post +set ap_id = 'http://fake.com' +where ap_id like 'changeme_%'; + +update comment +set ap_id = 'http://fake.com' +where ap_id like 'changeme_%'; diff --git a/server/migrations/2020-08-25-132005_add_unique_ap_ids/up.sql b/server/migrations/2020-08-25-132005_add_unique_ap_ids/up.sql new file mode 100644 index 0000000000..75e81eef22 --- /dev/null +++ b/server/migrations/2020-08-25-132005_add_unique_ap_ids/up.sql @@ -0,0 +1,56 @@ +-- Add unique ap_id for private_message, comment, and post + +-- Need to delete the possible dupes for ones that don't start with the fake one +delete from private_message a using ( + select min(id) as id, ap_id + from private_message + group by ap_id having count(*) > 1 +) b +where a.ap_id = b.ap_id +and a.id <> b.id; + +delete from post a using ( + select min(id) as id, ap_id + from post + group by ap_id having count(*) > 1 +) b +where a.ap_id = b.ap_id +and a.id <> b.id; + +delete from comment a using ( + select min(id) as id, ap_id + from comment + group by ap_id having count(*) > 1 +) b +where a.ap_id = b.ap_id +and a.id <> b.id; + +-- Replacing the current default on the columns, to the unique one +update private_message +set ap_id = generate_unique_changeme() +where ap_id = 'http://fake.com'; + +update post +set ap_id = generate_unique_changeme() +where ap_id = 'http://fake.com'; + +update comment +set ap_id = generate_unique_changeme() +where ap_id = 'http://fake.com'; + +-- Add the unique indexes +alter table private_message alter column ap_id set not null; +alter table private_message alter column ap_id set default generate_unique_changeme(); + +alter table post alter column ap_id set not null; +alter table post alter column ap_id set default generate_unique_changeme(); + +alter table comment alter column ap_id set not null; +alter table comment alter column ap_id set default generate_unique_changeme(); + +-- Add the uniques, for user_ and community too +alter table private_message add constraint idx_private_message_ap_id unique (ap_id); +alter table post add constraint idx_post_ap_id unique (ap_id); +alter table comment add constraint idx_comment_ap_id unique (ap_id); +alter table user_ add constraint idx_user_actor_id unique (actor_id); +alter table community add constraint idx_community_actor_id unique (actor_id); diff --git a/server/src/api/comment.rs b/server/src/api/comment.rs index 3384993f88..f2effab58b 100644 --- a/server/src/api/comment.rs +++ b/server/src/api/comment.rs @@ -146,7 +146,7 @@ impl Perform for CreateComment { read: None, published: None, updated: None, - ap_id: "http://fake.com".into(), + ap_id: None, local: true, }; diff --git a/server/src/api/community.rs b/server/src/api/community.rs index 7b63c67269..c94ca59b7b 100644 --- a/server/src/api/community.rs +++ b/server/src/api/community.rs @@ -274,7 +274,7 @@ impl Perform for CreateCommunity { deleted: None, nsfw: data.nsfw, updated: None, - actor_id, + actor_id: Some(actor_id), local: true, private_key: Some(keypair.private_key), public_key: Some(keypair.public_key), @@ -368,7 +368,7 @@ impl Perform for EditCommunity { deleted: Some(read_community.deleted), nsfw: data.nsfw, updated: Some(naive_now()), - actor_id: read_community.actor_id, + actor_id: Some(read_community.actor_id), local: read_community.local, private_key: read_community.private_key, public_key: read_community.public_key, diff --git a/server/src/api/post.rs b/server/src/api/post.rs index 5cb7e32221..9f0fb3be05 100644 --- a/server/src/api/post.rs +++ b/server/src/api/post.rs @@ -187,7 +187,7 @@ impl Perform for CreatePost { embed_description: iframely_description, embed_html: iframely_html, thumbnail_url: pictrs_thumbnail, - ap_id: "http://fake.com".into(), + ap_id: None, local: true, published: None, }; @@ -518,7 +518,7 @@ impl Perform for EditPost { embed_description: iframely_description, embed_html: iframely_html, thumbnail_url: pictrs_thumbnail, - ap_id: orig_post.ap_id, + ap_id: Some(orig_post.ap_id), local: orig_post.local, published: None, }; diff --git a/server/src/api/user.rs b/server/src/api/user.rs index e97a6d33be..e6cf2a820a 100644 --- a/server/src/api/user.rs +++ b/server/src/api/user.rs @@ -410,7 +410,7 @@ impl Perform for Register { lang: "browser".into(), show_avatars: true, send_notifications_to_email: false, - actor_id: make_apub_endpoint(EndpointType::User, &data.username).to_string(), + actor_id: Some(make_apub_endpoint(EndpointType::User, &data.username).to_string()), bio: None, local: true, private_key: Some(user_keypair.private_key), @@ -441,37 +441,38 @@ impl Perform for Register { let main_community_keypair = generate_actor_keypair()?; // Create the main community if it doesn't exist - let main_community = match blocking(context.pool(), move |conn| Community::read(conn, 2)) - .await? - { - Ok(c) => c, - Err(_e) => { - let default_community_name = "main"; - let community_form = CommunityForm { - name: default_community_name.to_string(), - title: "The Default Community".to_string(), - description: Some("The Default Community".to_string()), - category_id: 1, - nsfw: false, - creator_id: inserted_user.id, - removed: None, - deleted: None, - updated: None, - actor_id: make_apub_endpoint(EndpointType::Community, default_community_name).to_string(), - local: true, - private_key: Some(main_community_keypair.private_key), - public_key: Some(main_community_keypair.public_key), - last_refreshed_at: None, - published: None, - icon: None, - banner: None, - }; - blocking(context.pool(), move |conn| { - Community::create(conn, &community_form) - }) - .await?? - } - }; + let main_community = + match blocking(context.pool(), move |conn| Community::read(conn, 2)).await? { + Ok(c) => c, + Err(_e) => { + let default_community_name = "main"; + let community_form = CommunityForm { + name: default_community_name.to_string(), + title: "The Default Community".to_string(), + description: Some("The Default Community".to_string()), + category_id: 1, + nsfw: false, + creator_id: inserted_user.id, + removed: None, + deleted: None, + updated: None, + actor_id: Some( + make_apub_endpoint(EndpointType::Community, default_community_name).to_string(), + ), + local: true, + private_key: Some(main_community_keypair.private_key), + public_key: Some(main_community_keypair.public_key), + last_refreshed_at: None, + published: None, + icon: None, + banner: None, + }; + blocking(context.pool(), move |conn| { + Community::create(conn, &community_form) + }) + .await?? + } + }; // Sign them up for main community no matter what let community_follower_form = CommunityFollowerForm { @@ -643,7 +644,7 @@ impl Perform for SaveUserSettings { lang: data.lang.to_owned(), show_avatars: data.show_avatars, send_notifications_to_email: data.send_notifications_to_email, - actor_id: read_user.actor_id, + actor_id: Some(read_user.actor_id), bio, local: read_user.local, private_key: read_user.private_key, @@ -1218,7 +1219,7 @@ impl Perform for CreatePrivateMessage { deleted: None, read: None, updated: None, - ap_id: "http://fake.com".into(), + ap_id: None, local: true, published: None, }; diff --git a/server/src/apub/activities.rs b/server/src/apub/activities.rs index 4700bb0892..b4d6c4d2ba 100644 --- a/server/src/apub/activities.rs +++ b/server/src/apub/activities.rs @@ -1,72 +1,38 @@ use crate::{ - apub::{ - check_is_apub_id_valid, - community::do_announce, - extensions::signatures::sign, - insert_activity, - ActorType, - }, - request::retry_custom, + apub::{activity_queue::send_activity, community::do_announce, insert_activity}, LemmyContext, LemmyError, }; -use activitystreams::base::AnyBase; -use actix_web::client::Client; +use activitystreams::{ + base::{Extends, ExtendsExt}, + object::AsObject, +}; use lemmy_db::{community::Community, user::User_}; use lemmy_utils::{get_apub_protocol_string, settings::Settings}; -use log::debug; +use serde::{export::fmt::Debug, Serialize}; use url::{ParseError, Url}; use uuid::Uuid; -pub async fn send_activity_to_community( +pub async fn send_activity_to_community( creator: &User_, community: &Community, to: Vec, - activity: AnyBase, + activity: T, context: &LemmyContext, -) -> Result<(), LemmyError> { +) -> Result<(), LemmyError> +where + T: AsObject + Extends + Serialize + Debug + Send + Clone + 'static, + Kind: Serialize, + >::Error: From + Send + Sync + 'static, +{ + // TODO: looks like call this sometimes with activity, and sometimes with any_base insert_activity(creator.id, activity.clone(), true, context.pool()).await?; // if this is a local community, we need to do an announce from the community instead if community.local { - do_announce(activity, &community, creator, context).await?; + do_announce(activity.into_any_base()?, &community, creator, context).await?; } else { - send_activity(context.client(), &activity, creator, to).await?; - } - - Ok(()) -} - -/// Send an activity to a list of recipients, using the correct headers etc. -pub async fn send_activity( - client: &Client, - activity: &AnyBase, - actor: &dyn ActorType, - to: Vec, -) -> Result<(), LemmyError> { - if !Settings::get().federation.enabled { - return Ok(()); - } - - let activity = serde_json::to_string(&activity)?; - debug!("Sending activitypub activity {} to {:?}", activity, to); - - for to_url in to { - check_is_apub_id_valid(&to_url)?; - - let res = retry_custom(|| async { - let request = client - .post(to_url.as_str()) - .header("Content-Type", "application/json"); - - match sign(request, actor, activity.clone()).await { - Ok(signed) => Ok(signed.send().await), - Err(e) => Err(e), - } - }) - .await?; - - debug!("Result for activity send: {:?}", res); + send_activity(context.activity_queue(), activity, creator, to)?; } Ok(()) diff --git a/server/src/apub/activity_queue.rs b/server/src/apub/activity_queue.rs new file mode 100644 index 0000000000..bc5faaa393 --- /dev/null +++ b/server/src/apub/activity_queue.rs @@ -0,0 +1,133 @@ +use crate::{ + apub::{check_is_apub_id_valid, extensions::signatures::sign, ActorType}, + LemmyError, +}; +use activitystreams::{ + base::{Extends, ExtendsExt}, + object::AsObject, +}; +use anyhow::{anyhow, Context, Error}; +use awc::Client; +use background_jobs::{ + create_server, + memory_storage::Storage, + ActixJob, + Backoff, + MaxRetries, + QueueHandle, + WorkerConfig, +}; +use lemmy_utils::{location_info, settings::Settings}; +use log::warn; +use serde::{Deserialize, Serialize}; +use std::{future::Future, pin::Pin}; +use url::Url; + +pub fn send_activity( + activity_sender: &QueueHandle, + activity: T, + actor: &dyn ActorType, + to: Vec, +) -> Result<(), LemmyError> +where + T: AsObject, + T: Extends, + Kind: Serialize, + >::Error: From + Send + Sync + 'static, +{ + if !Settings::get().federation.enabled { + return Ok(()); + } + + let activity = activity.into_any_base()?; + let serialised_activity = serde_json::to_string(&activity)?; + + for to_url in &to { + check_is_apub_id_valid(&to_url)?; + } + + // TODO: it would make sense to create a separate task for each destination server + let message = SendActivityTask { + activity: serialised_activity, + to, + actor_id: actor.actor_id()?, + private_key: actor.private_key().context(location_info!())?, + }; + activity_sender.queue::(message)?; + + Ok(()) +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +struct SendActivityTask { + activity: String, + to: Vec, + actor_id: Url, + private_key: String, +} + +impl ActixJob for SendActivityTask { + type State = MyState; + type Future = Pin>>>; + const NAME: &'static str = "SendActivityTask"; + + const MAX_RETRIES: MaxRetries = MaxRetries::Count(10); + const BACKOFF: Backoff = Backoff::Exponential(2); + + fn run(self, state: Self::State) -> Self::Future { + Box::pin(async move { + for to_url in &self.to { + let request = state + .client + .post(to_url.as_str()) + .header("Content-Type", "application/json"); + + // TODO: i believe we have to do the signing in here because it is only valid for a few seconds + let signed = sign( + request, + self.activity.clone(), + &self.actor_id, + self.private_key.to_owned(), + ) + .await; + let signed = match signed { + Ok(s) => s, + Err(e) => { + warn!("{}", e); + // dont return an error because retrying would probably not fix the signing + return Ok(()); + } + }; + if let Err(e) = signed.send().await { + warn!("{}", e); + return Err(anyhow!( + "Failed to send activity {} to {}", + &self.activity, + to_url + )); + } + } + + Ok(()) + }) + } +} + +pub fn create_activity_queue() -> QueueHandle { + // Start the application server. This guards access to to the jobs store + let queue_handle = create_server(Storage::new()); + + // Configure and start our workers + WorkerConfig::new(|| MyState { + client: Client::default(), + }) + .register::() + .start(queue_handle.clone()); + + queue_handle +} + +#[derive(Clone)] +struct MyState { + pub client: Client, +} diff --git a/server/src/apub/comment.rs b/server/src/apub/comment.rs index a9a97c0833..988904e973 100644 --- a/server/src/apub/comment.rs +++ b/server/src/apub/comment.rs @@ -192,7 +192,7 @@ impl FromApub for CommentForm { published: note.published().map(|u| u.to_owned().naive_local()), updated: note.updated().map(|u| u.to_owned().naive_local()), deleted: None, - ap_id: check_actor_domain(note, expected_domain)?, + ap_id: Some(check_actor_domain(note, expected_domain)?), local: false, }) } @@ -224,14 +224,7 @@ impl ApubObjectType for Comment { // Set the mention tags .set_many_tags(maa.get_tags()?); - send_activity_to_community( - &creator, - &community, - maa.inboxes, - create.into_any_base()?, - context, - ) - .await?; + send_activity_to_community(&creator, &community, maa.inboxes, create, context).await?; Ok(()) } @@ -259,14 +252,7 @@ impl ApubObjectType for Comment { // Set the mention tags .set_many_tags(maa.get_tags()?); - send_activity_to_community( - &creator, - &community, - maa.inboxes, - update.into_any_base()?, - context, - ) - .await?; + send_activity_to_community(&creator, &community, maa.inboxes, update, context).await?; Ok(()) } @@ -293,7 +279,7 @@ impl ApubObjectType for Comment { &creator, &community, vec![community.get_shared_inbox_url()?], - delete.into_any_base()?, + delete, context, ) .await?; @@ -336,7 +322,7 @@ impl ApubObjectType for Comment { &creator, &community, vec![community.get_shared_inbox_url()?], - undo.into_any_base()?, + undo, context, ) .await?; @@ -366,7 +352,7 @@ impl ApubObjectType for Comment { &mod_, &community, vec![community.get_shared_inbox_url()?], - remove.into_any_base()?, + remove, context, ) .await?; @@ -405,7 +391,7 @@ impl ApubObjectType for Comment { &mod_, &community, vec![community.get_shared_inbox_url()?], - undo.into_any_base()?, + undo, context, ) .await?; @@ -438,7 +424,7 @@ impl ApubLikeableType for Comment { &creator, &community, vec![community.get_shared_inbox_url()?], - like.into_any_base()?, + like, context, ) .await?; @@ -468,7 +454,7 @@ impl ApubLikeableType for Comment { &creator, &community, vec![community.get_shared_inbox_url()?], - dislike.into_any_base()?, + dislike, context, ) .await?; @@ -510,7 +496,7 @@ impl ApubLikeableType for Comment { &creator, &community, vec![community.get_shared_inbox_url()?], - undo.into_any_base()?, + undo, context, ) .await?; diff --git a/server/src/apub/community.rs b/server/src/apub/community.rs index 016f342dc6..67baa7860c 100644 --- a/server/src/apub/community.rs +++ b/server/src/apub/community.rs @@ -1,7 +1,8 @@ use crate::{ api::{check_slurs, check_slurs_opt}, apub::{ - activities::{generate_activity_id, send_activity}, + activities::generate_activity_id, + activity_queue::send_activity, check_actor_domain, create_apub_response, create_apub_tombstone_response, @@ -155,7 +156,7 @@ impl ActorType for Community { insert_activity(self.creator_id, accept.clone(), true, context.pool()).await?; - send_activity(context.client(), &accept.into_any_base()?, self, vec![to]).await?; + send_activity(context.activity_queue(), accept, self, vec![to])?; Ok(()) } @@ -176,7 +177,7 @@ impl ActorType for Community { // Note: For an accept, since it was automatic, no one pushed a button, // the community was the actor. // But for delete, the creator is the actor, and does the signing - send_activity(context.client(), &delete.into_any_base()?, creator, inboxes).await?; + send_activity(context.activity_queue(), delete, creator, inboxes)?; Ok(()) } @@ -208,7 +209,7 @@ impl ActorType for Community { // Note: For an accept, since it was automatic, no one pushed a button, // the community was the actor. // But for delete, the creator is the actor, and does the signing - send_activity(context.client(), &undo.into_any_base()?, creator, inboxes).await?; + send_activity(context.activity_queue(), undo, creator, inboxes)?; Ok(()) } @@ -229,7 +230,7 @@ impl ActorType for Community { // Note: For an accept, since it was automatic, no one pushed a button, // the community was the actor. // But for delete, the creator is the actor, and does the signing - send_activity(context.client(), &remove.into_any_base()?, mod_, inboxes).await?; + send_activity(context.activity_queue(), remove, mod_, inboxes)?; Ok(()) } @@ -258,7 +259,7 @@ impl ActorType for Community { // Note: For an accept, since it was automatic, no one pushed a button, // the community was the actor. // But for remove , the creator is the actor, and does the signing - send_activity(context.client(), &undo.into_any_base()?, mod_, inboxes).await?; + send_activity(context.activity_queue(), undo, mod_, inboxes)?; Ok(()) } @@ -402,7 +403,7 @@ impl FromApub for CommunityForm { updated: group.inner.updated().map(|u| u.to_owned().naive_local()), deleted: None, nsfw: group.ext_one.sensitive, - actor_id: check_actor_domain(group, expected_domain)?, + actor_id: Some(check_actor_domain(group, expected_domain)?), local: false, private_key: None, public_key: Some(group.ext_two.to_owned().public_key.public_key_pem), @@ -511,7 +512,7 @@ pub async fn do_announce( let community_shared_inbox = community.get_shared_inbox_url()?; to.retain(|x| x != &community_shared_inbox); - send_activity(context.client(), &announce.into_any_base()?, community, to).await?; + send_activity(context.activity_queue(), announce, community, to)?; Ok(()) } diff --git a/server/src/apub/extensions/signatures.rs b/server/src/apub/extensions/signatures.rs index 96063d5e0b..cb96977753 100644 --- a/server/src/apub/extensions/signatures.rs +++ b/server/src/apub/extensions/signatures.rs @@ -16,6 +16,7 @@ use openssl::{ }; use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; +use url::Url; lazy_static! { static ref HTTP_SIG_CONFIG: Config = Config::new(); @@ -24,11 +25,11 @@ lazy_static! { /// Signs request headers with the given keypair. pub async fn sign( request: ClientRequest, - actor: &dyn ActorType, activity: String, + actor_id: &Url, + private_key: String, ) -> Result, LemmyError> { - let signing_key_id = format!("{}#main-key", actor.actor_id()?); - let private_key = actor.private_key().context(location_info!())?; + let signing_key_id = format!("{}#main-key", actor_id); let digest_client = request .signature_with_digest( diff --git a/server/src/apub/fetcher.rs b/server/src/apub/fetcher.rs index 0c42aa14e4..0fcb115037 100644 --- a/server/src/apub/fetcher.rs +++ b/server/src/apub/fetcher.rs @@ -15,7 +15,6 @@ use crate::{ LemmyError, }; use activitystreams::{base::BaseExt, collection::OrderedCollection, object::Note, prelude::*}; -use actix_web::client::Client; use anyhow::{anyhow, Context}; use chrono::NaiveDateTime; use diesel::result::Error::NotFound; @@ -35,6 +34,7 @@ use lemmy_db::{ }; use lemmy_utils::{get_apub_protocol_string, location_info}; use log::debug; +use reqwest::Client; use serde::Deserialize; use std::{fmt::Debug, time::Duration}; use url::Url; @@ -55,6 +55,9 @@ where let timeout = Duration::from_secs(60); + // speed up tests + // before: 305s + // after: 240s let json = retry(|| { client .get(url.as_str()) @@ -230,7 +233,7 @@ pub async fn get_or_fetch_and_upsert_user( let person = fetch_remote_object::(context.client(), apub_id).await?; let uf = UserForm::from_apub(&person, context, Some(apub_id.to_owned())).await?; - let user = blocking(context.pool(), move |conn| User_::create(conn, &uf)).await??; + let user = blocking(context.pool(), move |conn| User_::upsert(conn, &uf)).await??; Ok(user) } @@ -286,14 +289,7 @@ async fn fetch_remote_community( let group = fetch_remote_object::(context.client(), apub_id).await?; let cf = CommunityForm::from_apub(&group, context, Some(apub_id.to_owned())).await?; - let community = blocking(context.pool(), move |conn| { - if let Some(cid) = community_id { - Community::update(conn, cid, &cf) - } else { - Community::create(conn, &cf) - } - }) - .await??; + let community = blocking(context.pool(), move |conn| Community::upsert(conn, &cf)).await??; // Also add the community moderators too let attributed_to = group.inner.attributed_to().context(location_info!())?; @@ -341,7 +337,7 @@ async fn fetch_remote_community( for o in outbox_items { let page = PageExt::from_any_base(o)?.context(location_info!())?; let post = PostForm::from_apub(&page, context, None).await?; - let post_ap_id = post.ap_id.clone(); + let post_ap_id = post.ap_id.as_ref().context(location_info!())?.clone(); // Check whether the post already exists in the local db let existing = blocking(context.pool(), move |conn| { Post::read_from_apub_id(conn, &post_ap_id) @@ -349,7 +345,7 @@ async fn fetch_remote_community( .await?; match existing { Ok(e) => blocking(context.pool(), move |conn| Post::update(conn, e.id, &post)).await??, - Err(_) => blocking(context.pool(), move |conn| Post::create(conn, &post)).await??, + Err(_) => blocking(context.pool(), move |conn| Post::upsert(conn, &post)).await??, }; // TODO: we need to send a websocket update here } @@ -374,7 +370,7 @@ pub async fn get_or_fetch_and_insert_post( let post = fetch_remote_object::(context.client(), post_ap_id).await?; let post_form = PostForm::from_apub(&post, context, Some(post_ap_id.to_owned())).await?; - let post = blocking(context.pool(), move |conn| Post::create(conn, &post_form)).await??; + let post = blocking(context.pool(), move |conn| Post::upsert(conn, &post_form)).await??; Ok(post) } @@ -404,7 +400,7 @@ pub async fn get_or_fetch_and_insert_comment( CommentForm::from_apub(&comment, context, Some(comment_ap_id.to_owned())).await?; let comment = blocking(context.pool(), move |conn| { - Comment::create(conn, &comment_form) + Comment::upsert(conn, &comment_form) }) .await??; diff --git a/server/src/apub/inbox/activities/delete.rs b/server/src/apub/inbox/activities/delete.rs index 2a6689dbf7..9c4f0beeff 100644 --- a/server/src/apub/inbox/activities/delete.rs +++ b/server/src/apub/inbox/activities/delete.rs @@ -78,7 +78,7 @@ async fn receive_delete_post( embed_description: post.embed_description, embed_html: post.embed_html, thumbnail_url: post.thumbnail_url, - ap_id: post.ap_id, + ap_id: Some(post.ap_id), local: post.local, published: None, }; @@ -131,7 +131,7 @@ async fn receive_delete_comment( read: None, published: None, updated: Some(naive_now()), - ap_id: comment.ap_id, + ap_id: Some(comment.ap_id), local: comment.local, }; let comment_id = comment.id; @@ -175,7 +175,8 @@ async fn receive_delete_community( let community_actor_id = CommunityForm::from_apub(&group, context, Some(user.actor_id()?)) .await? - .actor_id; + .actor_id + .context(location_info!())?; let community = blocking(context.pool(), move |conn| { Community::read_from_actor_id(conn, &community_actor_id) @@ -193,7 +194,7 @@ async fn receive_delete_community( updated: Some(naive_now()), deleted: Some(true), nsfw: community.nsfw, - actor_id: community.actor_id, + actor_id: Some(community.actor_id), local: community.local, private_key: community.private_key, public_key: community.public_key, diff --git a/server/src/apub/inbox/activities/remove.rs b/server/src/apub/inbox/activities/remove.rs index 83a748436a..83eb6f3394 100644 --- a/server/src/apub/inbox/activities/remove.rs +++ b/server/src/apub/inbox/activities/remove.rs @@ -85,7 +85,7 @@ async fn receive_remove_post( embed_description: post.embed_description, embed_html: post.embed_html, thumbnail_url: post.thumbnail_url, - ap_id: post.ap_id, + ap_id: Some(post.ap_id), local: post.local, published: None, }; @@ -138,7 +138,7 @@ async fn receive_remove_comment( read: None, published: None, updated: Some(naive_now()), - ap_id: comment.ap_id, + ap_id: Some(comment.ap_id), local: comment.local, }; let comment_id = comment.id; @@ -182,7 +182,8 @@ async fn receive_remove_community( let community_actor_id = CommunityForm::from_apub(&group, context, Some(mod_.actor_id()?)) .await? - .actor_id; + .actor_id + .context(location_info!())?; let community = blocking(context.pool(), move |conn| { Community::read_from_actor_id(conn, &community_actor_id) @@ -200,7 +201,7 @@ async fn receive_remove_community( updated: Some(naive_now()), deleted: None, nsfw: community.nsfw, - actor_id: community.actor_id, + actor_id: Some(community.actor_id), local: community.local, private_key: community.private_key, public_key: community.public_key, diff --git a/server/src/apub/inbox/activities/undo.rs b/server/src/apub/inbox/activities/undo.rs index f356c91be8..9a589554d8 100644 --- a/server/src/apub/inbox/activities/undo.rs +++ b/server/src/apub/inbox/activities/undo.rs @@ -175,7 +175,7 @@ async fn receive_undo_delete_comment( read: None, published: None, updated: Some(naive_now()), - ap_id: comment.ap_id, + ap_id: Some(comment.ap_id), local: comment.local, }; let comment_id = comment.id; @@ -234,7 +234,7 @@ async fn receive_undo_remove_comment( read: None, published: None, updated: Some(naive_now()), - ap_id: comment.ap_id, + ap_id: Some(comment.ap_id), local: comment.local, }; let comment_id = comment.id; @@ -299,7 +299,7 @@ async fn receive_undo_delete_post( embed_description: post.embed_description, embed_html: post.embed_html, thumbnail_url: post.thumbnail_url, - ap_id: post.ap_id, + ap_id: Some(post.ap_id), local: post.local, published: None, }; @@ -359,7 +359,7 @@ async fn receive_undo_remove_post( embed_description: post.embed_description, embed_html: post.embed_html, thumbnail_url: post.thumbnail_url, - ap_id: post.ap_id, + ap_id: Some(post.ap_id), local: post.local, published: None, }; @@ -399,7 +399,8 @@ async fn receive_undo_delete_community( let community_actor_id = CommunityForm::from_apub(&group, context, Some(user.actor_id()?)) .await? - .actor_id; + .actor_id + .context(location_info!())?; let community = blocking(context.pool(), move |conn| { Community::read_from_actor_id(conn, &community_actor_id) @@ -417,7 +418,7 @@ async fn receive_undo_delete_community( updated: Some(naive_now()), deleted: Some(false), nsfw: community.nsfw, - actor_id: community.actor_id, + actor_id: Some(community.actor_id), local: community.local, private_key: community.private_key, public_key: community.public_key, @@ -464,7 +465,8 @@ async fn receive_undo_remove_community( let community_actor_id = CommunityForm::from_apub(&group, context, Some(mod_.actor_id()?)) .await? - .actor_id; + .actor_id + .context(location_info!())?; let community = blocking(context.pool(), move |conn| { Community::read_from_actor_id(conn, &community_actor_id) @@ -482,7 +484,7 @@ async fn receive_undo_remove_community( updated: Some(naive_now()), deleted: None, nsfw: community.nsfw, - actor_id: community.actor_id, + actor_id: Some(community.actor_id), local: community.local, private_key: community.private_key, public_key: community.public_key, diff --git a/server/src/apub/inbox/shared_inbox.rs b/server/src/apub/inbox/shared_inbox.rs index c9f9324dc9..0f8cc8ed88 100644 --- a/server/src/apub/inbox/shared_inbox.rs +++ b/server/src/apub/inbox/shared_inbox.rs @@ -66,6 +66,8 @@ pub async fn shared_inbox( let json = serde_json::to_string(&activity)?; debug!("Shared inbox received activity: {}", json); + // TODO: if we already received an activity with identical ID, then ignore this (same in other inboxes) + let sender = &activity .actor()? .to_owned() diff --git a/server/src/apub/inbox/user_inbox.rs b/server/src/apub/inbox/user_inbox.rs index 103fd92ab1..27d58ebcd3 100644 --- a/server/src/apub/inbox/user_inbox.rs +++ b/server/src/apub/inbox/user_inbox.rs @@ -175,7 +175,11 @@ async fn receive_update_private_message( let domain = Some(update.id_unchecked().context(location_info!())?.to_owned()); let private_message_form = PrivateMessageForm::from_apub(¬e, context, domain).await?; - let private_message_ap_id = private_message_form.ap_id.clone(); + let private_message_ap_id = private_message_form + .ap_id + .as_ref() + .context(location_info!())? + .clone(); let private_message = blocking(&context.pool(), move |conn| { PrivateMessage::read_from_apub_id(conn, &private_message_ap_id) }) @@ -224,7 +228,7 @@ async fn receive_delete_private_message( let domain = Some(delete.id_unchecked().context(location_info!())?.to_owned()); let private_message_form = PrivateMessageForm::from_apub(¬e, context, domain).await?; - let private_message_ap_id = private_message_form.ap_id; + let private_message_ap_id = private_message_form.ap_id.context(location_info!())?; let private_message = blocking(&context.pool(), move |conn| { PrivateMessage::read_from_apub_id(conn, &private_message_ap_id) }) @@ -236,7 +240,7 @@ async fn receive_delete_private_message( creator_id: private_message.creator_id, deleted: Some(true), read: None, - ap_id: private_message.ap_id, + ap_id: Some(private_message.ap_id), local: private_message.local, published: None, updated: Some(naive_now()), @@ -287,7 +291,11 @@ async fn receive_undo_delete_private_message( let domain = Some(undo.id_unchecked().context(location_info!())?.to_owned()); let private_message = PrivateMessageForm::from_apub(¬e, context, domain).await?; - let private_message_ap_id = private_message.ap_id.clone(); + let private_message_ap_id = private_message + .ap_id + .as_ref() + .context(location_info!())? + .clone(); let private_message_id = blocking(&context.pool(), move |conn| { PrivateMessage::read_from_apub_id(conn, &private_message_ap_id).map(|pm| pm.id) }) diff --git a/server/src/apub/mod.rs b/server/src/apub/mod.rs index dddbd7e04d..c545a5fd04 100644 --- a/server/src/apub/mod.rs +++ b/server/src/apub/mod.rs @@ -1,4 +1,5 @@ pub mod activities; +pub mod activity_queue; pub mod comment; pub mod community; pub mod extensions; @@ -30,7 +31,7 @@ use activitystreams::{ prelude::*, }; use activitystreams_ext::{Ext1, Ext2}; -use actix_web::{body::Body, client::Client, HttpResponse}; +use actix_web::{body::Body, HttpResponse}; use anyhow::{anyhow, Context}; use chrono::NaiveDateTime; use lemmy_db::{activity::do_insert_activity, user::User_}; @@ -42,6 +43,7 @@ use lemmy_utils::{ MentionData, }; use log::debug; +use reqwest::Client; use serde::Serialize; use url::{ParseError, Url}; @@ -326,7 +328,7 @@ pub async fn fetch_webfinger_url( ); debug!("Fetching webfinger url: {}", &fetch_url); - let mut response = retry(|| client.get(&fetch_url).send()).await?; + let response = retry(|| client.get(&fetch_url).send()).await?; let res: WebFingerResponse = response .json() diff --git a/server/src/apub/post.rs b/server/src/apub/post.rs index 4f2a831552..606c475219 100644 --- a/server/src/apub/post.rs +++ b/server/src/apub/post.rs @@ -289,7 +289,7 @@ impl FromApub for PostForm { embed_description: embed.description, embed_html: embed.html, thumbnail_url, - ap_id: check_actor_domain(page, expected_domain)?, + ap_id: Some(check_actor_domain(page, expected_domain)?), local: false, }) } @@ -318,7 +318,7 @@ impl ApubObjectType for Post { creator, &community, vec![community.get_shared_inbox_url()?], - create.into_any_base()?, + create, context, ) .await?; @@ -346,7 +346,7 @@ impl ApubObjectType for Post { creator, &community, vec![community.get_shared_inbox_url()?], - update.into_any_base()?, + update, context, ) .await?; @@ -373,7 +373,7 @@ impl ApubObjectType for Post { creator, &community, vec![community.get_shared_inbox_url()?], - delete.into_any_base()?, + delete, context, ) .await?; @@ -412,7 +412,7 @@ impl ApubObjectType for Post { creator, &community, vec![community.get_shared_inbox_url()?], - undo.into_any_base()?, + undo, context, ) .await?; @@ -439,7 +439,7 @@ impl ApubObjectType for Post { mod_, &community, vec![community.get_shared_inbox_url()?], - remove.into_any_base()?, + remove, context, ) .await?; @@ -474,7 +474,7 @@ impl ApubObjectType for Post { mod_, &community, vec![community.get_shared_inbox_url()?], - undo.into_any_base()?, + undo, context, ) .await?; @@ -504,7 +504,7 @@ impl ApubLikeableType for Post { &creator, &community, vec![community.get_shared_inbox_url()?], - like.into_any_base()?, + like, context, ) .await?; @@ -531,7 +531,7 @@ impl ApubLikeableType for Post { &creator, &community, vec![community.get_shared_inbox_url()?], - dislike.into_any_base()?, + dislike, context, ) .await?; @@ -570,7 +570,7 @@ impl ApubLikeableType for Post { &creator, &community, vec![community.get_shared_inbox_url()?], - undo.into_any_base()?, + undo, context, ) .await?; diff --git a/server/src/apub/private_message.rs b/server/src/apub/private_message.rs index 8e5836885c..1b3472e328 100644 --- a/server/src/apub/private_message.rs +++ b/server/src/apub/private_message.rs @@ -1,6 +1,7 @@ use crate::{ apub::{ - activities::{generate_activity_id, send_activity}, + activities::generate_activity_id, + activity_queue::send_activity, check_actor_domain, check_is_apub_id_valid, create_tombstone, @@ -110,7 +111,7 @@ impl FromApub for PrivateMessageForm { updated: note.updated().map(|u| u.to_owned().naive_local()), deleted: None, read: None, - ap_id: check_actor_domain(note, expected_domain)?, + ap_id: Some(check_actor_domain(note, expected_domain)?), local: false, }) } @@ -134,13 +135,7 @@ impl ApubObjectType for PrivateMessage { insert_activity(creator.id, create.clone(), true, context.pool()).await?; - send_activity( - context.client(), - &create.into_any_base()?, - creator, - vec![to], - ) - .await?; + send_activity(context.activity_queue(), create, creator, vec![to])?; Ok(()) } @@ -160,13 +155,7 @@ impl ApubObjectType for PrivateMessage { insert_activity(creator.id, update.clone(), true, context.pool()).await?; - send_activity( - context.client(), - &update.into_any_base()?, - creator, - vec![to], - ) - .await?; + send_activity(context.activity_queue(), update, creator, vec![to])?; Ok(()) } @@ -185,13 +174,7 @@ impl ApubObjectType for PrivateMessage { insert_activity(creator.id, delete.clone(), true, context.pool()).await?; - send_activity( - context.client(), - &delete.into_any_base()?, - creator, - vec![to], - ) - .await?; + send_activity(context.activity_queue(), delete, creator, vec![to])?; Ok(()) } @@ -221,7 +204,7 @@ impl ApubObjectType for PrivateMessage { insert_activity(creator.id, undo.clone(), true, context.pool()).await?; - send_activity(context.client(), &undo.into_any_base()?, creator, vec![to]).await?; + send_activity(context.activity_queue(), undo, creator, vec![to])?; Ok(()) } diff --git a/server/src/apub/user.rs b/server/src/apub/user.rs index f6225dea2f..a61813c185 100644 --- a/server/src/apub/user.rs +++ b/server/src/apub/user.rs @@ -1,7 +1,8 @@ use crate::{ api::{check_slurs, check_slurs_opt}, apub::{ - activities::{generate_activity_id, send_activity}, + activities::generate_activity_id, + activity_queue::send_activity, check_actor_domain, create_apub_response, fetcher::get_or_fetch_and_upsert_actor, @@ -127,7 +128,7 @@ impl ActorType for User_ { insert_activity(self.id, follow.clone(), true, context.pool()).await?; - send_activity(context.client(), &follow.into_any_base()?, self, vec![to]).await?; + send_activity(context.activity_queue(), follow, self, vec![to])?; Ok(()) } @@ -152,7 +153,7 @@ impl ActorType for User_ { insert_activity(self.id, undo.clone(), true, context.pool()).await?; - send_activity(context.client(), &undo.into_any_base()?, self, vec![to]).await?; + send_activity(context.activity_queue(), undo, self, vec![to])?; Ok(()) } @@ -268,7 +269,7 @@ impl FromApub for UserForm { show_avatars: false, send_notifications_to_email: false, matrix_user_id: None, - actor_id: check_actor_domain(person, expected_domain)?, + actor_id: Some(check_actor_domain(person, expected_domain)?), bio, local: false, private_key: None, diff --git a/server/src/code_migrations.rs b/server/src/code_migrations.rs index 7f45237c4b..d5139441d9 100644 --- a/server/src/code_migrations.rs +++ b/server/src/code_migrations.rs @@ -67,7 +67,7 @@ fn user_updates_2020_04_02(conn: &PgConnection) -> Result<(), LemmyError> { lang: cuser.lang.to_owned(), show_avatars: cuser.show_avatars, send_notifications_to_email: cuser.send_notifications_to_email, - actor_id: make_apub_endpoint(EndpointType::User, &cuser.name).to_string(), + actor_id: Some(make_apub_endpoint(EndpointType::User, &cuser.name).to_string()), bio: cuser.bio.to_owned(), local: cuser.local, private_key: Some(keypair.private_key), @@ -111,7 +111,7 @@ fn community_updates_2020_04_02(conn: &PgConnection) -> Result<(), LemmyError> { deleted: None, nsfw: ccommunity.nsfw, updated: None, - actor_id: make_apub_endpoint(EndpointType::Community, &ccommunity.name).to_string(), + actor_id: Some(make_apub_endpoint(EndpointType::Community, &ccommunity.name).to_string()), local: ccommunity.local, private_key: Some(keypair.private_key), public_key: Some(keypair.public_key), @@ -138,7 +138,7 @@ fn post_updates_2020_04_03(conn: &PgConnection) -> Result<(), LemmyError> { // Update the ap_id let incorrect_posts = post - .filter(ap_id.eq("http://fake.com")) + .filter(ap_id.eq("changeme_%")) .filter(local.eq(true)) .load::(conn)?; @@ -163,7 +163,7 @@ fn comment_updates_2020_04_03(conn: &PgConnection) -> Result<(), LemmyError> { // Update the ap_id let incorrect_comments = comment - .filter(ap_id.eq("http://fake.com")) + .filter(ap_id.eq("changeme_%")) .filter(local.eq(true)) .load::(conn)?; @@ -188,7 +188,7 @@ fn private_message_updates_2020_05_05(conn: &PgConnection) -> Result<(), LemmyEr // Update the ap_id let incorrect_pms = private_message - .filter(ap_id.eq("http://fake.com")) + .filter(ap_id.eq("changeme_%")) .filter(local.eq(true)) .load::(conn)?; diff --git a/server/src/lib.rs b/server/src/lib.rs index 07ee15d406..32b43ef845 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -14,6 +14,7 @@ pub extern crate dotenv; pub extern crate jsonwebtoken; extern crate log; pub extern crate openssl; +pub extern crate reqwest; pub extern crate rss; pub extern crate serde; pub extern crate serde_json; @@ -33,12 +34,15 @@ use crate::{ request::{retry, RecvError}, websocket::server::ChatServer, }; + use actix::Addr; -use actix_web::{client::Client, dev::ConnectionInfo}; +use actix_web::dev::ConnectionInfo; use anyhow::anyhow; +use background_jobs::QueueHandle; use lemmy_utils::{get_apub_protocol_string, settings::Settings}; use log::error; use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC}; +use reqwest::Client; use serde::Deserialize; use std::process::Command; @@ -75,14 +79,21 @@ pub struct LemmyContext { pub pool: DbPool, pub chat_server: Addr, pub client: Client, + pub activity_queue: QueueHandle, } impl LemmyContext { - pub fn create(pool: DbPool, chat_server: Addr, client: Client) -> LemmyContext { + pub fn create( + pool: DbPool, + chat_server: Addr, + client: Client, + activity_queue: QueueHandle, + ) -> LemmyContext { LemmyContext { pool, chat_server, client, + activity_queue, } } pub fn pool(&self) -> &DbPool { @@ -94,6 +105,9 @@ impl LemmyContext { pub fn client(&self) -> &Client { &self.client } + pub fn activity_queue(&self) -> &QueueHandle { + &self.activity_queue + } } impl Clone for LemmyContext { @@ -102,6 +116,7 @@ impl Clone for LemmyContext { pool: self.pool.clone(), chat_server: self.chat_server.clone(), client: self.client.clone(), + activity_queue: self.activity_queue.clone(), } } } @@ -117,7 +132,7 @@ pub struct IframelyResponse { pub async fn fetch_iframely(client: &Client, url: &str) -> Result { let fetch_url = format!("http://iframely/oembed?url={}", url); - let mut response = retry(|| client.get(&fetch_url).send()).await?; + let response = retry(|| client.get(&fetch_url).send()).await?; let res: IframelyResponse = response .json() @@ -146,7 +161,7 @@ pub async fn fetch_pictrs(client: &Client, image_url: &str) -> Result Result<(), LemmyError> { settings.bind, settings.port ); - let chat_server = - ChatServer::startup(pool.clone(), rate_limiter.clone(), Client::default()).start(); + let activity_queue = create_activity_queue(); + let chat_server = ChatServer::startup( + pool.clone(), + rate_limiter.clone(), + Client::default(), + activity_queue.clone(), + ) + .start(); // Create Http server with websocket support HttpServer::new(move || { - let context = LemmyContext::create(pool.clone(), chat_server.to_owned(), Client::default()); + let context = LemmyContext::create( + pool.clone(), + chat_server.to_owned(), + Client::default(), + activity_queue.to_owned(), + ); let settings = Settings::get(); let rate_limiter = rate_limiter.clone(); App::new() diff --git a/server/src/request.rs b/server/src/request.rs index 70a2b6933e..490609e7dc 100644 --- a/server/src/request.rs +++ b/server/src/request.rs @@ -14,15 +14,15 @@ pub struct RecvError(pub String); pub async fn retry(f: F) -> Result where F: Fn() -> Fut, - Fut: Future>, + Fut: Future>, { retry_custom(|| async { Ok((f)().await) }).await } -pub async fn retry_custom(f: F) -> Result +async fn retry_custom(f: F) -> Result where F: Fn() -> Fut, - Fut: Future, LemmyError>>, + Fut: Future, LemmyError>>, { let mut response = Err(anyhow!("connect timeout").into()); @@ -30,7 +30,7 @@ where match (f)().await? { Ok(t) => return Ok(t), Err(e) => { - if is_connect_timeout(&e) { + if e.is_timeout() { response = Err(SendError(e.to_string()).into()); continue; } @@ -41,13 +41,3 @@ where response } - -fn is_connect_timeout(e: &actix_web::client::SendRequestError) -> bool { - if let actix_web::client::SendRequestError::Connect(e) = e { - if let actix_web::client::ConnectError::Timeout = e { - return true; - } - } - - false -} diff --git a/server/src/websocket/server.rs b/server/src/websocket/server.rs index 4d0a1c4d3a..2a4c558cd3 100644 --- a/server/src/websocket/server.rs +++ b/server/src/websocket/server.rs @@ -15,10 +15,12 @@ use crate::{ PostId, UserId, }; -use actix_web::{client::Client, web}; +use actix_web::web; use anyhow::Context as acontext; +use background_jobs::QueueHandle; use lemmy_db::naive_now; use lemmy_utils::location_info; +use reqwest::Client; /// Chat server sends this messages to session #[derive(Message)] @@ -181,6 +183,8 @@ pub struct ChatServer { /// An HTTP Client client: Client, + + activity_queue: QueueHandle, } impl ChatServer { @@ -188,6 +192,7 @@ impl ChatServer { pool: Pool>, rate_limiter: RateLimit, client: Client, + activity_queue: QueueHandle, ) -> ChatServer { ChatServer { sessions: HashMap::new(), @@ -199,6 +204,7 @@ impl ChatServer { rate_limiter, captchas: Vec::new(), client, + activity_queue, } } @@ -460,6 +466,7 @@ impl ChatServer { }; let client = self.client.clone(); + let activity_queue = self.activity_queue.clone(); async move { let msg = msg; let json: Value = serde_json::from_str(&msg.msg)?; @@ -474,6 +481,7 @@ impl ChatServer { pool, chat_server: addr, client, + activity_queue, }; let args = Args { context, diff --git a/ui/src/api_tests/comment.spec.ts b/ui/src/api_tests/comment.spec.ts index 747ec91090..83526cf962 100644 --- a/ui/src/api_tests/comment.spec.ts +++ b/ui/src/api_tests/comment.spec.ts @@ -1,3 +1,4 @@ +jest.setTimeout(120000); import { alpha, beta, @@ -19,6 +20,7 @@ import { createCommunity, registerUser, API, + delay, } from './shared'; import { PostResponse } from 'lemmy-js-client'; @@ -30,6 +32,7 @@ beforeAll(async () => { await followBeta(alpha); await followBeta(gamma); let search = await searchForBetaCommunity(alpha); + await delay(10000); postRes = await createPost( alpha, search.communities.filter(c => c.local == false)[0].id @@ -47,6 +50,7 @@ test('Create a comment', async () => { expect(commentRes.comment.community_local).toBe(false); expect(commentRes.comment.creator_local).toBe(true); expect(commentRes.comment.score).toBe(1); + await delay(); // Make sure that comment is liked on beta let searchBeta = await searchComment(beta, commentRes.comment); @@ -64,12 +68,14 @@ test('Create a comment in a non-existent post', async () => { test('Update a comment', async () => { let commentRes = await createComment(alpha, postRes.post.id); + await delay(); let updateCommentRes = await updateComment(alpha, commentRes.comment.id); expect(updateCommentRes.comment.content).toBe( 'A jest test federated comment update' ); expect(updateCommentRes.comment.community_local).toBe(false); expect(updateCommentRes.comment.creator_local).toBe(true); + await delay(); // Make sure that post is updated on beta let searchBeta = await searchComment(beta, commentRes.comment); @@ -79,23 +85,21 @@ test('Update a comment', async () => { test('Delete a comment', async () => { let commentRes = await createComment(alpha, postRes.post.id); + await delay(); + let deleteCommentRes = await deleteComment( alpha, true, commentRes.comment.id ); expect(deleteCommentRes.comment.deleted).toBe(true); + await delay(); - // Make sure that comment is deleted on beta - // The search doesnt work below, because it returns a tombstone / http::gone - // let searchBeta = await searchComment(beta, commentRes.comment); - // console.log(searchBeta); - // let betaComment = searchBeta.comments[0]; - // Create a fake post, just to get the previous new post id - let createdBetaPostJustToGetId = await createPost(beta, 2); - let betaPost = await getPost(beta, createdBetaPostJustToGetId.post.id - 1); - let betaComment = betaPost.comments[0]; - expect(betaComment.deleted).toBe(true); + // Make sure that comment is undefined on beta + let searchBeta = await searchComment(beta, commentRes.comment); + let betaComment = searchBeta.comments[0]; + expect(betaComment).toBeUndefined(); + await delay(); let undeleteCommentRes = await deleteComment( alpha, @@ -103,6 +107,7 @@ test('Delete a comment', async () => { commentRes.comment.id ); expect(undeleteCommentRes.comment.deleted).toBe(false); + await delay(); // Make sure that comment is undeleted on beta let searchBeta2 = await searchComment(beta, commentRes.comment); @@ -112,6 +117,7 @@ test('Delete a comment', async () => { test('Remove a comment from admin and community on the same instance', async () => { let commentRes = await createComment(alpha, postRes.post.id); + await delay(); // Get the id for beta let betaCommentId = (await searchComment(beta, commentRes.comment)) @@ -120,6 +126,7 @@ test('Remove a comment from admin and community on the same instance', async () // The beta admin removes it (the community lives on beta) let removeCommentRes = await removeComment(beta, true, betaCommentId); expect(removeCommentRes.comment.removed).toBe(true); + await delay(); // Make sure that comment is removed on alpha (it gets pushed since an admin from beta removed it) let refetchedPost = await getPost(alpha, postRes.post.id); @@ -127,6 +134,7 @@ test('Remove a comment from admin and community on the same instance', async () let unremoveCommentRes = await removeComment(beta, false, betaCommentId); expect(unremoveCommentRes.comment.removed).toBe(false); + await delay(); // Make sure that comment is unremoved on beta let refetchedPost2 = await getPost(alpha, postRes.post.id); @@ -142,15 +150,19 @@ test('Remove a comment from admin and community on different instance', async () // New alpha user creates a community, post, and comment. let newCommunity = await createCommunity(newAlphaApi); + await delay(); let newPost = await createPost(newAlphaApi, newCommunity.community.id); + await delay(); let commentRes = await createComment(newAlphaApi, newPost.post.id); expect(commentRes.comment.content).toBeDefined(); + await delay(); // Beta searches that to cache it, then removes it let searchBeta = await searchComment(beta, commentRes.comment); let betaComment = searchBeta.comments[0]; let removeCommentRes = await removeComment(beta, true, betaComment.id); expect(removeCommentRes.comment.removed).toBe(true); + await delay(); // Make sure its not removed on alpha let refetchedPost = await getPost(newAlphaApi, newPost.post.id); @@ -159,8 +171,10 @@ test('Remove a comment from admin and community on different instance', async () test('Unlike a comment', async () => { let commentRes = await createComment(alpha, postRes.post.id); + await delay(); let unlike = await likeComment(alpha, 0, commentRes.comment); expect(unlike.comment.score).toBe(0); + await delay(); // Make sure that post is unliked on beta let searchBeta = await searchComment(beta, commentRes.comment); @@ -173,6 +187,7 @@ test('Unlike a comment', async () => { test('Federated comment like', async () => { let commentRes = await createComment(alpha, postRes.post.id); + await delay(); // Find the comment on beta let searchBeta = await searchComment(beta, commentRes.comment); @@ -180,6 +195,7 @@ test('Federated comment like', async () => { let like = await likeComment(beta, 1, betaComment); expect(like.comment.score).toBe(2); + await delay(); // Get the post from alpha, check the likes let post = await getPost(alpha, postRes.post.id); @@ -189,6 +205,7 @@ test('Federated comment like', async () => { test('Reply to a comment', async () => { // Create a comment on alpha, find it on beta let commentRes = await createComment(alpha, postRes.post.id); + await delay(); let searchBeta = await searchComment(beta, commentRes.comment); let betaComment = searchBeta.comments[0]; @@ -201,6 +218,7 @@ test('Reply to a comment', async () => { expect(replyRes.comment.creator_local).toBe(true); expect(replyRes.comment.parent_id).toBe(betaComment.id); expect(replyRes.comment.score).toBe(1); + await delay(); // Make sure that comment is seen on alpha // TODO not sure why, but a searchComment back to alpha, for the ap_id of betas @@ -219,6 +237,7 @@ test('Mention beta', async () => { // Create a mention on alpha let mentionContent = 'A test mention of @lemmy_beta@lemmy-beta:8550'; let commentRes = await createComment(alpha, postRes.post.id); + await delay(); let mentionRes = await createComment( alpha, postRes.post.id, @@ -229,6 +248,7 @@ test('Mention beta', async () => { expect(mentionRes.comment.community_local).toBe(false); expect(mentionRes.comment.creator_local).toBe(true); expect(mentionRes.comment.score).toBe(1); + await delay(); let mentionsRes = await getMentions(beta); expect(mentionsRes.mentions[0].content).toBeDefined(); @@ -239,6 +259,7 @@ test('Mention beta', async () => { test('Comment Search', async () => { let commentRes = await createComment(alpha, postRes.post.id); + await delay(); let searchBeta = await searchComment(beta, commentRes.comment); expect(searchBeta.comments[0].ap_id).toBe(commentRes.comment.ap_id); }); @@ -247,6 +268,7 @@ test('A and G subscribe to B (center) A posts, G mentions B, it gets announced t // Create a local post let alphaPost = await createPost(alpha, 2); expect(alphaPost.post.community_local).toBe(true); + await delay(); // Make sure gamma sees it let search = await searchPost(gamma, alphaPost.post); @@ -264,6 +286,7 @@ test('A and G subscribe to B (center) A posts, G mentions B, it gets announced t expect(commentRes.comment.community_local).toBe(false); expect(commentRes.comment.creator_local).toBe(true); expect(commentRes.comment.score).toBe(1); + await delay(); // Make sure alpha sees it let alphaPost2 = await getPost(alpha, alphaPost.post.id); @@ -291,6 +314,7 @@ test('Fetch in_reply_tos: A is unsubbed from B, B makes a post, and some embedde // B creates a post, and two comments, should be invisible to A let postRes = await createPost(beta, 2); expect(postRes.post.name).toBeDefined(); + await delay(); let parentCommentContent = 'An invisible top level comment from beta'; let parentCommentRes = await createComment( @@ -300,6 +324,7 @@ test('Fetch in_reply_tos: A is unsubbed from B, B makes a post, and some embedde parentCommentContent ); expect(parentCommentRes.comment.content).toBe(parentCommentContent); + await delay(); // B creates a comment, then a child one of that. let childCommentContent = 'An invisible child comment from beta'; @@ -310,11 +335,13 @@ test('Fetch in_reply_tos: A is unsubbed from B, B makes a post, and some embedde childCommentContent ); expect(childCommentRes.comment.content).toBe(childCommentContent); + await delay(); // Follow beta again let follow = await followBeta(alpha); expect(follow.community.local).toBe(false); expect(follow.community.name).toBe('main'); + await delay(); // An update to the child comment on beta, should push the post, parent, and child to alpha now let updatedCommentContent = 'An update child comment from beta'; @@ -324,10 +351,14 @@ test('Fetch in_reply_tos: A is unsubbed from B, B makes a post, and some embedde updatedCommentContent ); expect(updateRes.comment.content).toBe(updatedCommentContent); + await delay(); // Get the post from alpha - let createFakeAlphaPostToGetId = await createPost(alpha, 2); - let alphaPost = await getPost(alpha, createFakeAlphaPostToGetId.post.id - 1); + let search = await searchPost(alpha, postRes.post); + let alphaPostB = search.posts[0]; + await delay(); + + let alphaPost = await getPost(alpha, alphaPostB.id); expect(alphaPost.post.name).toBeDefined(); expect(alphaPost.comments[1].content).toBe(parentCommentContent); expect(alphaPost.comments[0].content).toBe(updatedCommentContent); diff --git a/ui/src/api_tests/community.spec.ts b/ui/src/api_tests/community.spec.ts index 6945e33239..bd498009a9 100644 --- a/ui/src/api_tests/community.spec.ts +++ b/ui/src/api_tests/community.spec.ts @@ -6,6 +6,7 @@ import { createCommunity, deleteCommunity, removeCommunity, + delay, } from './shared'; beforeAll(async () => { @@ -24,12 +25,14 @@ test('Create community', async () => { test('Delete community', async () => { let communityRes = await createCommunity(beta); + await delay(); let deleteCommunityRes = await deleteCommunity( beta, true, communityRes.community.id ); expect(deleteCommunityRes.community.deleted).toBe(true); + await delay(); // Make sure it got deleted on A let search = await searchForBetaCommunity(alpha); @@ -44,6 +47,7 @@ test('Delete community', async () => { communityRes.community.id ); expect(undeleteCommunityRes.community.deleted).toBe(false); + await delay(); // Make sure it got undeleted on A let search2 = await searchForBetaCommunity(alpha); @@ -54,6 +58,7 @@ test('Delete community', async () => { test('Remove community', async () => { let communityRes = await createCommunity(beta); + await delay(); let removeCommunityRes = await removeCommunity( beta, true, @@ -66,6 +71,7 @@ test('Remove community', async () => { let communityA = search.communities[0]; // TODO this fails currently, because no updates are pushed // expect(communityA.removed).toBe(true); + await delay(); // unremove let unremoveCommunityRes = await removeCommunity( @@ -74,6 +80,7 @@ test('Remove community', async () => { communityRes.community.id ); expect(unremoveCommunityRes.community.removed).toBe(false); + await delay(); // Make sure it got unremoved on A let search2 = await searchForBetaCommunity(alpha); diff --git a/ui/src/api_tests/follow.spec.ts b/ui/src/api_tests/follow.spec.ts index 2f1f8cd89c..41af6effb4 100644 --- a/ui/src/api_tests/follow.spec.ts +++ b/ui/src/api_tests/follow.spec.ts @@ -5,6 +5,7 @@ import { followCommunity, checkFollowedCommunities, unfollowRemotes, + delay, } from './shared'; beforeAll(async () => { @@ -22,6 +23,7 @@ test('Follow federated community', async () => { // Make sure the follow response went through expect(follow.community.local).toBe(false); expect(follow.community.name).toBe('main'); + await delay(); // Check it from local let followCheck = await checkFollowedCommunities(alpha); @@ -33,6 +35,7 @@ test('Follow federated community', async () => { // Test an unfollow let unfollow = await followCommunity(alpha, false, remoteCommunityId); expect(unfollow.community.local).toBe(false); + await delay(); // Make sure you are unsubbed locally let unfollowCheck = await checkFollowedCommunities(alpha); diff --git a/ui/src/api_tests/post.spec.ts b/ui/src/api_tests/post.spec.ts index ab9c63fb16..c2cbad6d57 100644 --- a/ui/src/api_tests/post.spec.ts +++ b/ui/src/api_tests/post.spec.ts @@ -1,3 +1,4 @@ +jest.setTimeout(120000); import { alpha, beta, @@ -18,6 +19,7 @@ import { removePost, getPost, unfollowRemotes, + delay, } from './shared'; beforeAll(async () => { @@ -26,6 +28,7 @@ beforeAll(async () => { await followBeta(gamma); await followBeta(delta); await followBeta(epsilon); + await delay(10000); }); afterAll(async () => { @@ -37,11 +40,13 @@ afterAll(async () => { test('Create a post', async () => { let search = await searchForBetaCommunity(alpha); + await delay(); let postRes = await createPost(alpha, search.communities[0].id); expect(postRes.post).toBeDefined(); expect(postRes.post.community_local).toBe(false); expect(postRes.post.creator_local).toBe(true); expect(postRes.post.score).toBe(1); + await delay(); // Make sure that post is liked on beta let searchBeta = await searchPost(beta, postRes.post); @@ -69,12 +74,15 @@ test('Create a post in a non-existent community', async () => { test('Unlike a post', async () => { let search = await searchForBetaCommunity(alpha); let postRes = await createPost(alpha, search.communities[0].id); + await delay(); let unlike = await likePost(alpha, 0, postRes.post); expect(unlike.post.score).toBe(0); + await delay(); // Try to unlike it again, make sure it stays at 0 let unlike2 = await likePost(alpha, 0, postRes.post); expect(unlike2.post.score).toBe(0); + await delay(); // Make sure that post is unliked on beta let searchBeta = await searchPost(beta, postRes.post); @@ -89,12 +97,14 @@ test('Unlike a post', async () => { test('Update a post', async () => { let search = await searchForBetaCommunity(alpha); let postRes = await createPost(alpha, search.communities[0].id); + await delay(); let updatedName = 'A jest test federated post, updated'; let updatedPost = await updatePost(alpha, postRes.post); expect(updatedPost.post.name).toBe(updatedName); expect(updatedPost.post.community_local).toBe(false); expect(updatedPost.post.creator_local).toBe(true); + await delay(); // Make sure that post is updated on beta let searchBeta = await searchPost(beta, postRes.post); @@ -102,6 +112,7 @@ test('Update a post', async () => { expect(betaPost.community_local).toBe(true); expect(betaPost.creator_local).toBe(false); expect(betaPost.name).toBe(updatedName); + await delay(); // Make sure lemmy beta cannot update the post let updatedPostBeta = await updatePost(beta, betaPost); @@ -111,9 +122,11 @@ test('Update a post', async () => { test('Sticky a post', async () => { let search = await searchForBetaCommunity(alpha); let postRes = await createPost(alpha, search.communities[0].id); + await delay(); let stickiedPostRes = await stickyPost(alpha, true, postRes.post); expect(stickiedPostRes.post.stickied).toBe(true); + await delay(); // Make sure that post is stickied on beta let searchBeta = await searchPost(beta, postRes.post); @@ -125,6 +138,7 @@ test('Sticky a post', async () => { // Unsticky a post let unstickiedPost = await stickyPost(alpha, false, postRes.post); expect(unstickiedPost.post.stickied).toBe(false); + await delay(); // Make sure that post is unstickied on beta let searchBeta2 = await searchPost(beta, postRes.post); @@ -137,6 +151,7 @@ test('Sticky a post', async () => { let searchGamma = await searchPost(gamma, postRes.post); let gammaPost = searchGamma.posts[0]; let gammaTrySticky = await stickyPost(gamma, true, gammaPost); + await delay(); let searchBeta3 = await searchPost(beta, postRes.post); let betaPost3 = searchBeta3.posts[0]; expect(gammaTrySticky.post.stickied).toBe(true); @@ -145,10 +160,13 @@ test('Sticky a post', async () => { test('Lock a post', async () => { let search = await searchForBetaCommunity(alpha); + await delay(); let postRes = await createPost(alpha, search.communities[0].id); + await delay(); let lockedPostRes = await lockPost(alpha, true, postRes.post); expect(lockedPostRes.post.locked).toBe(true); + await delay(); // Make sure that post is locked on beta let searchBeta = await searchPost(beta, postRes.post); @@ -160,14 +178,17 @@ test('Lock a post', async () => { // Try to make a new comment there, on alpha let comment = await createComment(alpha, postRes.post.id); expect(comment['error']).toBe('locked'); + await delay(); // Try to create a new comment, on beta let commentBeta = await createComment(beta, betaPost.id); expect(commentBeta['error']).toBe('locked'); + await delay(); // Unlock a post let unlockedPost = await lockPost(alpha, false, postRes.post); expect(unlockedPost.post.locked).toBe(false); + await delay(); // Make sure that post is unlocked on beta let searchBeta2 = await searchPost(beta, postRes.post); @@ -180,68 +201,84 @@ test('Lock a post', async () => { test('Delete a post', async () => { let search = await searchForBetaCommunity(alpha); let postRes = await createPost(alpha, search.communities[0].id); + await delay(); let deletedPost = await deletePost(alpha, true, postRes.post); expect(deletedPost.post.deleted).toBe(true); + await delay(); // Make sure lemmy beta sees post is deleted - let createFakeBetaPostToGetId = (await createPost(beta, 2)).post.id - 1; - let betaPost = await getPost(beta, createFakeBetaPostToGetId); - expect(betaPost.post.deleted).toBe(true); + let searchBeta = await searchPost(beta, postRes.post); + let betaPost = searchBeta.posts[0]; + // This will be undefined because of the tombstone + expect(betaPost).toBeUndefined(); + await delay(); // Undelete let undeletedPost = await deletePost(alpha, false, postRes.post); expect(undeletedPost.post.deleted).toBe(false); + await delay(); // Make sure lemmy beta sees post is undeleted - let betaPost2 = await getPost(beta, createFakeBetaPostToGetId); - expect(betaPost2.post.deleted).toBe(false); + let searchBeta2 = await searchPost(beta, postRes.post); + let betaPost2 = searchBeta2.posts[0]; + expect(betaPost2.deleted).toBe(false); // Make sure lemmy beta cannot delete the post - let deletedPostBeta = await deletePost(beta, true, betaPost2.post); + let deletedPostBeta = await deletePost(beta, true, betaPost2); expect(deletedPostBeta).toStrictEqual({ error: 'no_post_edit_allowed' }); }); test('Remove a post from admin and community on different instance', async () => { let search = await searchForBetaCommunity(alpha); let postRes = await createPost(alpha, search.communities[0].id); + await delay(); let removedPost = await removePost(alpha, true, postRes.post); expect(removedPost.post.removed).toBe(true); + await delay(); // Make sure lemmy beta sees post is NOT removed - let createFakeBetaPostToGetId = (await createPost(beta, 2)).post.id - 1; - let betaPost = await getPost(beta, createFakeBetaPostToGetId); - expect(betaPost.post.removed).toBe(false); + let searchBeta = await searchPost(beta, postRes.post); + let betaPost = searchBeta.posts[0]; + expect(betaPost.removed).toBe(false); + await delay(); // Undelete let undeletedPost = await removePost(alpha, false, postRes.post); expect(undeletedPost.post.removed).toBe(false); + await delay(); // Make sure lemmy beta sees post is undeleted - let betaPost2 = await getPost(beta, createFakeBetaPostToGetId); - expect(betaPost2.post.removed).toBe(false); + let searchBeta2 = await searchPost(beta, postRes.post); + let betaPost2 = searchBeta2.posts[0]; + expect(betaPost2.removed).toBe(false); }); test('Remove a post from admin and community on same instance', async () => { let search = await searchForBetaCommunity(alpha); let postRes = await createPost(alpha, search.communities[0].id); + await delay(); // Get the id for beta - let createFakeBetaPostToGetId = (await createPost(beta, 2)).post.id - 1; - let betaPost = await getPost(beta, createFakeBetaPostToGetId); + let searchBeta = await searchPost(beta, postRes.post); + let betaPost = searchBeta.posts[0]; + await delay(); // The beta admin removes it (the community lives on beta) - let removePostRes = await removePost(beta, true, betaPost.post); + let removePostRes = await removePost(beta, true, betaPost); expect(removePostRes.post.removed).toBe(true); + await delay(); // Make sure lemmy alpha sees post is removed let alphaPost = await getPost(alpha, postRes.post.id); expect(alphaPost.post.removed).toBe(true); + await delay(); // Undelete - let undeletedPost = await removePost(beta, false, betaPost.post); + let undeletedPost = await removePost(beta, false, betaPost); expect(undeletedPost.post.removed).toBe(false); + await delay(); // Make sure lemmy alpha sees post is undeleted let alphaPost2 = await getPost(alpha, postRes.post.id); @@ -250,7 +287,9 @@ test('Remove a post from admin and community on same instance', async () => { test('Search for a post', async () => { let search = await searchForBetaCommunity(alpha); + await delay(); let postRes = await createPost(alpha, search.communities[0].id); + await delay(); let searchBeta = await searchPost(beta, postRes.post); expect(searchBeta.posts[0].name).toBeDefined(); @@ -259,6 +298,7 @@ test('Search for a post', async () => { test('A and G subscribe to B (center) A posts, it gets announced to G', async () => { let search = await searchForBetaCommunity(alpha); let postRes = await createPost(alpha, search.communities[0].id); + await delay(); let search2 = await searchPost(gamma, postRes.post); expect(search2.posts[0].name).toBeDefined(); diff --git a/ui/src/api_tests/private_message.spec.ts b/ui/src/api_tests/private_message.spec.ts index 4bf3f07a2b..78d467da98 100644 --- a/ui/src/api_tests/private_message.spec.ts +++ b/ui/src/api_tests/private_message.spec.ts @@ -8,13 +8,16 @@ import { listPrivateMessages, deletePrivateMessage, unfollowRemotes, + delay, } from './shared'; let recipient_id: number; beforeAll(async () => { await setupLogins(); - recipient_id = (await followBeta(alpha)).community.creator_id; + let follow = await followBeta(alpha); + await delay(10000); + recipient_id = follow.community.creator_id; }); afterAll(async () => { @@ -27,6 +30,7 @@ test('Create a private message', async () => { expect(pmRes.message.local).toBe(true); expect(pmRes.message.creator_local).toBe(true); expect(pmRes.message.recipient_local).toBe(false); + await delay(); let betaPms = await listPrivateMessages(beta); expect(betaPms.messages[0].content).toBeDefined(); @@ -41,6 +45,7 @@ test('Update a private message', async () => { let pmRes = await createPrivateMessage(alpha, recipient_id); let pmUpdated = await updatePrivateMessage(alpha, pmRes.message.id); expect(pmUpdated.message.content).toBe(updatedContent); + await delay(); let betaPms = await listPrivateMessages(beta); expect(betaPms.messages[0].content).toBe(updatedContent); @@ -48,15 +53,18 @@ test('Update a private message', async () => { test('Delete a private message', async () => { let pmRes = await createPrivateMessage(alpha, recipient_id); + await delay(); let betaPms1 = await listPrivateMessages(beta); let deletedPmRes = await deletePrivateMessage(alpha, true, pmRes.message.id); expect(deletedPmRes.message.deleted).toBe(true); + await delay(); // The GetPrivateMessages filters out deleted, // even though they are in the actual database. // no reason to show them let betaPms2 = await listPrivateMessages(beta); expect(betaPms2.messages.length).toBe(betaPms1.messages.length - 1); + await delay(); // Undelete let undeletedPmRes = await deletePrivateMessage( @@ -65,6 +73,7 @@ test('Delete a private message', async () => { pmRes.message.id ); expect(undeletedPmRes.message.deleted).toBe(false); + await delay(); let betaPms3 = await listPrivateMessages(beta); expect(betaPms3.messages.length).toBe(betaPms1.messages.length); diff --git a/ui/src/api_tests/shared.ts b/ui/src/api_tests/shared.ts index 710671c0e0..eb4c6da03a 100644 --- a/ui/src/api_tests/shared.ts +++ b/ui/src/api_tests/shared.ts @@ -198,7 +198,7 @@ export async function searchPost( ): Promise { let form: SearchForm = { q: post.ap_id, - type_: SearchType.All, + type_: SearchType.Posts, sort: SortType.TopAll, }; return api.client.search(form); @@ -220,7 +220,7 @@ export async function searchComment( ): Promise { let form: SearchForm = { q: comment.ap_id, - type_: SearchType.All, + type_: SearchType.Comments, sort: SortType.TopAll, }; return api.client.search(form); @@ -233,7 +233,7 @@ export async function searchForBetaCommunity( // Use short-hand search url let form: SearchForm = { q: '!main@lemmy-beta:8550', - type_: SearchType.All, + type_: SearchType.Communities, sort: SortType.TopAll, }; return api.client.search(form); @@ -247,7 +247,7 @@ export async function searchForUser( // Use short-hand search url let form: SearchForm = { q: apShortname, - type_: SearchType.All, + type_: SearchType.Users, sort: SortType.TopAll, }; return api.client.search(form); @@ -524,6 +524,11 @@ export async function followBeta(api: API): Promise { } } +export const delay = (millis: number = 1500) => + new Promise((resolve, _reject) => { + setTimeout(_ => resolve(), millis); + }); + export function wrapper(form: any): string { return JSON.stringify(form); }