diff --git a/api_tests/prepare-drone-federation-test.sh b/api_tests/prepare-drone-federation-test.sh index ba6dd324..4044ba0d 100755 --- a/api_tests/prepare-drone-federation-test.sh +++ b/api_tests/prepare-drone-federation-test.sh @@ -6,6 +6,8 @@ set -e export RUST_BACKTRACE=1 export RUST_LOG="warn,lemmy_server=debug,lemmy_federate=debug,lemmy_api=debug,lemmy_api_common=debug,lemmy_api_crud=debug,lemmy_apub=debug,lemmy_db_schema=debug,lemmy_db_views=debug,lemmy_db_views_actor=debug,lemmy_db_views_moderator=debug,lemmy_routes=debug,lemmy_utils=debug,lemmy_websocket=debug" +export LEMMY_TEST_FAST_FEDERATION=1 # by default, the persistent federation queue has delays in the scale of 30s-5min + for INSTANCE in lemmy_alpha lemmy_beta lemmy_gamma lemmy_delta lemmy_epsilon; do echo "DB URL: ${LEMMY_DATABASE_URL} INSTANCE: $INSTANCE" psql "${LEMMY_DATABASE_URL}/lemmy" -c "DROP DATABASE IF EXISTS $INSTANCE" @@ -34,30 +36,30 @@ echo "$PWD" echo "start alpha" LEMMY_CONFIG_LOCATION=./docker/federation/lemmy_alpha.hjson \ -LEMMY_DATABASE_URL="${LEMMY_DATABASE_URL}/lemmy_alpha" \ -target/lemmy_server >/tmp/lemmy_alpha.out 2>&1 & + LEMMY_DATABASE_URL="${LEMMY_DATABASE_URL}/lemmy_alpha" \ + target/lemmy_server >/tmp/lemmy_alpha.out 2>&1 & echo "start beta" LEMMY_CONFIG_LOCATION=./docker/federation/lemmy_beta.hjson \ -LEMMY_DATABASE_URL="${LEMMY_DATABASE_URL}/lemmy_beta" \ -target/lemmy_server >/tmp/lemmy_beta.out 2>&1 & + LEMMY_DATABASE_URL="${LEMMY_DATABASE_URL}/lemmy_beta" \ + target/lemmy_server >/tmp/lemmy_beta.out 2>&1 & echo "start gamma" LEMMY_CONFIG_LOCATION=./docker/federation/lemmy_gamma.hjson \ -LEMMY_DATABASE_URL="${LEMMY_DATABASE_URL}/lemmy_gamma" \ -target/lemmy_server >/tmp/lemmy_gamma.out 2>&1 & + LEMMY_DATABASE_URL="${LEMMY_DATABASE_URL}/lemmy_gamma" \ + target/lemmy_server >/tmp/lemmy_gamma.out 2>&1 & echo "start delta" # An instance with only an allowlist for beta LEMMY_CONFIG_LOCATION=./docker/federation/lemmy_delta.hjson \ -LEMMY_DATABASE_URL="${LEMMY_DATABASE_URL}/lemmy_delta" \ -target/lemmy_server >/tmp/lemmy_delta.out 2>&1 & + LEMMY_DATABASE_URL="${LEMMY_DATABASE_URL}/lemmy_delta" \ + target/lemmy_server >/tmp/lemmy_delta.out 2>&1 & echo "start epsilon" # An instance who has a blocklist, with lemmy-alpha blocked LEMMY_CONFIG_LOCATION=./docker/federation/lemmy_epsilon.hjson \ -LEMMY_DATABASE_URL="${LEMMY_DATABASE_URL}/lemmy_epsilon" \ -target/lemmy_server >/tmp/lemmy_epsilon.out 2>&1 & + LEMMY_DATABASE_URL="${LEMMY_DATABASE_URL}/lemmy_epsilon" \ + target/lemmy_server >/tmp/lemmy_epsilon.out 2>&1 & echo "wait for all instances to start" while [[ "$(curl -s -o /dev/null -w '%{http_code}' 'lemmy-alpha:8541/api/v3/site')" != "200" ]]; do sleep 1; done diff --git a/api_tests/src/comment.spec.ts b/api_tests/src/comment.spec.ts index 394f73dc..6ced2bf3 100644 --- a/api_tests/src/comment.spec.ts +++ b/api_tests/src/comment.spec.ts @@ -33,21 +33,21 @@ import { getUnreadCount, waitUntil, delay, + waitForPost, alphaUrl, } from "./shared"; import { CommentView } from "lemmy-js-client/dist/types/CommentView"; +import { CommunityView } from "lemmy-js-client"; import { LemmyHttp } from "lemmy-js-client"; +let betaCommunity: CommunityView | undefined; let postOnAlphaRes: PostResponse; beforeAll(async () => { await setupLogins(); await unfollows(); - await followBeta(alpha); - await followBeta(gamma); - // wait for FOLLOW_ADDITIONS_RECHECK_DELAY - await delay(2000); - let betaCommunity = (await resolveBetaCommunity(alpha)).community; + await Promise.all([followBeta(alpha), followBeta(gamma)]); + betaCommunity = (await resolveBetaCommunity(alpha)).community; if (betaCommunity) { postOnAlphaRes = await createPost(alpha, betaCommunity.community.id); } @@ -343,6 +343,8 @@ test("Federated comment like", async () => { }); test("Reply to a comment from another instance, get notification", async () => { + await alpha.markAllAsRead(); + let betaCommunity = (await resolveBetaCommunity(alpha)).community; if (!betaCommunity) { throw "Missing beta community"; @@ -375,16 +377,17 @@ test("Reply to a comment from another instance, get notification", async () => { expect(replyRes.comment_view.counts.score).toBe(1); // Make sure that reply comment is seen on alpha - // TODO not sure why, but a searchComment back to alpha, for the ap_id of betas - // comment, isn't working. - // let searchAlpha = await searchComment(alpha, replyRes.comment); + let commentSearch = await waitUntil( + () => resolveComment(alpha, replyRes.comment_view.comment), + c => c.comment?.counts.score === 1, + ); + let alphaComment = commentSearch.comment!; let postComments = await waitUntil( () => getComments(alpha, postOnAlphaRes.post_view.post.id), pc => pc.comments.length >= 2, ); // Note: this test fails when run twice and this count will differ expect(postComments.comments.length).toBeGreaterThanOrEqual(2); - let alphaComment = postComments.comments[0]; expect(alphaComment.comment.content).toBeDefined(); expect(getCommentParentId(alphaComment.comment)).toBe( @@ -400,23 +403,29 @@ test("Reply to a comment from another instance, get notification", async () => { () => getUnreadCount(alpha), e => e.replies >= 1, ); - expect(alphaUnreadCountRes.replies).toBe(1); + expect(alphaUnreadCountRes.replies).toBeGreaterThanOrEqual(1); // check inbox of replies on alpha, fetching read/unread both let alphaRepliesRes = await getReplies(alpha); - expect(alphaRepliesRes.replies.length).toBe(1); - expect(alphaRepliesRes.replies[0].comment.content).toBeDefined(); - expect(alphaRepliesRes.replies[0].community.local).toBe(false); - expect(alphaRepliesRes.replies[0].creator.local).toBe(false); - expect(alphaRepliesRes.replies[0].counts.score).toBe(1); + const alphaReply = alphaRepliesRes.replies.find( + r => r.comment.id === alphaComment.comment.id, + ); + expect(alphaReply).toBeDefined(); + if (!alphaReply) throw Error(); + expect(alphaReply.comment.content).toBeDefined(); + expect(alphaReply.community.local).toBe(false); + expect(alphaReply.creator.local).toBe(false); + expect(alphaReply.counts.score).toBe(1); // ToDo: interesting alphaRepliesRes.replies[0].comment_reply.id is 1, meaning? how did that come about? - expect(alphaRepliesRes.replies[0].comment.id).toBe(alphaComment.comment.id); + expect(alphaReply.comment.id).toBe(alphaComment.comment.id); // this is a new notification, getReplies fetch was for read/unread both, confirm it is unread. - expect(alphaRepliesRes.replies[0].comment_reply.read).toBe(false); - assertCommentFederation(alphaRepliesRes.replies[0], replyRes.comment_view); + expect(alphaReply.comment_reply.read).toBe(false); + assertCommentFederation(alphaReply, replyRes.comment_view); }); test("Mention beta from alpha", async () => { + if (!betaCommunity) throw Error("no community"); + const postOnAlphaRes = await createPost(alpha, betaCommunity.community.id); // Create a new branch, trunk-level comment branch, from alpha instance let commentRes = await createComment(alpha, postOnAlphaRes.post_view.post.id); // Create a reply comment to previous comment, this has a mention in body @@ -433,7 +442,7 @@ test("Mention beta from alpha", async () => { expect(mentionRes.comment_view.counts.score).toBe(1); // get beta's localized copy of the alpha post - let betaPost = (await resolvePost(beta, postOnAlphaRes.post_view.post)).post; + let betaPost = await waitForPost(beta, postOnAlphaRes.post_view.post); if (!betaPost) { throw "unable to locate post on beta"; } @@ -443,9 +452,9 @@ test("Mention beta from alpha", async () => { // Make sure that both new comments are seen on beta and have parent/child relationship let betaPostComments = await waitUntil( () => getComments(beta, betaPost!.post.id), - c => c.comments[1].counts.score === 1, + c => c.comments[1]?.counts.score === 1, ); - expect(betaPostComments.comments.length).toBeGreaterThanOrEqual(2); + expect(betaPostComments.comments.length).toEqual(2); // the trunk-branch root comment will be older than the mention reply comment, so index 1 let betaRootComment = betaPostComments.comments[1]; // the trunk-branch root comment should not have a parent @@ -460,7 +469,10 @@ test("Mention beta from alpha", async () => { expect(betaRootComment.counts.score).toBe(1); assertCommentFederation(betaRootComment, commentRes.comment_view); - let mentionsRes = await getMentions(beta); + let mentionsRes = await waitUntil( + () => getMentions(beta), + m => !!m.mentions[0], + ); expect(mentionsRes.mentions[0].comment.content).toBeDefined(); expect(mentionsRes.mentions[0].community.local).toBe(true); expect(mentionsRes.mentions[0].creator.local).toBe(false); @@ -492,7 +504,7 @@ test("A and G subscribe to B (center) A posts, G mentions B, it gets announced t expect(alphaPost.post_view.community.local).toBe(true); // Make sure gamma sees it - let gammaPost = (await resolvePost(gamma, alphaPost.post_view.post)).post; + let gammaPost = (await resolvePost(gamma, alphaPost.post_view.post))!.post; if (!gammaPost) { throw "Missing gamma post"; @@ -514,7 +526,7 @@ test("A and G subscribe to B (center) A posts, G mentions B, it gets announced t // Make sure alpha sees it let alphaPostComments2 = await waitUntil( () => getComments(alpha, alphaPost.post_view.post.id), - e => !!e.comments[0], + e => e.comments[0]?.counts.score === 1, ); expect(alphaPostComments2.comments[0].comment.content).toBe(commentContent); expect(alphaPostComments2.comments[0].community.local).toBe(true); @@ -560,21 +572,19 @@ test("Check that activity from another instance is sent to third instance", asyn () => resolveBetaCommunity(gamma), c => c.community?.subscribed === "Subscribed", ); - // FOLLOW_ADDITIONS_RECHECK_DELAY - await delay(2000); // Create a post on beta let betaPost = await createPost(beta, 2); expect(betaPost.post_view.community.local).toBe(true); // Make sure gamma and alpha see it - let gammaPost = (await resolvePost(gamma, betaPost.post_view.post)).post; + let gammaPost = await waitForPost(gamma, betaPost.post_view.post); if (!gammaPost) { throw "Missing gamma post"; } expect(gammaPost.post).toBeDefined(); - let alphaPost = (await resolvePost(alpha, betaPost.post_view.post)).post; + let alphaPost = await waitForPost(alpha, betaPost.post_view.post); if (!alphaPost) { throw "Missing alpha post"; } @@ -596,7 +606,7 @@ test("Check that activity from another instance is sent to third instance", asyn // Make sure alpha sees it let alphaPostComments2 = await waitUntil( () => getComments(alpha, alphaPost!.post.id), - e => !!e.comments[0], + e => e.comments[0]?.counts.score === 1, ); expect(alphaPostComments2.comments[0].comment.content).toBe(commentContent); expect(alphaPostComments2.comments[0].community.local).toBe(false); @@ -607,8 +617,7 @@ test("Check that activity from another instance is sent to third instance", asyn commentRes.comment_view, ); - await unfollowRemotes(alpha); - await unfollowRemotes(gamma); + await Promise.all([unfollowRemotes(alpha), unfollowRemotes(gamma)]); }); test("Fetch in_reply_tos: A is unsubbed from B, B makes a post, and some embedded comments, A subs to B, B updates the lowest level comment, A fetches both the post and all the inreplyto comments for that post.", async () => { @@ -660,8 +669,8 @@ test("Fetch in_reply_tos: A is unsubbed from B, B makes a post, and some embedde expect(updateRes.comment_view.comment.content).toBe(updatedCommentContent); // Get the post from alpha - let alphaPostB = (await resolvePost(alpha, postOnBetaRes.post_view.post)) - .post; + let alphaPostB = await waitForPost(alpha, postOnBetaRes.post_view.post); + if (!alphaPostB) { throw "Missing alpha post B"; } @@ -671,7 +680,8 @@ test("Fetch in_reply_tos: A is unsubbed from B, B makes a post, and some embedde () => getComments(alpha, alphaPostB!.post.id), c => c.comments[1]?.comment.content === - parentCommentRes.comment_view.comment.content, + parentCommentRes.comment_view.comment.content && + c.comments[0]?.comment.content === updateRes.comment_view.comment.content, ); expect(alphaPost.post_view.post.name).toBeDefined(); assertCommentFederation( @@ -705,16 +715,17 @@ test("Report a comment", async () => { throw "Missing alpha comment"; } - let alphaReport = ( - await reportComment(alpha, alphaComment.id, randomString(10)) - ).comment_report_view.comment_report; + const reason = randomString(10); + let alphaReport = (await reportComment(alpha, alphaComment.id, reason)) + .comment_report_view.comment_report; - let betaReport = ( - await waitUntil( - () => listCommentReports(beta), - e => !!e.comment_reports[0], - ) - ).comment_reports[0].comment_report; + let betaReport = (await waitUntil( + () => + listCommentReports(beta).then(r => + r.comment_reports.find(rep => rep.comment_report.reason === reason), + ), + e => !!e, + ))!.comment_report; expect(betaReport).toBeDefined(); expect(betaReport.resolved).toBe(false); expect(betaReport.original_comment_text).toBe( diff --git a/api_tests/src/community.spec.ts b/api_tests/src/community.spec.ts index d91ac8aa..b81dd900 100644 --- a/api_tests/src/community.spec.ts +++ b/api_tests/src/community.spec.ts @@ -26,6 +26,7 @@ import { blockInstance, waitUntil, delay, + waitForPost, alphaUrl, } from "./shared"; import { LemmyHttp } from "lemmy-js-client"; @@ -89,12 +90,6 @@ test("Delete community", async () => { // Make sure the follow response went through expect(follow.community_view.community.local).toBe(false); - await waitUntil( - () => resolveCommunity(alpha, searchShort), - g => g.community?.subscribed === "Subscribed", - ); - // wait FOLLOW_ADDITIONS_RECHECK_DELAY - await delay(2000); let deleteCommunityRes = await deleteCommunity( beta, true, @@ -147,10 +142,6 @@ test("Remove community", async () => { // Make sure the follow response went through expect(follow.community_view.community.local).toBe(false); - await waitUntil( - () => resolveCommunity(alpha, searchShort), - g => g.community?.subscribed === "Subscribed", - ); let removeCommunityRes = await removeCommunity( beta, true, @@ -361,8 +352,8 @@ test("User blocks instance, communities are hidden", async () => { expect(postRes.post_view.post.id).toBeDefined(); // fetch post to alpha - let alphaPost = await resolvePost(alpha, postRes.post_view.post); - expect(alphaPost.post?.post).toBeDefined(); + let alphaPost = (await resolvePost(alpha, postRes.post_view.post)).post!; + expect(alphaPost.post).toBeDefined(); // post should be included in listing let listing = await getPosts(alpha, "All"); @@ -370,7 +361,7 @@ test("User blocks instance, communities are hidden", async () => { expect(listing_ids).toContain(postRes.post_view.post.ap_id); // block the beta instance - await blockInstance(alpha, alphaPost.post!.community.instance_id, true); + await blockInstance(alpha, alphaPost.community.instance_id, true); // after blocking, post should not be in listing let listing2 = await getPosts(alpha, "All"); @@ -378,7 +369,7 @@ test("User blocks instance, communities are hidden", async () => { expect(listing_ids2.indexOf(postRes.post_view.post.ap_id)).toBe(-1); // unblock instance again - await blockInstance(alpha, alphaPost.post!.community.instance_id, false); + await blockInstance(alpha, alphaPost.community.instance_id, false); // post should be included in listing let listing3 = await getPosts(alpha, "All"); diff --git a/api_tests/src/post.spec.ts b/api_tests/src/post.spec.ts index 2ef28951..51a10293 100644 --- a/api_tests/src/post.spec.ts +++ b/api_tests/src/post.spec.ts @@ -34,7 +34,7 @@ import { unfollows, resolveCommunity, waitUntil, - delay, + waitForPost, alphaUrl, } from "./shared"; import { PostView } from "lemmy-js-client/dist/types/PostView"; @@ -83,11 +83,11 @@ test("Create a post", async () => { expect(postRes.post_view.counts.score).toBe(1); // Make sure that post is liked on beta - const res = await waitUntil( - () => resolvePost(beta, postRes.post_view.post), - res => res.post?.counts.score === 1, + const betaPost = await waitForPost( + beta, + postRes.post_view.post, + res => res?.counts.score === 1, ); - let betaPost = res.post; expect(betaPost).toBeDefined(); expect(betaPost?.community.local).toBe(true); @@ -123,12 +123,12 @@ test("Unlike a post", async () => { expect(unlike2.post_view.counts.score).toBe(0); // Make sure that post is unliked on beta - const betaPost = ( - await waitUntil( - () => resolvePost(beta, postRes.post_view.post), - b => b.post?.counts.score === 0, - ) - ).post; + const betaPost = await waitForPost( + beta, + postRes.post_view.post, + post => post?.counts.score === 0, + ); + expect(betaPost).toBeDefined(); expect(betaPost?.community.local).toBe(true); expect(betaPost?.creator.local).toBe(false); @@ -141,26 +141,16 @@ test("Update a post", async () => { throw "Missing beta community"; } let postRes = await createPost(alpha, betaCommunity.community.id); - await waitUntil( - () => resolvePost(beta, postRes.post_view.post), - res => !!res.post, - ); + await waitForPost(beta, postRes.post_view.post); let updatedName = "A jest test federated post, updated"; let updatedPost = await editPost(alpha, postRes.post_view.post); - await waitUntil( - () => resolvePost(beta, postRes.post_view.post), - res => res.post?.post.name === updatedName, - ); expect(updatedPost.post_view.post.name).toBe(updatedName); expect(updatedPost.post_view.community.local).toBe(false); expect(updatedPost.post_view.creator.local).toBe(true); // Make sure that post is updated on beta - let betaPost = (await resolvePost(beta, postRes.post_view.post)).post; - if (!betaPost) { - throw "Missing beta post"; - } + let betaPost = await waitForPost(beta, updatedPost.post_view.post); expect(betaPost.community.local).toBe(true); expect(betaPost.creator.local).toBe(false); expect(betaPost.post.name).toBe(updatedName); @@ -178,7 +168,7 @@ test("Sticky a post", async () => { } let postRes = await createPost(alpha, betaCommunity.community.id); - let betaPost1 = (await resolvePost(beta, postRes.post_view.post)).post; + let betaPost1 = await waitForPost(beta, postRes.post_view.post); if (!betaPost1) { throw "Missing beta post1"; } @@ -221,30 +211,19 @@ test("Lock a post", async () => { () => resolveBetaCommunity(alpha), c => c.community?.subscribed === "Subscribed", ); - // wait FOLLOW_ADDITIONS_RECHECK_DELAY (there's no API to wait for this currently) - await delay(2_000); let postRes = await createPost(alpha, betaCommunity.community.id); - // wait for federation - await waitUntil( - () => searchPostLocal(beta, postRes.post_view.post), - res => !!res.posts[0], - ); + let betaPost1 = await waitForPost(beta, postRes.post_view.post); // Lock the post - let betaPost1 = (await resolvePost(beta, postRes.post_view.post)).post; - if (!betaPost1) { - throw "Missing beta post1"; - } let lockedPostRes = await lockPost(beta, true, betaPost1.post); expect(lockedPostRes.post_view.post.locked).toBe(true); // Make sure that post is locked on alpha - let searchAlpha = await waitUntil( - () => searchPostLocal(alpha, postRes.post_view.post), - res => res.posts[0]?.post.locked, + let alphaPost1 = await waitForPost( + alpha, + postRes.post_view.post, + post => !!post && post.post.locked, ); - let alphaPost1 = searchAlpha.posts[0]; - expect(alphaPost1.post.locked).toBe(true); // Try to make a new comment there, on alpha await expect(createComment(alpha, alphaPost1.post.id)).rejects.toBe("locked"); @@ -254,11 +233,11 @@ test("Lock a post", async () => { expect(unlockedPost.post_view.post.locked).toBe(false); // Make sure that post is unlocked on alpha - let searchAlpha2 = await waitUntil( - () => searchPostLocal(alpha, postRes.post_view.post), - res => !res.posts[0]?.post.locked, + let alphaPost2 = await waitForPost( + alpha, + postRes.post_view.post, + post => !!post && !post.post.locked, ); - let alphaPost2 = searchAlpha2.posts[0]; expect(alphaPost2.community.local).toBe(false); expect(alphaPost2.creator.local).toBe(true); expect(alphaPost2.post.locked).toBe(false); @@ -275,6 +254,7 @@ test("Delete a post", async () => { let postRes = await createPost(alpha, betaCommunity.community.id); expect(postRes.post_view.post).toBeDefined(); + await waitForPost(beta, postRes.post_view.post); let deletedPost = await deletePost(alpha, true, postRes.post_view.post); expect(deletedPost.post_view.post.deleted).toBe(true); @@ -282,16 +262,18 @@ test("Delete a post", async () => { // Make sure lemmy beta sees post is deleted // This will be undefined because of the tombstone - await expect(resolvePost(beta, postRes.post_view.post)).rejects.toBe( - "couldnt_find_object", - ); + await waitForPost(beta, postRes.post_view.post, p => !p || p.post.deleted); // Undelete let undeletedPost = await deletePost(alpha, false, postRes.post_view.post); - expect(undeletedPost.post_view.post.deleted).toBe(false); // Make sure lemmy beta sees post is undeleted - let betaPost2 = (await resolvePost(beta, postRes.post_view.post)).post; + let betaPost2 = await waitForPost( + beta, + postRes.post_view.post, + p => !!p && !p.post.deleted, + ); + if (!betaPost2) { throw "Missing beta post 2"; } @@ -350,11 +332,7 @@ test("Remove a post from admin and community on same instance", async () => { let postRes = await createPost(alpha, betaCommunity.community.id); expect(postRes.post_view.post).toBeDefined(); // Get the id for beta - let searchBeta = await waitUntil( - () => searchPostLocal(beta, postRes.post_view.post), - res => !!res.posts[0], - ); - let betaPost = searchBeta.posts[0]; + let betaPost = await waitForPost(beta, postRes.post_view.post); expect(betaPost).toBeDefined(); // The beta admin removes it (the community lives on beta) @@ -362,18 +340,25 @@ test("Remove a post from admin and community on same instance", async () => { expect(removePostRes.post_view.post.removed).toBe(true); // Make sure lemmy alpha sees post is removed - // let alphaPost = await getPost(alpha, postRes.post_view.post.id); - // expect(alphaPost.post_view.post.removed).toBe(true); // TODO this shouldn't be commented - // assertPostFederation(alphaPost.post_view, removePostRes.post_view); + let alphaPost = await waitUntil( + () => getPost(alpha, postRes.post_view.post.id), + p => p?.post_view.post.removed ?? false, + ); + expect(alphaPost.post_view?.post.removed).toBe(true); + assertPostFederation(alphaPost.post_view, removePostRes.post_view); // Undelete let undeletedPost = await removePost(beta, false, betaPost.post); expect(undeletedPost.post_view.post.removed).toBe(false); // Make sure lemmy alpha sees post is undeleted - let alphaPost2 = await getPost(alpha, postRes.post_view.post.id); - expect(alphaPost2.post_view.post.removed).toBe(false); - assertPostFederation(alphaPost2.post_view, undeletedPost.post_view); + let alphaPost2 = await waitForPost( + alpha, + postRes.post_view.post, + p => !!p && !p.post.removed, + ); + expect(alphaPost2.post.removed).toBe(false); + assertPostFederation(alphaPost2, undeletedPost.post_view); await unfollowRemotes(alpha); }); @@ -385,7 +370,7 @@ test("Search for a post", async () => { let postRes = await createPost(alpha, betaCommunity.community.id); expect(postRes.post_view.post).toBeDefined(); - let betaPost = (await resolvePost(beta, postRes.post_view.post)).post; + let betaPost = await waitForPost(beta, postRes.post_view.post); expect(betaPost?.post.name).toBeDefined(); }); @@ -413,11 +398,7 @@ test("Enforce site ban for federated user", async () => { // alpha makes post in beta community, it federates to beta instance let postRes1 = await createPost(alpha_user, betaCommunity.community.id); - let searchBeta1 = await waitUntil( - () => searchPostLocal(beta, postRes1.post_view.post), - res => !!res.posts[0], - ); - expect(searchBeta1.posts[0]).toBeDefined(); + let searchBeta1 = await waitForPost(beta, postRes1.post_view.post); // ban alpha from its instance let banAlpha = await banPersonFromSite( @@ -436,8 +417,10 @@ test("Enforce site ban for federated user", async () => { expect(alphaUserOnBeta1.person?.person.banned).toBe(true); // existing alpha post should be removed on beta - let searchBeta2 = await getPost(beta, searchBeta1.posts[0].post.id); - expect(searchBeta2.post_view.post.removed).toBe(true); + let searchBeta2 = await waitUntil( + () => getPost(beta, searchBeta1.post.id), + s => s.post_view.post.removed, + ); // Unban alpha let unBanAlpha = await banPersonFromSite( @@ -450,11 +433,7 @@ test("Enforce site ban for federated user", async () => { // alpha makes new post in beta community, it federates let postRes2 = await createPost(alpha_user, betaCommunity.community.id); - let searchBeta3 = await waitUntil( - () => searchPostLocal(beta, postRes2.post_view.post), - e => !!e.posts[0], - ); - expect(searchBeta3.posts[0]).toBeDefined(); + let searchBeta3 = await waitForPost(beta, postRes2.post_view.post); let alphaUserOnBeta2 = await resolvePerson(beta, alphaUserActorId!); expect(alphaUserOnBeta2.person?.person.banned).toBe(false); @@ -544,12 +523,16 @@ test("Report a post", async () => { await reportPost(alpha, alphaPost.post.id, randomString(10)) ).post_report_view.post_report; - let betaReport = ( - await waitUntil( - () => listPostReports(beta), - res => !!res.post_reports[0], - ) - ).post_reports[0].post_report; + let betaReport = (await waitUntil( + () => + listPostReports(beta).then(p => + p.post_reports.find( + r => + r.post_report.original_post_name === alphaReport.original_post_name, + ), + ), + res => !!res, + ))!.post_report; expect(betaReport).toBeDefined(); expect(betaReport.resolved).toBe(false); expect(betaReport.original_post_name).toBe(alphaReport.original_post_name); diff --git a/api_tests/src/shared.ts b/api_tests/src/shared.ts index 19636f38..a1868f8f 100644 --- a/api_tests/src/shared.ts +++ b/api_tests/src/shared.ts @@ -6,6 +6,7 @@ import { GetUnreadCountResponse, InstanceId, LemmyHttp, + PostView, } from "lemmy-js-client"; import { CreatePost } from "lemmy-js-client/dist/types/CreatePost"; import { DeletePost } from "lemmy-js-client/dist/types/DeletePost"; @@ -181,7 +182,7 @@ export async function setupLogins() { // otherwise the first few federated events may be missed // (because last_successful_id is set to current id when federation to an instance is first started) // only needed the first time so do in this try - await delay(6_000); + await delay(10_000); } catch (_) { console.log("Communities already exist"); } @@ -288,6 +289,18 @@ export async function searchPostLocal( return api.search(form); } +/// wait for a post to appear locally without pulling it +export async function waitForPost( + api: LemmyHttp, + post: Post, + checker: (t: PostView | undefined) => boolean = p => !!p, +) { + return waitUntil( + () => searchPostLocal(api, post).then(p => p.posts[0]), + checker, + ); +} + export async function getPost( api: LemmyHttp, post_id: number, @@ -405,7 +418,14 @@ export async function followCommunity( community_id, follow, }; - return api.followCommunity(form); + const res = await api.followCommunity(form); + await waitUntil( + () => resolveCommunity(api, res.community_view.community.actor_id), + g => g.community?.subscribed === (follow ? "Subscribed" : "NotSubscribed"), + ); + // wait FOLLOW_ADDITIONS_RECHECK_DELAY (there's no API to wait for this currently) + await delay(2000); + return res; } export async function likePost( @@ -686,9 +706,9 @@ export async function unfollowRemotes( let site = await getSite(api); let remoteFollowed = site.my_user?.follows.filter(c => c.community.local == false) ?? []; - for (let cu of remoteFollowed) { - await followCommunity(api, false, cu.community.id); - } + await Promise.all( + remoteFollowed.map(cu => followCommunity(api, false, cu.community.id)), + ); let siteRes = await getSite(api); return siteRes; } @@ -787,10 +807,12 @@ export function randomString(length: number): string { } export async function unfollows() { - await unfollowRemotes(alpha); - await unfollowRemotes(gamma); - await unfollowRemotes(delta); - await unfollowRemotes(epsilon); + await Promise.all([ + unfollowRemotes(alpha), + unfollowRemotes(gamma), + unfollowRemotes(delta), + unfollowRemotes(epsilon), + ]); } export function getCommentParentId(comment: Comment): number | undefined { @@ -809,14 +831,18 @@ export async function waitUntil( fetcher: () => Promise, checker: (t: T) => boolean, retries = 10, - delaySeconds = 2, + delaySeconds = [0.2, 0.5, 1, 2, 3], ) { let retry = 0; + let result; while (retry++ < retries) { - const result = await fetcher(); + result = await fetcher(); if (checker(result)) return result; - await delay(delaySeconds * 1000); + await delay( + delaySeconds[Math.min(retry - 1, delaySeconds.length - 1)] * 1000, + ); } + console.error("result", result); throw Error( `Failed "${fetcher}": "${checker}" did not return true after ${retries} retries (delayed ${delaySeconds}s each)`, ); diff --git a/crates/api_common/src/send_activity.rs b/crates/api_common/src/send_activity.rs index 84b2efb2..6d9c722a 100644 --- a/crates/api_common/src/send_activity.rs +++ b/crates/api_common/src/send_activity.rs @@ -18,7 +18,15 @@ use lemmy_db_schema::{ }; use lemmy_db_views::structs::PrivateMessageView; use lemmy_utils::error::LemmyResult; -use once_cell::sync::OnceCell; +use once_cell::sync::{Lazy, OnceCell}; +use tokio::{ + sync::{ + mpsc, + mpsc::{UnboundedReceiver, UnboundedSender, WeakUnboundedSender}, + Mutex, + }, + task::JoinHandle, +}; use url::Url; type MatchOutgoingActivitiesBoxed = @@ -54,16 +62,45 @@ pub enum SendActivityData { CreateReport(Url, Person, Community, String), } -pub struct ActivityChannel; +// TODO: instead of static, move this into LemmyContext. make sure that stopping the process with +// ctrl+c still works. +static ACTIVITY_CHANNEL: Lazy = Lazy::new(|| { + let (sender, receiver) = mpsc::unbounded_channel(); + let weak_sender = sender.downgrade(); + ActivityChannel { + weak_sender, + receiver: Mutex::new(receiver), + keepalive_sender: Mutex::new(Some(sender)), + } +}); + +pub struct ActivityChannel { + weak_sender: WeakUnboundedSender, + receiver: Mutex>, + keepalive_sender: Mutex>>, +} impl ActivityChannel { + pub async fn retrieve_activity() -> Option { + let mut lock = ACTIVITY_CHANNEL.receiver.lock().await; + lock.recv().await + } + pub async fn submit_activity( data: SendActivityData, - context: &Data, + _context: &Data, ) -> LemmyResult<()> { - MATCH_OUTGOING_ACTIVITIES - .get() - .expect("retrieve function pointer")(data, context) - .await + // could do `ACTIVITY_CHANNEL.keepalive_sender.lock()` instead and get rid of weak_sender, + // not sure which way is more efficient + if let Some(sender) = ACTIVITY_CHANNEL.weak_sender.upgrade() { + sender.send(data)?; + } + Ok(()) + } + + pub async fn close(outgoing_activities_task: JoinHandle>) -> LemmyResult<()> { + ACTIVITY_CHANNEL.keepalive_sender.lock().await.take(); + outgoing_activities_task.await??; + Ok(()) } } diff --git a/crates/api_crud/src/post/create.rs b/crates/api_crud/src/post/create.rs index 5c061cb3..4fb97c1c 100644 --- a/crates/api_crud/src/post/create.rs +++ b/crates/api_crud/src/post/create.rs @@ -175,7 +175,7 @@ pub async fn create_post( mark_post_as_read(person_id, post_id, &mut context.pool()).await?; if let Some(url) = updated_post.url.clone() { - let task = async move { + spawn_try_task(async move { let mut webmention = Webmention::new::(updated_post.ap_id.clone().into(), url.clone().into())?; webmention.set_checked(true); @@ -188,8 +188,7 @@ pub async fn create_post( Ok(_) => Ok(()), Err(e) => Err(e).with_lemmy_type(LemmyErrorType::CouldntSendWebmention), } - }; - spawn_try_task(task); + }); }; build_post_response(&context, community_id, person_id, post_id).await diff --git a/crates/apub/src/activities/community/announce.rs b/crates/apub/src/activities/community/announce.rs index 5939c023..e84a970f 100644 --- a/crates/apub/src/activities/community/announce.rs +++ b/crates/apub/src/activities/community/announce.rs @@ -1,6 +1,7 @@ use crate::{ activities::{ generate_activity_id, + generate_announce_activity_id, send_lemmy_activity, verify_is_public, verify_person_in_community, @@ -75,16 +76,20 @@ impl AnnounceActivity { community: &ApubCommunity, context: &Data, ) -> Result { + let inner_kind = object + .other + .get("type") + .and_then(serde_json::Value::as_str) + .unwrap_or("other"); + let id = + generate_announce_activity_id(inner_kind, &context.settings().get_protocol_and_hostname())?; Ok(AnnounceActivity { actor: community.id().into(), to: vec![public()], object: IdOrNestedObject::NestedObject(object), cc: vec![community.followers_url.clone().into()], kind: AnnounceType::Announce, - id: generate_activity_id( - &AnnounceType::Announce, - &context.settings().get_protocol_and_hostname(), - )?, + id, }) } diff --git a/crates/apub/src/activities/mod.rs b/crates/apub/src/activities/mod.rs index 29ec5bd3..958065ff 100644 --- a/crates/apub/src/activities/mod.rs +++ b/crates/apub/src/activities/mod.rs @@ -28,12 +28,15 @@ use crate::{ use activitypub_federation::{ config::Data, fetch::object_id::ObjectId, - kinds::public, + kinds::{activity::AnnounceType, public}, protocol::context::WithContext, traits::{ActivityHandler, Actor}, }; use anyhow::anyhow; -use lemmy_api_common::{context::LemmyContext, send_activity::SendActivityData}; +use lemmy_api_common::{ + context::LemmyContext, + send_activity::{ActivityChannel, SendActivityData}, +}; use lemmy_db_schema::{ newtypes::CommunityId, source::{ @@ -42,10 +45,7 @@ use lemmy_db_schema::{ }, }; use lemmy_db_views_actor::structs::{CommunityPersonBanView, CommunityView}; -use lemmy_utils::{ - error::{LemmyError, LemmyErrorExt, LemmyErrorType, LemmyResult}, - spawn_try_task, -}; +use lemmy_utils::error::{LemmyError, LemmyErrorExt, LemmyErrorType, LemmyResult}; use serde::Serialize; use std::{ops::Deref, time::Duration}; use tracing::info; @@ -181,6 +181,21 @@ where Url::parse(&id) } +/// like generate_activity_id but also add the inner kind for easier debugging +fn generate_announce_activity_id( + inner_kind: &str, + protocol_and_hostname: &str, +) -> Result { + let id = format!( + "{}/activities/{}/{}/{}", + protocol_and_hostname, + AnnounceType::Announce.to_string().to_lowercase(), + inner_kind.to_lowercase(), + Uuid::new_v4() + ); + Url::parse(&id) +} + pub(crate) trait GetActorType { fn actor_type(&self) -> ActorType; } @@ -198,12 +213,12 @@ where ActorT: Actor + GetActorType, Activity: ActivityHandler, { - info!("Sending activity {}", activity.id().to_string()); + info!("Saving outgoing activity to queue {}", activity.id()); let activity = WithContext::new(activity, CONTEXT.deref().clone()); let form = SentActivityForm { ap_id: activity.id().clone().into(), - data: serde_json::to_value(activity.clone())?, + data: serde_json::to_value(activity)?, sensitive, send_inboxes: send_targets .inboxes @@ -220,6 +235,13 @@ where Ok(()) } +pub async fn handle_outgoing_activities(context: Data) -> LemmyResult<()> { + while let Some(data) = ActivityChannel::retrieve_activity().await { + match_outgoing_activities(data, &context.reset_request_count()).await? + } + Ok(()) +} + pub async fn match_outgoing_activities( data: SendActivityData, context: &Data, @@ -324,6 +346,6 @@ pub async fn match_outgoing_activities( } } }; - spawn_try_task(fed_task); + fed_task.await?; Ok(()) } diff --git a/crates/federate/src/util.rs b/crates/federate/src/util.rs index 4f260708..f744d45f 100644 --- a/crates/federate/src/util.rs +++ b/crates/federate/src/util.rs @@ -1,8 +1,5 @@ use anyhow::{anyhow, Context, Result}; -use diesel::{ - prelude::*, - sql_types::{Bool, Int8}, -}; +use diesel::prelude::*; use diesel_async::RunQueryDsl; use lemmy_apub::{ activity_lists::SharedInboxActivities, @@ -31,6 +28,26 @@ use std::{ use tokio::{task::JoinHandle, time::sleep}; use tokio_util::sync::CancellationToken; +/// Decrease the delays of the federation queue. +/// Should only be used for federation tests since it significantly increases CPU and DB load of the federation queue. +pub(crate) static LEMMY_TEST_FAST_FEDERATION: Lazy = Lazy::new(|| { + std::env::var("LEMMY_TEST_FAST_FEDERATION") + .map(|s| !s.is_empty()) + .unwrap_or(false) +}); +/// Recheck for new federation work every n seconds. +/// +/// When the queue is processed faster than new activities are added and it reaches the current time with an empty batch, +/// this is the delay the queue waits before it checks if new activities have been added to the sent_activities table. +/// This delay is only applied if no federated activity happens during sending activities of the last batch. +pub(crate) static WORK_FINISHED_RECHECK_DELAY: Lazy = Lazy::new(|| { + if *LEMMY_TEST_FAST_FEDERATION { + Duration::from_millis(100) + } else { + Duration::from_secs(30) + } +}); + pub struct CancellableTask { f: Pin> + Send + 'static>>, ended: Arc>, @@ -162,22 +179,20 @@ pub(crate) async fn get_activity_cached( pub(crate) async fn get_latest_activity_id(pool: &mut DbPool<'_>) -> Result { static CACHE: Lazy> = Lazy::new(|| { Cache::builder() - .time_to_live(Duration::from_secs(1)) + .time_to_live(if *LEMMY_TEST_FAST_FEDERATION { + *WORK_FINISHED_RECHECK_DELAY + } else { + Duration::from_secs(1) + }) .build() }); CACHE .try_get_with((), async { + use diesel::dsl::max; + use lemmy_db_schema::schema::sent_activity::dsl::{id, sent_activity}; let conn = &mut get_conn(pool).await?; - let seq: Sequence = - diesel::sql_query("select last_value, is_called from sent_activity_id_seq") - .get_result(conn) - .await?; - let latest_id = if seq.is_called { - seq.last_value as ActivityId - } else { - // if a PG sequence has never been used, last_value will actually be next_value - (seq.last_value - 1) as ActivityId - }; + let seq: Option = sent_activity.select(max(id)).get_result(conn).await?; + let latest_id = seq.unwrap_or(0); anyhow::Result::<_, anyhow::Error>::Ok(latest_id as ActivityId) }) .await @@ -188,11 +203,3 @@ pub(crate) async fn get_latest_activity_id(pool: &mut DbPool<'_>) -> Result Duration { Duration::from_secs_f64(10.0 * 2.0_f64.powf(f64::from(retry_count))) } - -#[derive(QueryableByName)] -struct Sequence { - #[diesel(sql_type = Int8)] - last_value: i64, // this value is bigint for some reason even if sequence is int4 - #[diesel(sql_type = Bool)] - is_called: bool, -} diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index a2bdf33c..3eda2e74 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -1,6 +1,13 @@ use crate::{ federation_queue_state::FederationQueueState, - util::{get_activity_cached, get_actor_cached, get_latest_activity_id, retry_sleep_duration}, + util::{ + get_activity_cached, + get_actor_cached, + get_latest_activity_id, + retry_sleep_duration, + LEMMY_TEST_FAST_FEDERATION, + WORK_FINISHED_RECHECK_DELAY, + }, }; use activitypub_federation::{activity_sending::SendActivityTask, config::Data}; use anyhow::{Context, Result}; @@ -22,20 +29,27 @@ use std::{ }; use tokio::{sync::mpsc::UnboundedSender, time::sleep}; use tokio_util::sync::CancellationToken; -/// save state to db every n sends if there's no failures (otherwise state is saved after every attempt) + +/// Check whether to save state to db every n sends if there's no failures (during failures state is saved after every attempt) +/// This determines the batch size for loop_batch. After a batch ends and SAVE_STATE_EVERY_TIME has passed, the federation_queue_state is updated in the DB. static CHECK_SAVE_STATE_EVERY_IT: i64 = 100; +/// Save state to db after this time has passed since the last state (so if the server crashes or is SIGKILLed, less than X seconds of activities are resent) static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(60); -/// recheck for new federation work every n seconds -#[cfg(debug_assertions)] -static WORK_FINISHED_RECHECK_DELAY: Duration = Duration::from_secs(1); -#[cfg(not(debug_assertions))] -static WORK_FINISHED_RECHECK_DELAY: Duration = Duration::from_secs(30); -#[cfg(debug_assertions)] -static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy = - Lazy::new(|| chrono::Duration::seconds(1)); -#[cfg(not(debug_assertions))] -static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy = - Lazy::new(|| chrono::Duration::minutes(1)); +/// interval with which new additions to community_followers are queried. +/// +/// The first time some user on an instance follows a specific remote community (or, more precisely: the first time a (followed_community_id, follower_inbox_url) tuple appears), +/// this delay limits the maximum time until the follow actually results in activities from that community id being sent to that inbox url. +/// This delay currently needs to not be too small because the DB load is currently fairly high because of the current structure of storing inboxes for every person, not having a separate list of shared_inboxes, and the architecture of having every instance queue be fully separate. +/// (see https://github.com/LemmyNet/lemmy/issues/3958) +static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy = Lazy::new(|| { + if *LEMMY_TEST_FAST_FEDERATION { + chrono::Duration::seconds(1) + } else { + chrono::Duration::minutes(2) + } +}); +/// The same as FOLLOW_ADDITIONS_RECHECK_DELAY, but triggering when the last person on an instance unfollows a specific remote community. +/// This is expected to happen pretty rarely and updating it in a timely manner is not too important. static FOLLOW_REMOVALS_RECHECK_DELAY: Lazy = Lazy::new(|| chrono::Duration::hours(1)); pub(crate) struct InstanceWorker { @@ -121,6 +135,7 @@ impl InstanceWorker { } Ok(()) } + /// send out a batch of CHECK_SAVE_STATE_EVERY_IT activities async fn loop_batch(&mut self, pool: &mut DbPool<'_>) -> Result<()> { let latest_id = get_latest_activity_id(pool).await?; if self.state.last_successful_id == -1 { @@ -134,7 +149,7 @@ impl InstanceWorker { if id == latest_id { // no more work to be done, wait before rechecking tokio::select! { - () = sleep(WORK_FINISHED_RECHECK_DELAY) => {}, + () = sleep(*WORK_FINISHED_RECHECK_DELAY) => {}, () = self.stop.cancelled() => {} } return Ok(()); @@ -254,7 +269,8 @@ impl InstanceWorker { .send_inboxes .iter() .filter_map(std::option::Option::as_ref) - .filter_map(|u| (u.domain() == Some(&self.instance.domain)).then(|| u.inner().clone())), + .filter(|&u| (u.domain() == Some(&self.instance.domain))) + .map(|u| u.inner().clone()), ); Ok(inbox_urls) } @@ -263,7 +279,7 @@ impl InstanceWorker { if (Utc::now() - self.last_full_communities_fetch) > *FOLLOW_REMOVALS_RECHECK_DELAY { // process removals every hour (self.followed_communities, self.last_full_communities_fetch) = self - .get_communities(pool, self.instance.id, self.last_full_communities_fetch) + .get_communities(pool, self.instance.id, Utc.timestamp_nanos(0)) .await?; self.last_incremental_communities_fetch = self.last_full_communities_fetch; } @@ -289,13 +305,13 @@ impl InstanceWorker { instance_id: InstanceId, last_fetch: DateTime, ) -> Result<(HashMap>, DateTime)> { - let new_last_fetch = Utc::now(); // update to time before fetch to ensure overlap + let new_last_fetch = Utc::now() - chrono::Duration::seconds(10); // update to time before fetch to ensure overlap. subtract 10s to ensure overlap even if published date is not exact Ok(( CommunityFollowerView::get_instance_followed_community_inboxes(pool, instance_id, last_fetch) .await? .into_iter() .fold(HashMap::new(), |mut map, (c, u)| { - map.entry(c).or_insert_with(HashSet::new).insert(u.into()); + map.entry(c).or_default().insert(u.into()); map }), new_last_fetch, diff --git a/migrations/2023-09-12-194850_add_federation_worker_index/down.sql b/migrations/2023-09-12-194850_add_federation_worker_index/down.sql new file mode 100644 index 00000000..a203e808 --- /dev/null +++ b/migrations/2023-09-12-194850_add_federation_worker_index/down.sql @@ -0,0 +1,2 @@ +DROP INDEX idx_person_local_instance; + diff --git a/migrations/2023-09-12-194850_add_federation_worker_index/up.sql b/migrations/2023-09-12-194850_add_federation_worker_index/up.sql new file mode 100644 index 00000000..bbbab0b1 --- /dev/null +++ b/migrations/2023-09-12-194850_add_federation_worker_index/up.sql @@ -0,0 +1,2 @@ +CREATE INDEX idx_person_local_instance ON person (local DESC, instance_id); + diff --git a/src/lib.rs b/src/lib.rs index e1c6d1fa..9ce1bfa0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,14 +28,14 @@ use lemmy_api_common::{ context::LemmyContext, lemmy_db_views::structs::SiteView, request::build_user_agent, - send_activity::MATCH_OUTGOING_ACTIVITIES, + send_activity::{ActivityChannel, MATCH_OUTGOING_ACTIVITIES}, utils::{ check_private_instance_and_federation_enabled, local_site_rate_limit_to_rate_limit_config, }, }; use lemmy_apub::{ - activities::match_outgoing_activities, + activities::{handle_outgoing_activities, match_outgoing_activities}, VerifyUrlData, FEDERATION_HTTP_FETCH_LIMIT, }; @@ -203,6 +203,8 @@ pub async fn start_lemmy_server(args: CmdArgs) -> Result<(), LemmyError> { Box::pin(match_outgoing_activities(d, c)) })) .expect("set function pointer"); + let request_data = federation_config.to_request_data(); + let outgoing_activities_task = tokio::task::spawn(handle_outgoing_activities(request_data)); let server = if args.http_server { Some(create_http_server( @@ -245,6 +247,9 @@ pub async fn start_lemmy_server(args: CmdArgs) -> Result<(), LemmyError> { federate.cancel().await?; } + // Wait for outgoing apub sends to complete + ActivityChannel::close(outgoing_activities_task).await?; + Ok(()) }