diff --git a/Cargo.lock b/Cargo.lock index 4224442c8..569371449 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -250,11 +250,9 @@ dependencies = [ "pin-project-lite", "rustls-pki-types", "tokio", - "tokio-rustls 0.23.4", "tokio-rustls 0.26.0", "tokio-util", "tracing", - "webpki-roots 0.22.6", ] [[package]] @@ -2711,7 +2709,7 @@ dependencies = [ "base64 0.21.7", "js-sys", "pem", - "ring 0.17.8", + "ring", "serde", "serde_json", "simple_asn1", @@ -2737,7 +2735,7 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "lemmy_api" -version = "0.19.4-rc.3" +version = "0.19.4-rc.4" dependencies = [ "activitypub_federation", "actix-web", @@ -2766,7 +2764,7 @@ dependencies = [ [[package]] name = "lemmy_api_common" -version = "0.19.4-rc.3" +version = "0.19.4-rc.4" dependencies = [ "activitypub_federation", "actix-web", @@ -2804,7 +2802,7 @@ dependencies = [ [[package]] name = "lemmy_api_crud" -version = "0.19.4-rc.3" +version = "0.19.4-rc.4" dependencies = [ "accept-language", "activitypub_federation", @@ -2827,7 +2825,7 @@ dependencies = [ [[package]] name = "lemmy_apub" -version = "0.19.4-rc.3" +version = "0.19.4-rc.4" dependencies = [ "activitypub_federation", "actix-web", @@ -2865,7 +2863,7 @@ dependencies = [ [[package]] name = "lemmy_db_perf" -version = "0.19.4-rc.3" +version = "0.19.4-rc.4" dependencies = [ "anyhow", "clap", @@ -2880,7 +2878,7 @@ dependencies = [ [[package]] name = "lemmy_db_schema" -version = "0.19.4-rc.3" +version = "0.19.4-rc.4" dependencies = [ "activitypub_federation", "anyhow", @@ -2920,7 +2918,7 @@ dependencies = [ [[package]] name = "lemmy_db_views" -version = "0.19.4-rc.3" +version = "0.19.4-rc.4" dependencies = [ "actix-web", "chrono", @@ -2942,12 +2940,13 @@ dependencies = [ [[package]] name = "lemmy_db_views_actor" -version = "0.19.4-rc.3" +version = "0.19.4-rc.4" dependencies = [ "chrono", "diesel", "diesel-async", "lemmy_db_schema", + "lemmy_db_views", "lemmy_utils", "pretty_assertions", "serde", @@ -2962,7 +2961,7 @@ dependencies = [ [[package]] name = "lemmy_db_views_moderator" -version = "0.19.4-rc.3" +version = "0.19.4-rc.4" dependencies = [ "diesel", "diesel-async", @@ -2974,7 +2973,7 @@ dependencies = [ [[package]] name = "lemmy_federate" -version = "0.19.4-rc.3" +version = "0.19.4-rc.4" dependencies = [ "activitypub_federation", "actix-web", @@ -3001,7 +3000,7 @@ dependencies = [ [[package]] name = "lemmy_routes" -version = "0.19.4-rc.3" +version = "0.19.4-rc.4" dependencies = [ "activitypub_federation", "actix-web", @@ -3026,7 +3025,7 @@ dependencies = [ [[package]] name = "lemmy_server" -version = "0.19.4-rc.3" +version = "0.19.4-rc.4" dependencies = [ "activitypub_federation", "actix-cors", @@ -3069,7 +3068,7 @@ dependencies = [ [[package]] name = "lemmy_utils" -version = "0.19.4-rc.3" +version = "0.19.4-rc.4" dependencies = [ "actix-web", "anyhow", @@ -4076,7 +4075,7 @@ dependencies = [ "tracing-subscriber", "url", "uuid", - "webpki-roots 0.26.1", + "webpki-roots", ] [[package]] @@ -4678,7 +4677,7 @@ dependencies = [ "wasm-bindgen-futures", "wasm-streams", "web-sys", - "webpki-roots 0.26.1", + "webpki-roots", "winreg 0.52.0", ] @@ -4767,21 +4766,6 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9b1a3d5f46d53f4a3478e2be4a5a5ce5108ea58b100dcd139830eae7f79a3a1" -[[package]] -name = "ring" -version = "0.16.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" -dependencies = [ - "cc", - "libc", - "once_cell", - "spin 0.5.2", - "untrusted 0.7.1", - "web-sys", - "winapi", -] - [[package]] name = "ring" version = "0.17.8" @@ -4792,7 +4776,7 @@ dependencies = [ "cfg-if", "getrandom", "libc", - "spin 0.9.8", + "spin", "untrusted 0.9.0", "windows-sys 0.52.0", ] @@ -4875,18 +4859,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "rustls" -version = "0.20.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b80e3dec595989ea8510028f30c408a4630db12c9cbb8de34203b89d6577e99" -dependencies = [ - "log", - "ring 0.16.20", - "sct", - "webpki", -] - [[package]] name = "rustls" version = "0.22.4" @@ -4894,7 +4866,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bf4ef73721ac7bcd79b2b315da7779d8fc09718c6b3d2d1b2d94850eb8c18432" dependencies = [ "log", - "ring 0.17.8", + "ring", "rustls-pki-types", "rustls-webpki", "subtle", @@ -4910,7 +4882,7 @@ dependencies = [ "aws-lc-rs", "log", "once_cell", - "ring 0.17.8", + "ring", "rustls-pki-types", "rustls-webpki", "subtle", @@ -4959,7 +4931,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff448f7e92e913c4b7d4c6d8e4540a1724b319b4152b8aef6d4cf8339712b33e" dependencies = [ "aws-lc-rs", - "ring 0.17.8", + "ring", "rustls-pki-types", "untrusted 0.9.0", ] @@ -5038,16 +5010,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" -[[package]] -name = "sct" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" -dependencies = [ - "ring 0.17.8", - "untrusted 0.9.0", -] - [[package]] name = "sdd" version = "0.2.0" @@ -5373,12 +5335,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "spin" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" - [[package]] name = "spin" version = "0.9.8" @@ -5822,7 +5778,7 @@ version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04fb792ccd6bbcd4bba408eb8a292f70fc4a3589e5d793626f45190e6454b6ab" dependencies = [ - "ring 0.17.8", + "ring", "rustls 0.23.8", "tokio", "tokio-postgres", @@ -5830,17 +5786,6 @@ dependencies = [ "x509-certificate", ] -[[package]] -name = "tokio-rustls" -version = "0.23.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" -dependencies = [ - "rustls 0.20.9", - "tokio", - "webpki", -] - [[package]] name = "tokio-rustls" version = "0.25.0" @@ -6628,25 +6573,6 @@ dependencies = [ "url", ] -[[package]] -name = "webpki" -version = "0.22.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed63aea5ce73d0ff405984102c42de94fc55a6b75765d621c65262469b3c9b53" -dependencies = [ - "ring 0.17.8", - "untrusted 0.9.0", -] - -[[package]] -name = "webpki-roots" -version = "0.22.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6c71e40d7d2c34a5106301fb632274ca37242cd0c9d3e64dbece371a40a2d87" -dependencies = [ - "webpki", -] - [[package]] name = "webpki-roots" version = "0.26.1" @@ -6930,7 +6856,7 @@ dependencies = [ "der", "hex", "pem", - "ring 0.17.8", + "ring", "signature", "spki", "thiserror", diff --git a/Cargo.toml b/Cargo.toml index 21ca80069..16940d3ff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace.package] -version = "0.19.4-rc.3" +version = "0.19.4-rc.4" edition = "2021" description = "A link aggregator for the fediverse" license = "AGPL-3.0" @@ -88,17 +88,17 @@ unused_self = "deny" unwrap_used = "deny" [workspace.dependencies] -lemmy_api = { version = "=0.19.4-rc.3", path = "./crates/api" } -lemmy_api_crud = { version = "=0.19.4-rc.3", path = "./crates/api_crud" } -lemmy_apub = { version = "=0.19.4-rc.3", path = "./crates/apub" } -lemmy_utils = { version = "=0.19.4-rc.3", path = "./crates/utils", default-features = false } -lemmy_db_schema = { version = "=0.19.4-rc.3", path = "./crates/db_schema" } -lemmy_api_common = { version = "=0.19.4-rc.3", path = "./crates/api_common" } -lemmy_routes = { version = "=0.19.4-rc.3", path = "./crates/routes" } -lemmy_db_views = { version = "=0.19.4-rc.3", path = "./crates/db_views" } -lemmy_db_views_actor = { version = "=0.19.4-rc.3", path = "./crates/db_views_actor" } -lemmy_db_views_moderator = { version = "=0.19.4-rc.3", path = "./crates/db_views_moderator" } -lemmy_federate = { version = "=0.19.4-rc.3", path = "./crates/federate" } +lemmy_api = { version = "=0.19.4-rc.4", path = "./crates/api" } +lemmy_api_crud = { version = "=0.19.4-rc.4", path = "./crates/api_crud" } +lemmy_apub = { version = "=0.19.4-rc.4", path = "./crates/apub" } +lemmy_utils = { version = "=0.19.4-rc.4", path = "./crates/utils", default-features = false } +lemmy_db_schema = { version = "=0.19.4-rc.4", path = "./crates/db_schema" } +lemmy_api_common = { version = "=0.19.4-rc.4", path = "./crates/api_common" } +lemmy_routes = { version = "=0.19.4-rc.4", path = "./crates/routes" } +lemmy_db_views = { version = "=0.19.4-rc.4", path = "./crates/db_views" } +lemmy_db_views_actor = { version = "=0.19.4-rc.4", path = "./crates/db_views_actor" } +lemmy_db_views_moderator = { version = "=0.19.4-rc.4", path = "./crates/db_views_moderator" } +lemmy_federate = { version = "=0.19.4-rc.4", path = "./crates/federate" } activitypub_federation = { version = "0.5.6", default-features = false, features = [ "actix-web", ] } @@ -109,7 +109,7 @@ serde = { version = "1.0.202", features = ["derive"] } serde_with = "3.8.1" actix-web = { version = "4.6.0", default-features = false, features = [ "macros", - "rustls", + "rustls-0_23", "compress-brotli", "compress-gzip", "compress-zstd", diff --git a/api_tests/src/comment.spec.ts b/api_tests/src/comment.spec.ts index dfab4109c..8c3a23ab5 100644 --- a/api_tests/src/comment.spec.ts +++ b/api_tests/src/comment.spec.ts @@ -37,8 +37,9 @@ import { followCommunity, blockCommunity, delay, + saveUserSettings, } from "./shared"; -import { CommentView, CommunityView } from "lemmy-js-client"; +import { CommentView, CommunityView, SaveUserSettings } from "lemmy-js-client"; let betaCommunity: CommunityView | undefined; let postOnAlphaRes: PostResponse; @@ -443,6 +444,59 @@ test("Reply to a comment from another instance, get notification", async () => { assertCommentFederation(alphaReply, replyRes.comment_view); }); +test("Bot reply notifications are filtered when bots are hidden", async () => { + const newAlphaBot = await registerUser(alpha, alphaUrl); + let form: SaveUserSettings = { + bot_account: true, + }; + await saveUserSettings(newAlphaBot, form); + + const alphaCommunity = ( + await resolveCommunity(alpha, "!main@lemmy-alpha:8541") + ).community; + + if (!alphaCommunity) { + throw "Missing alpha community"; + } + + await alpha.markAllAsRead(); + form = { + show_bot_accounts: false, + }; + await saveUserSettings(alpha, form); + const postOnAlphaRes = await createPost(alpha, alphaCommunity.community.id); + + // Bot reply to alpha's post + let commentRes = await createComment( + newAlphaBot, + postOnAlphaRes.post_view.post.id, + ); + expect(commentRes).toBeDefined(); + + let alphaUnreadCountRes = await getUnreadCount(alpha); + expect(alphaUnreadCountRes.replies).toBe(0); + + let alphaUnreadRepliesRes = await getReplies(alpha, true); + expect(alphaUnreadRepliesRes.replies.length).toBe(0); + + // This both restores the original state that may be expected by other tests + // implicitly and is used by the next steps to ensure replies are still + // returned when a user later decides to show bot accounts again. + form = { + show_bot_accounts: true, + }; + await saveUserSettings(alpha, form); + + alphaUnreadCountRes = await getUnreadCount(alpha); + expect(alphaUnreadCountRes.replies).toBe(1); + + alphaUnreadRepliesRes = await getReplies(alpha, true); + expect(alphaUnreadRepliesRes.replies.length).toBe(1); + expect(alphaUnreadRepliesRes.replies[0].comment.id).toBe( + commentRes.comment_view.comment.id, + ); +}); + test("Mention beta from alpha", async () => { if (!betaCommunity) throw Error("no community"); const postOnAlphaRes = await createPost(alpha, betaCommunity.community.id); diff --git a/api_tests/src/shared.ts b/api_tests/src/shared.ts index 056f25538..2ae3d9e21 100644 --- a/api_tests/src/shared.ts +++ b/api_tests/src/shared.ts @@ -364,10 +364,13 @@ export async function getUnreadCount( return api.getUnreadCount(); } -export async function getReplies(api: LemmyHttp): Promise { +export async function getReplies( + api: LemmyHttp, + unread_only: boolean = false, +): Promise { let form: GetReplies = { sort: "New", - unread_only: false, + unread_only, }; return api.getReplies(form); } diff --git a/crates/api/src/local_user/notifications/unread_count.rs b/crates/api/src/local_user/notifications/unread_count.rs index 9d06f7c62..4c6c65263 100644 --- a/crates/api/src/local_user/notifications/unread_count.rs +++ b/crates/api/src/local_user/notifications/unread_count.rs @@ -11,9 +11,12 @@ pub async fn unread_count( ) -> LemmyResult> { let person_id = local_user_view.person.id; - let replies = CommentReplyView::get_unread_replies(&mut context.pool(), person_id).await?; + let replies = + CommentReplyView::get_unread_replies(&mut context.pool(), &local_user_view.local_user).await?; - let mentions = PersonMentionView::get_unread_mentions(&mut context.pool(), person_id).await?; + let mentions = + PersonMentionView::get_unread_mentions(&mut context.pool(), &local_user_view.local_user) + .await?; let private_messages = PrivateMessageView::get_unread_messages(&mut context.pool(), person_id).await?; diff --git a/crates/db_views/src/comment_view.rs b/crates/db_views/src/comment_view.rs index 7588943b9..e021578f8 100644 --- a/crates/db_views/src/comment_view.rs +++ b/crates/db_views/src/comment_view.rs @@ -1,5 +1,4 @@ use crate::structs::{CommentView, LocalUserView}; -use chrono::{DateTime, Utc}; use diesel::{ dsl::{exists, not}, pg::Pg, @@ -63,17 +62,6 @@ fn queries<'a>() -> Queries< ) }; - let is_saved = |person_id| { - comment_saved::table - .filter( - comment::id - .eq(comment_saved::comment_id) - .and(comment_saved::person_id.eq(person_id)), - ) - .select(comment_saved::published.nullable()) - .single_value() - }; - let is_community_followed = |person_id| { community_follower::table .filter( @@ -147,14 +135,6 @@ fn queries<'a>() -> Queries< Box::new(None::.into_sql::>()) }; - let is_saved_selection: Box< - dyn BoxableExpression<_, Pg, SqlType = sql_types::Nullable>, - > = if let Some(person_id) = my_person_id { - Box::new(is_saved(person_id)) - } else { - Box::new(None::>.into_sql::>()) - }; - let is_creator_blocked_selection: Box> = if let Some(person_id) = my_person_id { Box::new(is_creator_blocked(person_id)) @@ -167,6 +147,13 @@ fn queries<'a>() -> Queries< .inner_join(post::table) .inner_join(community::table.on(post::community_id.eq(community::id))) .inner_join(comment_aggregates::table) + .left_join( + comment_saved::table.on( + comment::id + .eq(comment_saved::comment_id) + .and(comment_saved::person_id.eq(my_person_id.unwrap_or(PersonId(-1)))), + ), + ) .select(( comment::all_columns, person::all_columns, @@ -178,7 +165,7 @@ fn queries<'a>() -> Queries< creator_is_moderator, creator_is_admin, subscribed_type_selection, - is_saved_selection.is_not_null(), + comment_saved::person_id.nullable().is_not_null(), is_creator_blocked_selection, score_selection, )) @@ -260,8 +247,8 @@ fn queries<'a>() -> Queries< // If its saved only, then filter, and order by the saved time, not the comment creation time. if options.saved_only { query = query - .filter(is_saved(person_id_join).is_not_null()) - .then_order_by(is_saved(person_id_join).desc()); + .filter(comment_saved::person_id.is_not_null()) + .then_order_by(comment_saved::published.desc()); } if let Some(my_id) = my_person_id { diff --git a/crates/db_views/src/post_view.rs b/crates/db_views/src/post_view.rs index afb0f435f..eac44bb39 100644 --- a/crates/db_views/src/post_view.rs +++ b/crates/db_views/src/post_view.rs @@ -1,5 +1,4 @@ use crate::structs::{LocalUserView, PaginationCursor, PostView}; -use chrono::{DateTime, Utc}; use diesel::{ debug_query, dsl::{exists, not, IntervalDsl}, @@ -100,17 +99,6 @@ fn queries<'a>() -> Queries< ), ); - let is_saved = |person_id| { - post_saved::table - .filter( - post_aggregates::post_id - .eq(post_saved::post_id) - .and(post_saved::person_id.eq(person_id)), - ) - .select(post_saved::published.nullable()) - .single_value() - }; - let is_read = |person_id| { exists( post_read::table.filter( @@ -162,14 +150,6 @@ fn queries<'a>() -> Queries< Box::new(false.into_sql::()) }; - let is_saved_selection: Box< - dyn BoxableExpression<_, Pg, SqlType = sql_types::Nullable>, - > = if let Some(person_id) = my_person_id { - Box::new(is_saved(person_id)) - } else { - Box::new(None::>.into_sql::>()) - }; - let is_read_selection: Box> = if let Some(person_id) = my_person_id { Box::new(is_read(person_id)) @@ -237,6 +217,13 @@ fn queries<'a>() -> Queries< .inner_join(person::table) .inner_join(community::table) .inner_join(post::table) + .left_join( + post_saved::table.on( + post_aggregates::post_id + .eq(post_saved::post_id) + .and(post_saved::person_id.eq(my_person_id.unwrap_or(PersonId(-1)))), + ), + ) .select(( post::all_columns, person::all_columns, @@ -247,7 +234,7 @@ fn queries<'a>() -> Queries< creator_is_admin, post_aggregates::all_columns, subscribed_type_selection, - is_saved_selection.is_not_null(), + post_saved::person_id.nullable().is_not_null(), is_read_selection, is_hidden_selection, is_creator_blocked_selection, @@ -426,10 +413,10 @@ fn queries<'a>() -> Queries< }; // If its saved only, then filter, and order by the saved time, not the comment creation time. - if let (true, Some(person_id)) = (options.saved_only, my_person_id) { + if let (true, Some(_person_id)) = (options.saved_only, my_person_id) { query = query - .filter(is_saved(person_id).is_not_null()) - .then_order_by(is_saved(person_id).desc()); + .filter(post_saved::person_id.is_not_null()) + .then_order_by(post_saved::published.desc()); } // Only hide the read posts, if the saved_only is false. Otherwise ppl with the hide_read // setting wont be able to see saved posts. diff --git a/crates/db_views_actor/Cargo.toml b/crates/db_views_actor/Cargo.toml index 1892055d1..d9e6a3352 100644 --- a/crates/db_views_actor/Cargo.toml +++ b/crates/db_views_actor/Cargo.toml @@ -40,6 +40,7 @@ serial_test = { workspace = true } tokio = { workspace = true } pretty_assertions = { workspace = true } url.workspace = true +lemmy_db_views.workspace = true lemmy_utils.workspace = true [package.metadata.cargo-machete] diff --git a/crates/db_views_actor/src/comment_reply_view.rs b/crates/db_views_actor/src/comment_reply_view.rs index 547c00e53..a5939d2e9 100644 --- a/crates/db_views_actor/src/comment_reply_view.rs +++ b/crates/db_views_actor/src/comment_reply_view.rs @@ -31,6 +31,7 @@ use lemmy_db_schema::{ person_block, post, }, + source::local_user::LocalUser, utils::{get_conn, limit_and_offset, DbConn, DbPool, ListFn, Queries, ReadFn}, CommentSortType, }; @@ -193,6 +194,8 @@ fn queries<'a>() -> Queries< }; let list = move |mut conn: DbConn<'a>, options: CommentReplyQuery| async move { + // These filters need to be kept in sync with the filters in + // CommentReplyView::get_unread_replies() let mut query = all_joins(comment_reply::table.into_boxed(), options.my_person_id); if let Some(recipient_id) = options.recipient_id { @@ -204,7 +207,7 @@ fn queries<'a>() -> Queries< } if !options.show_bot_accounts { - query = query.filter(person::bot_account.eq(false)); + query = query.filter(not(person::bot_account)); }; query = match options.sort.unwrap_or(CommentSortType::New) { @@ -246,24 +249,33 @@ impl CommentReplyView { /// Gets the number of unread replies pub async fn get_unread_replies( pool: &mut DbPool<'_>, - my_person_id: PersonId, + local_user: &LocalUser, ) -> Result { use diesel::dsl::count; let conn = &mut get_conn(pool).await?; - comment_reply::table + let mut query = comment_reply::table .inner_join(comment::table) .left_join( person_block::table.on( comment::creator_id .eq(person_block::target_id) - .and(person_block::person_id.eq(my_person_id)), + .and(person_block::person_id.eq(local_user.person_id)), ), ) - // Dont count replies from blocked users + .inner_join(person::table.on(comment::creator_id.eq(person::id))) + .into_boxed(); + + // These filters need to be kept in sync with the filters in queries().list() + if !local_user.show_bot_accounts { + query = query.filter(not(person::bot_account)); + } + + query + // Don't count replies from blocked users .filter(person_block::person_id.is_null()) - .filter(comment_reply::recipient_id.eq(my_person_id)) + .filter(comment_reply::recipient_id.eq(local_user.person_id)) .filter(comment_reply::read.eq(false)) .filter(comment::deleted.eq(false)) .filter(comment::removed.eq(false)) @@ -301,13 +313,15 @@ mod tests { comment_reply::{CommentReply, CommentReplyInsertForm, CommentReplyUpdateForm}, community::{Community, CommunityInsertForm}, instance::Instance, - person::{Person, PersonInsertForm}, + local_user::{LocalUser, LocalUserInsertForm, LocalUserUpdateForm}, + person::{Person, PersonInsertForm, PersonUpdateForm}, person_block::{PersonBlock, PersonBlockForm}, post::{Post, PostInsertForm}, }, traits::{Blockable, Crud}, utils::build_db_pool_for_tests, }; + use lemmy_db_views::structs::LocalUserView; use lemmy_utils::{error::LemmyResult, LemmyErrorType}; use pretty_assertions::assert_eq; use serial_test::serial; @@ -331,11 +345,15 @@ mod tests { .name("terrylakes recipient".into()) .public_key("pubkey".to_string()) .instance_id(inserted_instance.id) + .local(Some(true)) .build(); let inserted_recipient = Person::create(pool, &recipient_form).await?; let recipient_id = inserted_recipient.id; + let recipient_local_user = + LocalUser::create(pool, &LocalUserInsertForm::test_form(recipient_id), vec![]).await?; + let new_community = CommunityInsertForm::builder() .name("test community lake".to_string()) .title("nada".to_owned()) @@ -386,7 +404,7 @@ mod tests { CommentReply::update(pool, inserted_reply.id, &comment_reply_update_form).await?; // Test to make sure counts and blocks work correctly - let unread_replies = CommentReplyView::get_unread_replies(pool, recipient_id).await?; + let unread_replies = CommentReplyView::get_unread_replies(pool, &recipient_local_user).await?; let query = CommentReplyQuery { recipient_id: Some(recipient_id), @@ -409,11 +427,44 @@ mod tests { PersonBlock::block(pool, &block_form).await?; let unread_replies_after_block = - CommentReplyView::get_unread_replies(pool, recipient_id).await?; - let replies_after_block = query.list(pool).await?; + CommentReplyView::get_unread_replies(pool, &recipient_local_user).await?; + let replies_after_block = query.clone().list(pool).await?; assert_eq!(0, unread_replies_after_block); assert_eq!(0, replies_after_block.len()); + // Unblock user so we can reuse the same person + PersonBlock::unblock(pool, &block_form).await?; + + // Turn Terry into a bot account + let person_update_form = PersonUpdateForm { + bot_account: Some(true), + ..Default::default() + }; + Person::update(pool, inserted_terry.id, &person_update_form).await?; + + let recipient_local_user_update_form = LocalUserUpdateForm { + show_bot_accounts: Some(false), + ..Default::default() + }; + LocalUser::update( + pool, + recipient_local_user.id, + &recipient_local_user_update_form, + ) + .await?; + let recipient_local_user_view = LocalUserView::read(pool, recipient_local_user.id) + .await? + .ok_or(LemmyErrorType::CouldntFindLocalUser)?; + + let unread_replies_after_hide_bots = + CommentReplyView::get_unread_replies(pool, &recipient_local_user_view.local_user).await?; + + let mut query_without_bots = query.clone(); + query_without_bots.show_bot_accounts = false; + let replies_after_hide_bots = query_without_bots.list(pool).await?; + assert_eq!(0, unread_replies_after_hide_bots); + assert_eq!(0, replies_after_hide_bots.len()); + Comment::delete(pool, inserted_comment.id).await?; Post::delete(pool, inserted_post.id).await?; Community::delete(pool, inserted_community.id).await?; diff --git a/crates/db_views_actor/src/person_mention_view.rs b/crates/db_views_actor/src/person_mention_view.rs index d42987a68..58ddb011b 100644 --- a/crates/db_views_actor/src/person_mention_view.rs +++ b/crates/db_views_actor/src/person_mention_view.rs @@ -31,6 +31,7 @@ use lemmy_db_schema::{ person_mention, post, }, + source::local_user::LocalUser, utils::{get_conn, limit_and_offset, DbConn, DbPool, ListFn, Queries, ReadFn}, CommentSortType, }; @@ -192,6 +193,8 @@ fn queries<'a>() -> Queries< }; let list = move |mut conn: DbConn<'a>, options: PersonMentionQuery| async move { + // These filters need to be kept in sync with the filters in + // PersonMentionView::get_unread_mentions() let mut query = all_joins(person_mention::table.into_boxed(), options.my_person_id); if let Some(recipient_id) = options.recipient_id { @@ -203,7 +206,7 @@ fn queries<'a>() -> Queries< } if !options.show_bot_accounts { - query = query.filter(person::bot_account.eq(false)); + query = query.filter(not(person::bot_account)); }; query = match options.sort.unwrap_or(CommentSortType::Hot) { @@ -247,23 +250,32 @@ impl PersonMentionView { /// Gets the number of unread mentions pub async fn get_unread_mentions( pool: &mut DbPool<'_>, - my_person_id: PersonId, + local_user: &LocalUser, ) -> Result { use diesel::dsl::count; let conn = &mut get_conn(pool).await?; - person_mention::table + let mut query = person_mention::table .inner_join(comment::table) .left_join( person_block::table.on( comment::creator_id .eq(person_block::target_id) - .and(person_block::person_id.eq(my_person_id)), + .and(person_block::person_id.eq(local_user.person_id)), ), ) - // Dont count replies from blocked users + .inner_join(person::table.on(comment::creator_id.eq(person::id))) + .into_boxed(); + + // These filters need to be kept in sync with the filters in queries().list() + if !local_user.show_bot_accounts { + query = query.filter(not(person::bot_account)); + } + + query + // Don't count replies from blocked users .filter(person_block::person_id.is_null()) - .filter(person_mention::recipient_id.eq(my_person_id)) + .filter(person_mention::recipient_id.eq(local_user.person_id)) .filter(person_mention::read.eq(false)) .filter(comment::deleted.eq(false)) .filter(comment::removed.eq(false)) @@ -300,7 +312,8 @@ mod tests { comment::{Comment, CommentInsertForm}, community::{Community, CommunityInsertForm}, instance::Instance, - person::{Person, PersonInsertForm}, + local_user::{LocalUser, LocalUserInsertForm, LocalUserUpdateForm}, + person::{Person, PersonInsertForm, PersonUpdateForm}, person_block::{PersonBlock, PersonBlockForm}, person_mention::{PersonMention, PersonMentionInsertForm, PersonMentionUpdateForm}, post::{Post, PostInsertForm}, @@ -308,6 +321,7 @@ mod tests { traits::{Blockable, Crud}, utils::build_db_pool_for_tests, }; + use lemmy_db_views::structs::LocalUserView; use lemmy_utils::{error::LemmyResult, LemmyErrorType}; use pretty_assertions::assert_eq; use serial_test::serial; @@ -337,6 +351,9 @@ mod tests { let inserted_recipient = Person::create(pool, &recipient_form).await?; let recipient_id = inserted_recipient.id; + let recipient_local_user = + LocalUser::create(pool, &LocalUserInsertForm::test_form(recipient_id), vec![]).await?; + let new_community = CommunityInsertForm::builder() .name("test community lake".to_string()) .title("nada".to_owned()) @@ -387,7 +404,8 @@ mod tests { PersonMention::update(pool, inserted_mention.id, &person_mention_update_form).await?; // Test to make sure counts and blocks work correctly - let unread_mentions = PersonMentionView::get_unread_mentions(pool, recipient_id).await?; + let unread_mentions = + PersonMentionView::get_unread_mentions(pool, &recipient_local_user).await?; let query = PersonMentionQuery { recipient_id: Some(recipient_id), @@ -410,11 +428,44 @@ mod tests { PersonBlock::block(pool, &block_form).await?; let unread_mentions_after_block = - PersonMentionView::get_unread_mentions(pool, recipient_id).await?; - let mentions_after_block = query.list(pool).await?; + PersonMentionView::get_unread_mentions(pool, &recipient_local_user).await?; + let mentions_after_block = query.clone().list(pool).await?; assert_eq!(0, unread_mentions_after_block); assert_eq!(0, mentions_after_block.len()); + // Unblock user so we can reuse the same person + PersonBlock::unblock(pool, &block_form).await?; + + // Turn Terry into a bot account + let person_update_form = PersonUpdateForm { + bot_account: Some(true), + ..Default::default() + }; + Person::update(pool, inserted_person.id, &person_update_form).await?; + + let recipient_local_user_update_form = LocalUserUpdateForm { + show_bot_accounts: Some(false), + ..Default::default() + }; + LocalUser::update( + pool, + recipient_local_user.id, + &recipient_local_user_update_form, + ) + .await?; + let recipient_local_user_view = LocalUserView::read(pool, recipient_local_user.id) + .await? + .ok_or(LemmyErrorType::CouldntFindLocalUser)?; + + let unread_mentions_after_hide_bots = + PersonMentionView::get_unread_mentions(pool, &recipient_local_user_view.local_user).await?; + + let mut query_without_bots = query.clone(); + query_without_bots.show_bot_accounts = false; + let replies_after_hide_bots = query_without_bots.list(pool).await?; + assert_eq!(0, unread_mentions_after_hide_bots); + assert_eq!(0, replies_after_hide_bots.len()); + Comment::delete(pool, inserted_comment.id).await?; Post::delete(pool, inserted_post.id).await?; Community::delete(pool, inserted_community.id).await?; diff --git a/crates/federate/src/lib.rs b/crates/federate/src/lib.rs index bc54bb05c..5efd43a30 100644 --- a/crates/federate/src/lib.rs +++ b/crates/federate/src/lib.rs @@ -1,10 +1,7 @@ use crate::{util::CancellableTask, worker::InstanceWorker}; use activitypub_federation::config::FederationConfig; use lemmy_api_common::context::LemmyContext; -use lemmy_db_schema::{ - newtypes::InstanceId, - source::{federation_queue_state::FederationQueueState, instance::Instance}, -}; +use lemmy_db_schema::{newtypes::InstanceId, source::instance::Instance}; use lemmy_utils::error::LemmyResult; use stats::receive_print_stats; use std::{collections::HashMap, time::Duration}; @@ -15,6 +12,7 @@ use tokio::{ }; use tokio_util::sync::CancellationToken; use tracing::info; +use util::FederationQueueStateWithDomain; mod inboxes; mod stats; @@ -39,12 +37,12 @@ pub struct SendManager { opts: Opts, workers: HashMap, context: FederationConfig, - stats_sender: UnboundedSender<(InstanceId, FederationQueueState)>, + stats_sender: UnboundedSender, exit_print: JoinHandle<()>, } impl SendManager { - pub fn new(opts: Opts, context: FederationConfig) -> Self { + fn new(opts: Opts, context: FederationConfig) -> Self { assert!(opts.process_count > 0); assert!(opts.process_index > 0); assert!(opts.process_index <= opts.process_count); @@ -62,11 +60,27 @@ impl SendManager { } } - pub fn run(mut self) -> CancellableTask { - CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |cancel| async move { - self.do_loop(cancel).await?; - self.cancel().await?; - Ok(()) + pub fn run(opts: Opts, context: FederationConfig) -> CancellableTask { + CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |cancel| { + let opts = opts.clone(); + let context = context.clone(); + let mut manager = Self::new(opts, context); + async move { + let result = manager.do_loop(cancel).await; + // the loop function will only return if there is (a) an internal error (e.g. db connection + // failure) or (b) it was cancelled from outside. + if let Err(e) = result { + // don't let this error bubble up, just log it, so the below cancel function will run + // regardless + tracing::error!("SendManager failed: {e}"); + } + // cancel all the dependent workers as well to ensure they don't get orphaned and keep + // running. + manager.cancel().await?; + LemmyResult::Ok(()) + // if the task was not intentionally cancelled, then this whole lambda will be run again by + // CancellableTask after this + } }) } @@ -105,14 +119,24 @@ impl SendManager { continue; } // create new worker - let instance = instance.clone(); - let req_data = self.context.to_request_data(); + let context = self.context.clone(); let stats_sender = self.stats_sender.clone(); self.workers.insert( instance.id, - CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |stop| async move { - InstanceWorker::init_and_loop(instance, req_data, stop, stats_sender).await?; - Ok(()) + CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |stop| { + // if the instance worker ends unexpectedly due to internal/db errors, this lambda is rerun by cancellabletask. + let instance = instance.clone(); + let req_data = context.to_request_data(); + let stats_sender = stats_sender.clone(); + async move { + InstanceWorker::init_and_loop( + instance, + req_data, + stop, + stats_sender, + ) + .await + } }), ); } else if !should_federate { @@ -172,7 +196,7 @@ mod test { collections::HashSet, sync::{Arc, Mutex}, }; - use tokio::{spawn, time::sleep}; + use tokio::spawn; struct TestData { send_manager: SendManager, diff --git a/crates/federate/src/stats.rs b/crates/federate/src/stats.rs index bb6510263..f927f6ddf 100644 --- a/crates/federate/src/stats.rs +++ b/crates/federate/src/stats.rs @@ -1,15 +1,11 @@ -use crate::util::get_latest_activity_id; +use crate::util::{get_latest_activity_id, FederationQueueStateWithDomain}; use chrono::Local; -use diesel::result::Error::NotFound; use lemmy_api_common::federate_retry_sleep_duration; use lemmy_db_schema::{ newtypes::InstanceId, - source::{federation_queue_state::FederationQueueState, instance::Instance}, utils::{ActualDbPool, DbPool}, }; -use lemmy_utils::{error::LemmyResult, CACHE_DURATION_FEDERATION}; -use moka::future::Cache; -use once_cell::sync::Lazy; +use lemmy_utils::error::LemmyResult; use std::{collections::HashMap, time::Duration}; use tokio::{sync::mpsc::UnboundedReceiver, time::interval}; use tracing::{debug, info, warn}; @@ -18,7 +14,7 @@ use tracing::{debug, info, warn}; /// dropped) pub(crate) async fn receive_print_stats( pool: ActualDbPool, - mut receiver: UnboundedReceiver<(InstanceId, FederationQueueState)>, + mut receiver: UnboundedReceiver, ) { let pool = &mut DbPool::Pool(&pool); let mut printerval = interval(Duration::from_secs(60)); @@ -28,7 +24,7 @@ pub(crate) async fn receive_print_stats( ele = receiver.recv() => { match ele { // update stats for instance - Some((instance_id, ele)) => {stats.insert(instance_id, ele);}, + Some(ele) => {stats.insert(ele.state.instance_id, ele);}, // receiver closed, print stats and exit None => { print_stats(pool, &stats).await; @@ -43,7 +39,10 @@ pub(crate) async fn receive_print_stats( } } -async fn print_stats(pool: &mut DbPool<'_>, stats: &HashMap) { +async fn print_stats( + pool: &mut DbPool<'_>, + stats: &HashMap, +) { let res = print_stats_with_error(pool, stats).await; if let Err(e) = res { warn!("Failed to print stats: {e}"); @@ -52,18 +51,8 @@ async fn print_stats(pool: &mut DbPool<'_>, stats: &HashMap, - stats: &HashMap, + stats: &HashMap, ) -> LemmyResult<()> { - static INSTANCE_CACHE: Lazy>> = Lazy::new(|| { - Cache::builder() - .max_capacity(1) - .time_to_live(CACHE_DURATION_FEDERATION) - .build() - }); - let instances = INSTANCE_CACHE - .try_get_with((), async { Instance::read_all(pool).await }) - .await?; - let last_id = get_latest_activity_id(pool).await?; // it's expected that the values are a bit out of date, everything < SAVE_STATE_EVERY should be @@ -72,12 +61,9 @@ async fn print_stats_with_error( // todo: more stats (act/sec, avg http req duration) let mut ok_count = 0; let mut behind_count = 0; - for (instance_id, stat) in stats { - let domain = &instances - .iter() - .find(|i| &i.id == instance_id) - .ok_or(NotFound)? - .domain; + for ele in stats.values() { + let stat = &ele.state; + let domain = &ele.domain; let behind = last_id.0 - stat.last_successful_id.map(|e| e.0).unwrap_or(0); if stat.fail_count > 0 { info!( diff --git a/crates/federate/src/util.rs b/crates/federate/src/util.rs index 4476c6f5e..e9a7ab4a4 100644 --- a/crates/federate/src/util.rs +++ b/crates/federate/src/util.rs @@ -11,13 +11,13 @@ use lemmy_db_schema::{ source::{ activity::{ActorType, SentActivity}, community::Community, + federation_queue_state::FederationQueueState, person::Person, site::Site, }, traits::ApubActor, utils::{get_conn, DbPool}, }; -use lemmy_utils::error::LemmyResult; use moka::future::Cache; use once_cell::sync::Lazy; use reqwest::Url; @@ -25,7 +25,6 @@ use serde_json::Value; use std::{fmt::Debug, future::Future, pin::Pin, sync::Arc, time::Duration}; use tokio::{task::JoinHandle, time::sleep}; use tokio_util::sync::CancellationToken; -use tracing::error; /// Recheck for new federation work every n seconds. /// @@ -47,26 +46,33 @@ pub struct CancellableTask { impl CancellableTask { /// spawn a task but with graceful shutdown - pub fn spawn( + pub fn spawn( timeout: Duration, - task: impl FnOnce(CancellationToken) -> F + Send + 'static, + task: impl Fn(CancellationToken) -> F + Send + 'static, ) -> CancellableTask where - F: Future> + Send + 'static, - R: Send + 'static, + F: Future + Send + 'static, + R: Send + Debug + 'static, { let stop = CancellationToken::new(); let stop2 = stop.clone(); - let task: JoinHandle> = tokio::spawn(task(stop2)); + let task: JoinHandle<()> = tokio::spawn(async move { + loop { + let res = task(stop2.clone()).await; + if stop2.is_cancelled() { + return; + } else { + tracing::warn!("task exited, restarting: {res:?}"); + } + } + }); let abort = task.abort_handle(); CancellableTask { f: Box::pin(async move { stop.cancel(); tokio::select! { r = task => { - if let Err(ref e) = r? { - error!("CancellableTask threw error: {e}"); - } + r.context("CancellableTask failed to cancel cleanly, returned error")?; Ok(()) }, _ = sleep(timeout) => { @@ -171,3 +177,10 @@ pub(crate) async fn get_latest_activity_id(pool: &mut DbPool<'_>) -> Result, - stats_sender: UnboundedSender<(InstanceId, FederationQueueState)>, + stats_sender: UnboundedSender, state: FederationQueueState, last_state_insert: DateTime, } @@ -60,7 +61,7 @@ impl InstanceWorker { instance: Instance, context: Data, stop: CancellationToken, - stats_sender: UnboundedSender<(InstanceId, FederationQueueState)>, + stats_sender: UnboundedSender, ) -> LemmyResult<()> { let mut pool = context.pool(); let state = FederationQueueState::load(&mut pool, instance.id).await?; @@ -239,9 +240,10 @@ impl InstanceWorker { async fn save_and_send_state(&mut self) -> Result<()> { self.last_state_insert = Utc::now(); FederationQueueState::upsert(&mut self.context.pool(), &self.state).await?; - self - .stats_sender - .send((self.target.id, self.state.clone()))?; + self.stats_sender.send(FederationQueueStateWithDomain { + state: self.state.clone(), + domain: self.target.domain.clone(), + })?; Ok(()) } } diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index c690a5f48..493b9c205 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -53,7 +53,7 @@ services: lemmy-ui: # use "image" to pull down an already compiled lemmy-ui. make sure to comment out "build". - image: dessalines/lemmy-ui:0.19.3 + image: dessalines/lemmy-ui:0.19.4-rc.3 # platform: linux/x86_64 # no arm64 support. uncomment platform if using m1. # use "build" to build your local lemmy ui image for development. make sure to comment out "image". # run: docker compose up --build diff --git a/src/lib.rs b/src/lib.rs index c2b5e57c2..26740a444 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -210,14 +210,13 @@ pub async fn start_lemmy_server(args: CmdArgs) -> LemmyResult<()> { None }; let federate = (!args.disable_activity_sending).then(|| { - let task = SendManager::new( + SendManager::run( Opts { process_index: args.federate_process_index, process_count: args.federate_process_count, }, federation_config, - ); - task.run() + ) }); let mut interrupt = tokio::signal::unix::signal(SignalKind::interrupt())?; let mut terminate = tokio::signal::unix::signal(SignalKind::terminate())?;