Merge pull request #3960 from LemmyNet/add_federation_worker_index

Fixing high CPU usage on federation worker recheck + fix federation tests. Fixes #3958
This commit is contained in:
phiresky 2023-09-21 16:40:04 +02:00 committed by GitHub
commit 24c98a726a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 332 additions and 224 deletions

View file

@ -6,6 +6,8 @@ set -e
export RUST_BACKTRACE=1 export RUST_BACKTRACE=1
export RUST_LOG="warn,lemmy_server=debug,lemmy_federate=debug,lemmy_api=debug,lemmy_api_common=debug,lemmy_api_crud=debug,lemmy_apub=debug,lemmy_db_schema=debug,lemmy_db_views=debug,lemmy_db_views_actor=debug,lemmy_db_views_moderator=debug,lemmy_routes=debug,lemmy_utils=debug,lemmy_websocket=debug" export RUST_LOG="warn,lemmy_server=debug,lemmy_federate=debug,lemmy_api=debug,lemmy_api_common=debug,lemmy_api_crud=debug,lemmy_apub=debug,lemmy_db_schema=debug,lemmy_db_views=debug,lemmy_db_views_actor=debug,lemmy_db_views_moderator=debug,lemmy_routes=debug,lemmy_utils=debug,lemmy_websocket=debug"
export LEMMY_TEST_FAST_FEDERATION=1 # by default, the persistent federation queue has delays in the scale of 30s-5min
for INSTANCE in lemmy_alpha lemmy_beta lemmy_gamma lemmy_delta lemmy_epsilon; do for INSTANCE in lemmy_alpha lemmy_beta lemmy_gamma lemmy_delta lemmy_epsilon; do
echo "DB URL: ${LEMMY_DATABASE_URL} INSTANCE: $INSTANCE" echo "DB URL: ${LEMMY_DATABASE_URL} INSTANCE: $INSTANCE"
psql "${LEMMY_DATABASE_URL}/lemmy" -c "DROP DATABASE IF EXISTS $INSTANCE" psql "${LEMMY_DATABASE_URL}/lemmy" -c "DROP DATABASE IF EXISTS $INSTANCE"
@ -34,30 +36,30 @@ echo "$PWD"
echo "start alpha" echo "start alpha"
LEMMY_CONFIG_LOCATION=./docker/federation/lemmy_alpha.hjson \ LEMMY_CONFIG_LOCATION=./docker/federation/lemmy_alpha.hjson \
LEMMY_DATABASE_URL="${LEMMY_DATABASE_URL}/lemmy_alpha" \ LEMMY_DATABASE_URL="${LEMMY_DATABASE_URL}/lemmy_alpha" \
target/lemmy_server >/tmp/lemmy_alpha.out 2>&1 & target/lemmy_server >/tmp/lemmy_alpha.out 2>&1 &
echo "start beta" echo "start beta"
LEMMY_CONFIG_LOCATION=./docker/federation/lemmy_beta.hjson \ LEMMY_CONFIG_LOCATION=./docker/federation/lemmy_beta.hjson \
LEMMY_DATABASE_URL="${LEMMY_DATABASE_URL}/lemmy_beta" \ LEMMY_DATABASE_URL="${LEMMY_DATABASE_URL}/lemmy_beta" \
target/lemmy_server >/tmp/lemmy_beta.out 2>&1 & target/lemmy_server >/tmp/lemmy_beta.out 2>&1 &
echo "start gamma" echo "start gamma"
LEMMY_CONFIG_LOCATION=./docker/federation/lemmy_gamma.hjson \ LEMMY_CONFIG_LOCATION=./docker/federation/lemmy_gamma.hjson \
LEMMY_DATABASE_URL="${LEMMY_DATABASE_URL}/lemmy_gamma" \ LEMMY_DATABASE_URL="${LEMMY_DATABASE_URL}/lemmy_gamma" \
target/lemmy_server >/tmp/lemmy_gamma.out 2>&1 & target/lemmy_server >/tmp/lemmy_gamma.out 2>&1 &
echo "start delta" echo "start delta"
# An instance with only an allowlist for beta # An instance with only an allowlist for beta
LEMMY_CONFIG_LOCATION=./docker/federation/lemmy_delta.hjson \ LEMMY_CONFIG_LOCATION=./docker/federation/lemmy_delta.hjson \
LEMMY_DATABASE_URL="${LEMMY_DATABASE_URL}/lemmy_delta" \ LEMMY_DATABASE_URL="${LEMMY_DATABASE_URL}/lemmy_delta" \
target/lemmy_server >/tmp/lemmy_delta.out 2>&1 & target/lemmy_server >/tmp/lemmy_delta.out 2>&1 &
echo "start epsilon" echo "start epsilon"
# An instance who has a blocklist, with lemmy-alpha blocked # An instance who has a blocklist, with lemmy-alpha blocked
LEMMY_CONFIG_LOCATION=./docker/federation/lemmy_epsilon.hjson \ LEMMY_CONFIG_LOCATION=./docker/federation/lemmy_epsilon.hjson \
LEMMY_DATABASE_URL="${LEMMY_DATABASE_URL}/lemmy_epsilon" \ LEMMY_DATABASE_URL="${LEMMY_DATABASE_URL}/lemmy_epsilon" \
target/lemmy_server >/tmp/lemmy_epsilon.out 2>&1 & target/lemmy_server >/tmp/lemmy_epsilon.out 2>&1 &
echo "wait for all instances to start" echo "wait for all instances to start"
while [[ "$(curl -s -o /dev/null -w '%{http_code}' 'lemmy-alpha:8541/api/v3/site')" != "200" ]]; do sleep 1; done while [[ "$(curl -s -o /dev/null -w '%{http_code}' 'lemmy-alpha:8541/api/v3/site')" != "200" ]]; do sleep 1; done

View file

@ -33,21 +33,21 @@ import {
getUnreadCount, getUnreadCount,
waitUntil, waitUntil,
delay, delay,
waitForPost,
alphaUrl, alphaUrl,
} from "./shared"; } from "./shared";
import { CommentView } from "lemmy-js-client/dist/types/CommentView"; import { CommentView } from "lemmy-js-client/dist/types/CommentView";
import { CommunityView } from "lemmy-js-client";
import { LemmyHttp } from "lemmy-js-client"; import { LemmyHttp } from "lemmy-js-client";
let betaCommunity: CommunityView | undefined;
let postOnAlphaRes: PostResponse; let postOnAlphaRes: PostResponse;
beforeAll(async () => { beforeAll(async () => {
await setupLogins(); await setupLogins();
await unfollows(); await unfollows();
await followBeta(alpha); await Promise.all([followBeta(alpha), followBeta(gamma)]);
await followBeta(gamma); betaCommunity = (await resolveBetaCommunity(alpha)).community;
// wait for FOLLOW_ADDITIONS_RECHECK_DELAY
await delay(2000);
let betaCommunity = (await resolveBetaCommunity(alpha)).community;
if (betaCommunity) { if (betaCommunity) {
postOnAlphaRes = await createPost(alpha, betaCommunity.community.id); postOnAlphaRes = await createPost(alpha, betaCommunity.community.id);
} }
@ -343,6 +343,8 @@ test("Federated comment like", async () => {
}); });
test("Reply to a comment from another instance, get notification", async () => { test("Reply to a comment from another instance, get notification", async () => {
await alpha.markAllAsRead();
let betaCommunity = (await resolveBetaCommunity(alpha)).community; let betaCommunity = (await resolveBetaCommunity(alpha)).community;
if (!betaCommunity) { if (!betaCommunity) {
throw "Missing beta community"; throw "Missing beta community";
@ -375,16 +377,17 @@ test("Reply to a comment from another instance, get notification", async () => {
expect(replyRes.comment_view.counts.score).toBe(1); expect(replyRes.comment_view.counts.score).toBe(1);
// Make sure that reply comment is seen on alpha // Make sure that reply comment is seen on alpha
// TODO not sure why, but a searchComment back to alpha, for the ap_id of betas let commentSearch = await waitUntil(
// comment, isn't working. () => resolveComment(alpha, replyRes.comment_view.comment),
// let searchAlpha = await searchComment(alpha, replyRes.comment); c => c.comment?.counts.score === 1,
);
let alphaComment = commentSearch.comment!;
let postComments = await waitUntil( let postComments = await waitUntil(
() => getComments(alpha, postOnAlphaRes.post_view.post.id), () => getComments(alpha, postOnAlphaRes.post_view.post.id),
pc => pc.comments.length >= 2, pc => pc.comments.length >= 2,
); );
// Note: this test fails when run twice and this count will differ // Note: this test fails when run twice and this count will differ
expect(postComments.comments.length).toBeGreaterThanOrEqual(2); expect(postComments.comments.length).toBeGreaterThanOrEqual(2);
let alphaComment = postComments.comments[0];
expect(alphaComment.comment.content).toBeDefined(); expect(alphaComment.comment.content).toBeDefined();
expect(getCommentParentId(alphaComment.comment)).toBe( expect(getCommentParentId(alphaComment.comment)).toBe(
@ -400,23 +403,29 @@ test("Reply to a comment from another instance, get notification", async () => {
() => getUnreadCount(alpha), () => getUnreadCount(alpha),
e => e.replies >= 1, e => e.replies >= 1,
); );
expect(alphaUnreadCountRes.replies).toBe(1); expect(alphaUnreadCountRes.replies).toBeGreaterThanOrEqual(1);
// check inbox of replies on alpha, fetching read/unread both // check inbox of replies on alpha, fetching read/unread both
let alphaRepliesRes = await getReplies(alpha); let alphaRepliesRes = await getReplies(alpha);
expect(alphaRepliesRes.replies.length).toBe(1); const alphaReply = alphaRepliesRes.replies.find(
expect(alphaRepliesRes.replies[0].comment.content).toBeDefined(); r => r.comment.id === alphaComment.comment.id,
expect(alphaRepliesRes.replies[0].community.local).toBe(false); );
expect(alphaRepliesRes.replies[0].creator.local).toBe(false); expect(alphaReply).toBeDefined();
expect(alphaRepliesRes.replies[0].counts.score).toBe(1); if (!alphaReply) throw Error();
expect(alphaReply.comment.content).toBeDefined();
expect(alphaReply.community.local).toBe(false);
expect(alphaReply.creator.local).toBe(false);
expect(alphaReply.counts.score).toBe(1);
// ToDo: interesting alphaRepliesRes.replies[0].comment_reply.id is 1, meaning? how did that come about? // ToDo: interesting alphaRepliesRes.replies[0].comment_reply.id is 1, meaning? how did that come about?
expect(alphaRepliesRes.replies[0].comment.id).toBe(alphaComment.comment.id); expect(alphaReply.comment.id).toBe(alphaComment.comment.id);
// this is a new notification, getReplies fetch was for read/unread both, confirm it is unread. // this is a new notification, getReplies fetch was for read/unread both, confirm it is unread.
expect(alphaRepliesRes.replies[0].comment_reply.read).toBe(false); expect(alphaReply.comment_reply.read).toBe(false);
assertCommentFederation(alphaRepliesRes.replies[0], replyRes.comment_view); assertCommentFederation(alphaReply, replyRes.comment_view);
}); });
test("Mention beta from alpha", async () => { test("Mention beta from alpha", async () => {
if (!betaCommunity) throw Error("no community");
const postOnAlphaRes = await createPost(alpha, betaCommunity.community.id);
// Create a new branch, trunk-level comment branch, from alpha instance // Create a new branch, trunk-level comment branch, from alpha instance
let commentRes = await createComment(alpha, postOnAlphaRes.post_view.post.id); let commentRes = await createComment(alpha, postOnAlphaRes.post_view.post.id);
// Create a reply comment to previous comment, this has a mention in body // Create a reply comment to previous comment, this has a mention in body
@ -433,7 +442,7 @@ test("Mention beta from alpha", async () => {
expect(mentionRes.comment_view.counts.score).toBe(1); expect(mentionRes.comment_view.counts.score).toBe(1);
// get beta's localized copy of the alpha post // get beta's localized copy of the alpha post
let betaPost = (await resolvePost(beta, postOnAlphaRes.post_view.post)).post; let betaPost = await waitForPost(beta, postOnAlphaRes.post_view.post);
if (!betaPost) { if (!betaPost) {
throw "unable to locate post on beta"; throw "unable to locate post on beta";
} }
@ -443,9 +452,9 @@ test("Mention beta from alpha", async () => {
// Make sure that both new comments are seen on beta and have parent/child relationship // Make sure that both new comments are seen on beta and have parent/child relationship
let betaPostComments = await waitUntil( let betaPostComments = await waitUntil(
() => getComments(beta, betaPost!.post.id), () => getComments(beta, betaPost!.post.id),
c => c.comments[1].counts.score === 1, c => c.comments[1]?.counts.score === 1,
); );
expect(betaPostComments.comments.length).toBeGreaterThanOrEqual(2); expect(betaPostComments.comments.length).toEqual(2);
// the trunk-branch root comment will be older than the mention reply comment, so index 1 // the trunk-branch root comment will be older than the mention reply comment, so index 1
let betaRootComment = betaPostComments.comments[1]; let betaRootComment = betaPostComments.comments[1];
// the trunk-branch root comment should not have a parent // the trunk-branch root comment should not have a parent
@ -460,7 +469,10 @@ test("Mention beta from alpha", async () => {
expect(betaRootComment.counts.score).toBe(1); expect(betaRootComment.counts.score).toBe(1);
assertCommentFederation(betaRootComment, commentRes.comment_view); assertCommentFederation(betaRootComment, commentRes.comment_view);
let mentionsRes = await getMentions(beta); let mentionsRes = await waitUntil(
() => getMentions(beta),
m => !!m.mentions[0],
);
expect(mentionsRes.mentions[0].comment.content).toBeDefined(); expect(mentionsRes.mentions[0].comment.content).toBeDefined();
expect(mentionsRes.mentions[0].community.local).toBe(true); expect(mentionsRes.mentions[0].community.local).toBe(true);
expect(mentionsRes.mentions[0].creator.local).toBe(false); expect(mentionsRes.mentions[0].creator.local).toBe(false);
@ -492,7 +504,7 @@ test("A and G subscribe to B (center) A posts, G mentions B, it gets announced t
expect(alphaPost.post_view.community.local).toBe(true); expect(alphaPost.post_view.community.local).toBe(true);
// Make sure gamma sees it // Make sure gamma sees it
let gammaPost = (await resolvePost(gamma, alphaPost.post_view.post)).post; let gammaPost = (await resolvePost(gamma, alphaPost.post_view.post))!.post;
if (!gammaPost) { if (!gammaPost) {
throw "Missing gamma post"; throw "Missing gamma post";
@ -514,7 +526,7 @@ test("A and G subscribe to B (center) A posts, G mentions B, it gets announced t
// Make sure alpha sees it // Make sure alpha sees it
let alphaPostComments2 = await waitUntil( let alphaPostComments2 = await waitUntil(
() => getComments(alpha, alphaPost.post_view.post.id), () => getComments(alpha, alphaPost.post_view.post.id),
e => !!e.comments[0], e => e.comments[0]?.counts.score === 1,
); );
expect(alphaPostComments2.comments[0].comment.content).toBe(commentContent); expect(alphaPostComments2.comments[0].comment.content).toBe(commentContent);
expect(alphaPostComments2.comments[0].community.local).toBe(true); expect(alphaPostComments2.comments[0].community.local).toBe(true);
@ -560,21 +572,19 @@ test("Check that activity from another instance is sent to third instance", asyn
() => resolveBetaCommunity(gamma), () => resolveBetaCommunity(gamma),
c => c.community?.subscribed === "Subscribed", c => c.community?.subscribed === "Subscribed",
); );
// FOLLOW_ADDITIONS_RECHECK_DELAY
await delay(2000);
// Create a post on beta // Create a post on beta
let betaPost = await createPost(beta, 2); let betaPost = await createPost(beta, 2);
expect(betaPost.post_view.community.local).toBe(true); expect(betaPost.post_view.community.local).toBe(true);
// Make sure gamma and alpha see it // Make sure gamma and alpha see it
let gammaPost = (await resolvePost(gamma, betaPost.post_view.post)).post; let gammaPost = await waitForPost(gamma, betaPost.post_view.post);
if (!gammaPost) { if (!gammaPost) {
throw "Missing gamma post"; throw "Missing gamma post";
} }
expect(gammaPost.post).toBeDefined(); expect(gammaPost.post).toBeDefined();
let alphaPost = (await resolvePost(alpha, betaPost.post_view.post)).post; let alphaPost = await waitForPost(alpha, betaPost.post_view.post);
if (!alphaPost) { if (!alphaPost) {
throw "Missing alpha post"; throw "Missing alpha post";
} }
@ -596,7 +606,7 @@ test("Check that activity from another instance is sent to third instance", asyn
// Make sure alpha sees it // Make sure alpha sees it
let alphaPostComments2 = await waitUntil( let alphaPostComments2 = await waitUntil(
() => getComments(alpha, alphaPost!.post.id), () => getComments(alpha, alphaPost!.post.id),
e => !!e.comments[0], e => e.comments[0]?.counts.score === 1,
); );
expect(alphaPostComments2.comments[0].comment.content).toBe(commentContent); expect(alphaPostComments2.comments[0].comment.content).toBe(commentContent);
expect(alphaPostComments2.comments[0].community.local).toBe(false); expect(alphaPostComments2.comments[0].community.local).toBe(false);
@ -607,8 +617,7 @@ test("Check that activity from another instance is sent to third instance", asyn
commentRes.comment_view, commentRes.comment_view,
); );
await unfollowRemotes(alpha); await Promise.all([unfollowRemotes(alpha), unfollowRemotes(gamma)]);
await unfollowRemotes(gamma);
}); });
test("Fetch in_reply_tos: A is unsubbed from B, B makes a post, and some embedded comments, A subs to B, B updates the lowest level comment, A fetches both the post and all the inreplyto comments for that post.", async () => { test("Fetch in_reply_tos: A is unsubbed from B, B makes a post, and some embedded comments, A subs to B, B updates the lowest level comment, A fetches both the post and all the inreplyto comments for that post.", async () => {
@ -660,8 +669,8 @@ test("Fetch in_reply_tos: A is unsubbed from B, B makes a post, and some embedde
expect(updateRes.comment_view.comment.content).toBe(updatedCommentContent); expect(updateRes.comment_view.comment.content).toBe(updatedCommentContent);
// Get the post from alpha // Get the post from alpha
let alphaPostB = (await resolvePost(alpha, postOnBetaRes.post_view.post)) let alphaPostB = await waitForPost(alpha, postOnBetaRes.post_view.post);
.post;
if (!alphaPostB) { if (!alphaPostB) {
throw "Missing alpha post B"; throw "Missing alpha post B";
} }
@ -671,7 +680,8 @@ test("Fetch in_reply_tos: A is unsubbed from B, B makes a post, and some embedde
() => getComments(alpha, alphaPostB!.post.id), () => getComments(alpha, alphaPostB!.post.id),
c => c =>
c.comments[1]?.comment.content === c.comments[1]?.comment.content ===
parentCommentRes.comment_view.comment.content, parentCommentRes.comment_view.comment.content &&
c.comments[0]?.comment.content === updateRes.comment_view.comment.content,
); );
expect(alphaPost.post_view.post.name).toBeDefined(); expect(alphaPost.post_view.post.name).toBeDefined();
assertCommentFederation( assertCommentFederation(
@ -705,16 +715,17 @@ test("Report a comment", async () => {
throw "Missing alpha comment"; throw "Missing alpha comment";
} }
let alphaReport = ( const reason = randomString(10);
await reportComment(alpha, alphaComment.id, randomString(10)) let alphaReport = (await reportComment(alpha, alphaComment.id, reason))
).comment_report_view.comment_report; .comment_report_view.comment_report;
let betaReport = ( let betaReport = (await waitUntil(
await waitUntil( () =>
() => listCommentReports(beta), listCommentReports(beta).then(r =>
e => !!e.comment_reports[0], r.comment_reports.find(rep => rep.comment_report.reason === reason),
) ),
).comment_reports[0].comment_report; e => !!e,
))!.comment_report;
expect(betaReport).toBeDefined(); expect(betaReport).toBeDefined();
expect(betaReport.resolved).toBe(false); expect(betaReport.resolved).toBe(false);
expect(betaReport.original_comment_text).toBe( expect(betaReport.original_comment_text).toBe(

View file

@ -26,6 +26,7 @@ import {
blockInstance, blockInstance,
waitUntil, waitUntil,
delay, delay,
waitForPost,
alphaUrl, alphaUrl,
} from "./shared"; } from "./shared";
import { LemmyHttp } from "lemmy-js-client"; import { LemmyHttp } from "lemmy-js-client";
@ -89,12 +90,6 @@ test("Delete community", async () => {
// Make sure the follow response went through // Make sure the follow response went through
expect(follow.community_view.community.local).toBe(false); expect(follow.community_view.community.local).toBe(false);
await waitUntil(
() => resolveCommunity(alpha, searchShort),
g => g.community?.subscribed === "Subscribed",
);
// wait FOLLOW_ADDITIONS_RECHECK_DELAY
await delay(2000);
let deleteCommunityRes = await deleteCommunity( let deleteCommunityRes = await deleteCommunity(
beta, beta,
true, true,
@ -147,10 +142,6 @@ test("Remove community", async () => {
// Make sure the follow response went through // Make sure the follow response went through
expect(follow.community_view.community.local).toBe(false); expect(follow.community_view.community.local).toBe(false);
await waitUntil(
() => resolveCommunity(alpha, searchShort),
g => g.community?.subscribed === "Subscribed",
);
let removeCommunityRes = await removeCommunity( let removeCommunityRes = await removeCommunity(
beta, beta,
true, true,
@ -361,8 +352,8 @@ test("User blocks instance, communities are hidden", async () => {
expect(postRes.post_view.post.id).toBeDefined(); expect(postRes.post_view.post.id).toBeDefined();
// fetch post to alpha // fetch post to alpha
let alphaPost = await resolvePost(alpha, postRes.post_view.post); let alphaPost = (await resolvePost(alpha, postRes.post_view.post)).post!;
expect(alphaPost.post?.post).toBeDefined(); expect(alphaPost.post).toBeDefined();
// post should be included in listing // post should be included in listing
let listing = await getPosts(alpha, "All"); let listing = await getPosts(alpha, "All");
@ -370,7 +361,7 @@ test("User blocks instance, communities are hidden", async () => {
expect(listing_ids).toContain(postRes.post_view.post.ap_id); expect(listing_ids).toContain(postRes.post_view.post.ap_id);
// block the beta instance // block the beta instance
await blockInstance(alpha, alphaPost.post!.community.instance_id, true); await blockInstance(alpha, alphaPost.community.instance_id, true);
// after blocking, post should not be in listing // after blocking, post should not be in listing
let listing2 = await getPosts(alpha, "All"); let listing2 = await getPosts(alpha, "All");
@ -378,7 +369,7 @@ test("User blocks instance, communities are hidden", async () => {
expect(listing_ids2.indexOf(postRes.post_view.post.ap_id)).toBe(-1); expect(listing_ids2.indexOf(postRes.post_view.post.ap_id)).toBe(-1);
// unblock instance again // unblock instance again
await blockInstance(alpha, alphaPost.post!.community.instance_id, false); await blockInstance(alpha, alphaPost.community.instance_id, false);
// post should be included in listing // post should be included in listing
let listing3 = await getPosts(alpha, "All"); let listing3 = await getPosts(alpha, "All");

View file

@ -34,7 +34,7 @@ import {
unfollows, unfollows,
resolveCommunity, resolveCommunity,
waitUntil, waitUntil,
delay, waitForPost,
alphaUrl, alphaUrl,
} from "./shared"; } from "./shared";
import { PostView } from "lemmy-js-client/dist/types/PostView"; import { PostView } from "lemmy-js-client/dist/types/PostView";
@ -83,11 +83,11 @@ test("Create a post", async () => {
expect(postRes.post_view.counts.score).toBe(1); expect(postRes.post_view.counts.score).toBe(1);
// Make sure that post is liked on beta // Make sure that post is liked on beta
const res = await waitUntil( const betaPost = await waitForPost(
() => resolvePost(beta, postRes.post_view.post), beta,
res => res.post?.counts.score === 1, postRes.post_view.post,
res => res?.counts.score === 1,
); );
let betaPost = res.post;
expect(betaPost).toBeDefined(); expect(betaPost).toBeDefined();
expect(betaPost?.community.local).toBe(true); expect(betaPost?.community.local).toBe(true);
@ -123,12 +123,12 @@ test("Unlike a post", async () => {
expect(unlike2.post_view.counts.score).toBe(0); expect(unlike2.post_view.counts.score).toBe(0);
// Make sure that post is unliked on beta // Make sure that post is unliked on beta
const betaPost = ( const betaPost = await waitForPost(
await waitUntil( beta,
() => resolvePost(beta, postRes.post_view.post), postRes.post_view.post,
b => b.post?.counts.score === 0, post => post?.counts.score === 0,
) );
).post;
expect(betaPost).toBeDefined(); expect(betaPost).toBeDefined();
expect(betaPost?.community.local).toBe(true); expect(betaPost?.community.local).toBe(true);
expect(betaPost?.creator.local).toBe(false); expect(betaPost?.creator.local).toBe(false);
@ -141,26 +141,16 @@ test("Update a post", async () => {
throw "Missing beta community"; throw "Missing beta community";
} }
let postRes = await createPost(alpha, betaCommunity.community.id); let postRes = await createPost(alpha, betaCommunity.community.id);
await waitUntil( await waitForPost(beta, postRes.post_view.post);
() => resolvePost(beta, postRes.post_view.post),
res => !!res.post,
);
let updatedName = "A jest test federated post, updated"; let updatedName = "A jest test federated post, updated";
let updatedPost = await editPost(alpha, postRes.post_view.post); let updatedPost = await editPost(alpha, postRes.post_view.post);
await waitUntil(
() => resolvePost(beta, postRes.post_view.post),
res => res.post?.post.name === updatedName,
);
expect(updatedPost.post_view.post.name).toBe(updatedName); expect(updatedPost.post_view.post.name).toBe(updatedName);
expect(updatedPost.post_view.community.local).toBe(false); expect(updatedPost.post_view.community.local).toBe(false);
expect(updatedPost.post_view.creator.local).toBe(true); expect(updatedPost.post_view.creator.local).toBe(true);
// Make sure that post is updated on beta // Make sure that post is updated on beta
let betaPost = (await resolvePost(beta, postRes.post_view.post)).post; let betaPost = await waitForPost(beta, updatedPost.post_view.post);
if (!betaPost) {
throw "Missing beta post";
}
expect(betaPost.community.local).toBe(true); expect(betaPost.community.local).toBe(true);
expect(betaPost.creator.local).toBe(false); expect(betaPost.creator.local).toBe(false);
expect(betaPost.post.name).toBe(updatedName); expect(betaPost.post.name).toBe(updatedName);
@ -178,7 +168,7 @@ test("Sticky a post", async () => {
} }
let postRes = await createPost(alpha, betaCommunity.community.id); let postRes = await createPost(alpha, betaCommunity.community.id);
let betaPost1 = (await resolvePost(beta, postRes.post_view.post)).post; let betaPost1 = await waitForPost(beta, postRes.post_view.post);
if (!betaPost1) { if (!betaPost1) {
throw "Missing beta post1"; throw "Missing beta post1";
} }
@ -221,30 +211,19 @@ test("Lock a post", async () => {
() => resolveBetaCommunity(alpha), () => resolveBetaCommunity(alpha),
c => c.community?.subscribed === "Subscribed", c => c.community?.subscribed === "Subscribed",
); );
// wait FOLLOW_ADDITIONS_RECHECK_DELAY (there's no API to wait for this currently)
await delay(2_000);
let postRes = await createPost(alpha, betaCommunity.community.id); let postRes = await createPost(alpha, betaCommunity.community.id);
// wait for federation let betaPost1 = await waitForPost(beta, postRes.post_view.post);
await waitUntil(
() => searchPostLocal(beta, postRes.post_view.post),
res => !!res.posts[0],
);
// Lock the post // Lock the post
let betaPost1 = (await resolvePost(beta, postRes.post_view.post)).post;
if (!betaPost1) {
throw "Missing beta post1";
}
let lockedPostRes = await lockPost(beta, true, betaPost1.post); let lockedPostRes = await lockPost(beta, true, betaPost1.post);
expect(lockedPostRes.post_view.post.locked).toBe(true); expect(lockedPostRes.post_view.post.locked).toBe(true);
// Make sure that post is locked on alpha // Make sure that post is locked on alpha
let searchAlpha = await waitUntil( let alphaPost1 = await waitForPost(
() => searchPostLocal(alpha, postRes.post_view.post), alpha,
res => res.posts[0]?.post.locked, postRes.post_view.post,
post => !!post && post.post.locked,
); );
let alphaPost1 = searchAlpha.posts[0];
expect(alphaPost1.post.locked).toBe(true);
// Try to make a new comment there, on alpha // Try to make a new comment there, on alpha
await expect(createComment(alpha, alphaPost1.post.id)).rejects.toBe("locked"); await expect(createComment(alpha, alphaPost1.post.id)).rejects.toBe("locked");
@ -254,11 +233,11 @@ test("Lock a post", async () => {
expect(unlockedPost.post_view.post.locked).toBe(false); expect(unlockedPost.post_view.post.locked).toBe(false);
// Make sure that post is unlocked on alpha // Make sure that post is unlocked on alpha
let searchAlpha2 = await waitUntil( let alphaPost2 = await waitForPost(
() => searchPostLocal(alpha, postRes.post_view.post), alpha,
res => !res.posts[0]?.post.locked, postRes.post_view.post,
post => !!post && !post.post.locked,
); );
let alphaPost2 = searchAlpha2.posts[0];
expect(alphaPost2.community.local).toBe(false); expect(alphaPost2.community.local).toBe(false);
expect(alphaPost2.creator.local).toBe(true); expect(alphaPost2.creator.local).toBe(true);
expect(alphaPost2.post.locked).toBe(false); expect(alphaPost2.post.locked).toBe(false);
@ -275,6 +254,7 @@ test("Delete a post", async () => {
let postRes = await createPost(alpha, betaCommunity.community.id); let postRes = await createPost(alpha, betaCommunity.community.id);
expect(postRes.post_view.post).toBeDefined(); expect(postRes.post_view.post).toBeDefined();
await waitForPost(beta, postRes.post_view.post);
let deletedPost = await deletePost(alpha, true, postRes.post_view.post); let deletedPost = await deletePost(alpha, true, postRes.post_view.post);
expect(deletedPost.post_view.post.deleted).toBe(true); expect(deletedPost.post_view.post.deleted).toBe(true);
@ -282,16 +262,18 @@ test("Delete a post", async () => {
// Make sure lemmy beta sees post is deleted // Make sure lemmy beta sees post is deleted
// This will be undefined because of the tombstone // This will be undefined because of the tombstone
await expect(resolvePost(beta, postRes.post_view.post)).rejects.toBe( await waitForPost(beta, postRes.post_view.post, p => !p || p.post.deleted);
"couldnt_find_object",
);
// Undelete // Undelete
let undeletedPost = await deletePost(alpha, false, postRes.post_view.post); let undeletedPost = await deletePost(alpha, false, postRes.post_view.post);
expect(undeletedPost.post_view.post.deleted).toBe(false);
// Make sure lemmy beta sees post is undeleted // Make sure lemmy beta sees post is undeleted
let betaPost2 = (await resolvePost(beta, postRes.post_view.post)).post; let betaPost2 = await waitForPost(
beta,
postRes.post_view.post,
p => !!p && !p.post.deleted,
);
if (!betaPost2) { if (!betaPost2) {
throw "Missing beta post 2"; throw "Missing beta post 2";
} }
@ -350,11 +332,7 @@ test("Remove a post from admin and community on same instance", async () => {
let postRes = await createPost(alpha, betaCommunity.community.id); let postRes = await createPost(alpha, betaCommunity.community.id);
expect(postRes.post_view.post).toBeDefined(); expect(postRes.post_view.post).toBeDefined();
// Get the id for beta // Get the id for beta
let searchBeta = await waitUntil( let betaPost = await waitForPost(beta, postRes.post_view.post);
() => searchPostLocal(beta, postRes.post_view.post),
res => !!res.posts[0],
);
let betaPost = searchBeta.posts[0];
expect(betaPost).toBeDefined(); expect(betaPost).toBeDefined();
// The beta admin removes it (the community lives on beta) // The beta admin removes it (the community lives on beta)
@ -362,18 +340,25 @@ test("Remove a post from admin and community on same instance", async () => {
expect(removePostRes.post_view.post.removed).toBe(true); expect(removePostRes.post_view.post.removed).toBe(true);
// Make sure lemmy alpha sees post is removed // Make sure lemmy alpha sees post is removed
// let alphaPost = await getPost(alpha, postRes.post_view.post.id); let alphaPost = await waitUntil(
// expect(alphaPost.post_view.post.removed).toBe(true); // TODO this shouldn't be commented () => getPost(alpha, postRes.post_view.post.id),
// assertPostFederation(alphaPost.post_view, removePostRes.post_view); p => p?.post_view.post.removed ?? false,
);
expect(alphaPost.post_view?.post.removed).toBe(true);
assertPostFederation(alphaPost.post_view, removePostRes.post_view);
// Undelete // Undelete
let undeletedPost = await removePost(beta, false, betaPost.post); let undeletedPost = await removePost(beta, false, betaPost.post);
expect(undeletedPost.post_view.post.removed).toBe(false); expect(undeletedPost.post_view.post.removed).toBe(false);
// Make sure lemmy alpha sees post is undeleted // Make sure lemmy alpha sees post is undeleted
let alphaPost2 = await getPost(alpha, postRes.post_view.post.id); let alphaPost2 = await waitForPost(
expect(alphaPost2.post_view.post.removed).toBe(false); alpha,
assertPostFederation(alphaPost2.post_view, undeletedPost.post_view); postRes.post_view.post,
p => !!p && !p.post.removed,
);
expect(alphaPost2.post.removed).toBe(false);
assertPostFederation(alphaPost2, undeletedPost.post_view);
await unfollowRemotes(alpha); await unfollowRemotes(alpha);
}); });
@ -385,7 +370,7 @@ test("Search for a post", async () => {
let postRes = await createPost(alpha, betaCommunity.community.id); let postRes = await createPost(alpha, betaCommunity.community.id);
expect(postRes.post_view.post).toBeDefined(); expect(postRes.post_view.post).toBeDefined();
let betaPost = (await resolvePost(beta, postRes.post_view.post)).post; let betaPost = await waitForPost(beta, postRes.post_view.post);
expect(betaPost?.post.name).toBeDefined(); expect(betaPost?.post.name).toBeDefined();
}); });
@ -413,11 +398,7 @@ test("Enforce site ban for federated user", async () => {
// alpha makes post in beta community, it federates to beta instance // alpha makes post in beta community, it federates to beta instance
let postRes1 = await createPost(alpha_user, betaCommunity.community.id); let postRes1 = await createPost(alpha_user, betaCommunity.community.id);
let searchBeta1 = await waitUntil( let searchBeta1 = await waitForPost(beta, postRes1.post_view.post);
() => searchPostLocal(beta, postRes1.post_view.post),
res => !!res.posts[0],
);
expect(searchBeta1.posts[0]).toBeDefined();
// ban alpha from its instance // ban alpha from its instance
let banAlpha = await banPersonFromSite( let banAlpha = await banPersonFromSite(
@ -436,8 +417,10 @@ test("Enforce site ban for federated user", async () => {
expect(alphaUserOnBeta1.person?.person.banned).toBe(true); expect(alphaUserOnBeta1.person?.person.banned).toBe(true);
// existing alpha post should be removed on beta // existing alpha post should be removed on beta
let searchBeta2 = await getPost(beta, searchBeta1.posts[0].post.id); let searchBeta2 = await waitUntil(
expect(searchBeta2.post_view.post.removed).toBe(true); () => getPost(beta, searchBeta1.post.id),
s => s.post_view.post.removed,
);
// Unban alpha // Unban alpha
let unBanAlpha = await banPersonFromSite( let unBanAlpha = await banPersonFromSite(
@ -450,11 +433,7 @@ test("Enforce site ban for federated user", async () => {
// alpha makes new post in beta community, it federates // alpha makes new post in beta community, it federates
let postRes2 = await createPost(alpha_user, betaCommunity.community.id); let postRes2 = await createPost(alpha_user, betaCommunity.community.id);
let searchBeta3 = await waitUntil( let searchBeta3 = await waitForPost(beta, postRes2.post_view.post);
() => searchPostLocal(beta, postRes2.post_view.post),
e => !!e.posts[0],
);
expect(searchBeta3.posts[0]).toBeDefined();
let alphaUserOnBeta2 = await resolvePerson(beta, alphaUserActorId!); let alphaUserOnBeta2 = await resolvePerson(beta, alphaUserActorId!);
expect(alphaUserOnBeta2.person?.person.banned).toBe(false); expect(alphaUserOnBeta2.person?.person.banned).toBe(false);
@ -544,12 +523,16 @@ test("Report a post", async () => {
await reportPost(alpha, alphaPost.post.id, randomString(10)) await reportPost(alpha, alphaPost.post.id, randomString(10))
).post_report_view.post_report; ).post_report_view.post_report;
let betaReport = ( let betaReport = (await waitUntil(
await waitUntil( () =>
() => listPostReports(beta), listPostReports(beta).then(p =>
res => !!res.post_reports[0], p.post_reports.find(
) r =>
).post_reports[0].post_report; r.post_report.original_post_name === alphaReport.original_post_name,
),
),
res => !!res,
))!.post_report;
expect(betaReport).toBeDefined(); expect(betaReport).toBeDefined();
expect(betaReport.resolved).toBe(false); expect(betaReport.resolved).toBe(false);
expect(betaReport.original_post_name).toBe(alphaReport.original_post_name); expect(betaReport.original_post_name).toBe(alphaReport.original_post_name);

View file

@ -6,6 +6,7 @@ import {
GetUnreadCountResponse, GetUnreadCountResponse,
InstanceId, InstanceId,
LemmyHttp, LemmyHttp,
PostView,
} from "lemmy-js-client"; } from "lemmy-js-client";
import { CreatePost } from "lemmy-js-client/dist/types/CreatePost"; import { CreatePost } from "lemmy-js-client/dist/types/CreatePost";
import { DeletePost } from "lemmy-js-client/dist/types/DeletePost"; import { DeletePost } from "lemmy-js-client/dist/types/DeletePost";
@ -181,7 +182,7 @@ export async function setupLogins() {
// otherwise the first few federated events may be missed // otherwise the first few federated events may be missed
// (because last_successful_id is set to current id when federation to an instance is first started) // (because last_successful_id is set to current id when federation to an instance is first started)
// only needed the first time so do in this try // only needed the first time so do in this try
await delay(6_000); await delay(10_000);
} catch (_) { } catch (_) {
console.log("Communities already exist"); console.log("Communities already exist");
} }
@ -288,6 +289,18 @@ export async function searchPostLocal(
return api.search(form); return api.search(form);
} }
/// wait for a post to appear locally without pulling it
export async function waitForPost(
api: LemmyHttp,
post: Post,
checker: (t: PostView | undefined) => boolean = p => !!p,
) {
return waitUntil<PostView>(
() => searchPostLocal(api, post).then(p => p.posts[0]),
checker,
);
}
export async function getPost( export async function getPost(
api: LemmyHttp, api: LemmyHttp,
post_id: number, post_id: number,
@ -405,7 +418,14 @@ export async function followCommunity(
community_id, community_id,
follow, follow,
}; };
return api.followCommunity(form); const res = await api.followCommunity(form);
await waitUntil(
() => resolveCommunity(api, res.community_view.community.actor_id),
g => g.community?.subscribed === (follow ? "Subscribed" : "NotSubscribed"),
);
// wait FOLLOW_ADDITIONS_RECHECK_DELAY (there's no API to wait for this currently)
await delay(2000);
return res;
} }
export async function likePost( export async function likePost(
@ -686,9 +706,9 @@ export async function unfollowRemotes(
let site = await getSite(api); let site = await getSite(api);
let remoteFollowed = let remoteFollowed =
site.my_user?.follows.filter(c => c.community.local == false) ?? []; site.my_user?.follows.filter(c => c.community.local == false) ?? [];
for (let cu of remoteFollowed) { await Promise.all(
await followCommunity(api, false, cu.community.id); remoteFollowed.map(cu => followCommunity(api, false, cu.community.id)),
} );
let siteRes = await getSite(api); let siteRes = await getSite(api);
return siteRes; return siteRes;
} }
@ -787,10 +807,12 @@ export function randomString(length: number): string {
} }
export async function unfollows() { export async function unfollows() {
await unfollowRemotes(alpha); await Promise.all([
await unfollowRemotes(gamma); unfollowRemotes(alpha),
await unfollowRemotes(delta); unfollowRemotes(gamma),
await unfollowRemotes(epsilon); unfollowRemotes(delta),
unfollowRemotes(epsilon),
]);
} }
export function getCommentParentId(comment: Comment): number | undefined { export function getCommentParentId(comment: Comment): number | undefined {
@ -809,14 +831,18 @@ export async function waitUntil<T>(
fetcher: () => Promise<T>, fetcher: () => Promise<T>,
checker: (t: T) => boolean, checker: (t: T) => boolean,
retries = 10, retries = 10,
delaySeconds = 2, delaySeconds = [0.2, 0.5, 1, 2, 3],
) { ) {
let retry = 0; let retry = 0;
let result;
while (retry++ < retries) { while (retry++ < retries) {
const result = await fetcher(); result = await fetcher();
if (checker(result)) return result; if (checker(result)) return result;
await delay(delaySeconds * 1000); await delay(
delaySeconds[Math.min(retry - 1, delaySeconds.length - 1)] * 1000,
);
} }
console.error("result", result);
throw Error( throw Error(
`Failed "${fetcher}": "${checker}" did not return true after ${retries} retries (delayed ${delaySeconds}s each)`, `Failed "${fetcher}": "${checker}" did not return true after ${retries} retries (delayed ${delaySeconds}s each)`,
); );

View file

@ -18,7 +18,15 @@ use lemmy_db_schema::{
}; };
use lemmy_db_views::structs::PrivateMessageView; use lemmy_db_views::structs::PrivateMessageView;
use lemmy_utils::error::LemmyResult; use lemmy_utils::error::LemmyResult;
use once_cell::sync::OnceCell; use once_cell::sync::{Lazy, OnceCell};
use tokio::{
sync::{
mpsc,
mpsc::{UnboundedReceiver, UnboundedSender, WeakUnboundedSender},
Mutex,
},
task::JoinHandle,
};
use url::Url; use url::Url;
type MatchOutgoingActivitiesBoxed = type MatchOutgoingActivitiesBoxed =
@ -54,16 +62,45 @@ pub enum SendActivityData {
CreateReport(Url, Person, Community, String), CreateReport(Url, Person, Community, String),
} }
pub struct ActivityChannel; // TODO: instead of static, move this into LemmyContext. make sure that stopping the process with
// ctrl+c still works.
static ACTIVITY_CHANNEL: Lazy<ActivityChannel> = Lazy::new(|| {
let (sender, receiver) = mpsc::unbounded_channel();
let weak_sender = sender.downgrade();
ActivityChannel {
weak_sender,
receiver: Mutex::new(receiver),
keepalive_sender: Mutex::new(Some(sender)),
}
});
pub struct ActivityChannel {
weak_sender: WeakUnboundedSender<SendActivityData>,
receiver: Mutex<UnboundedReceiver<SendActivityData>>,
keepalive_sender: Mutex<Option<UnboundedSender<SendActivityData>>>,
}
impl ActivityChannel { impl ActivityChannel {
pub async fn retrieve_activity() -> Option<SendActivityData> {
let mut lock = ACTIVITY_CHANNEL.receiver.lock().await;
lock.recv().await
}
pub async fn submit_activity( pub async fn submit_activity(
data: SendActivityData, data: SendActivityData,
context: &Data<LemmyContext>, _context: &Data<LemmyContext>,
) -> LemmyResult<()> { ) -> LemmyResult<()> {
MATCH_OUTGOING_ACTIVITIES // could do `ACTIVITY_CHANNEL.keepalive_sender.lock()` instead and get rid of weak_sender,
.get() // not sure which way is more efficient
.expect("retrieve function pointer")(data, context) if let Some(sender) = ACTIVITY_CHANNEL.weak_sender.upgrade() {
.await sender.send(data)?;
}
Ok(())
}
pub async fn close(outgoing_activities_task: JoinHandle<LemmyResult<()>>) -> LemmyResult<()> {
ACTIVITY_CHANNEL.keepalive_sender.lock().await.take();
outgoing_activities_task.await??;
Ok(())
} }
} }

View file

@ -175,7 +175,7 @@ pub async fn create_post(
mark_post_as_read(person_id, post_id, &mut context.pool()).await?; mark_post_as_read(person_id, post_id, &mut context.pool()).await?;
if let Some(url) = updated_post.url.clone() { if let Some(url) = updated_post.url.clone() {
let task = async move { spawn_try_task(async move {
let mut webmention = let mut webmention =
Webmention::new::<Url>(updated_post.ap_id.clone().into(), url.clone().into())?; Webmention::new::<Url>(updated_post.ap_id.clone().into(), url.clone().into())?;
webmention.set_checked(true); webmention.set_checked(true);
@ -188,8 +188,7 @@ pub async fn create_post(
Ok(_) => Ok(()), Ok(_) => Ok(()),
Err(e) => Err(e).with_lemmy_type(LemmyErrorType::CouldntSendWebmention), Err(e) => Err(e).with_lemmy_type(LemmyErrorType::CouldntSendWebmention),
} }
}; });
spawn_try_task(task);
}; };
build_post_response(&context, community_id, person_id, post_id).await build_post_response(&context, community_id, person_id, post_id).await

View file

@ -1,6 +1,7 @@
use crate::{ use crate::{
activities::{ activities::{
generate_activity_id, generate_activity_id,
generate_announce_activity_id,
send_lemmy_activity, send_lemmy_activity,
verify_is_public, verify_is_public,
verify_person_in_community, verify_person_in_community,
@ -75,16 +76,20 @@ impl AnnounceActivity {
community: &ApubCommunity, community: &ApubCommunity,
context: &Data<LemmyContext>, context: &Data<LemmyContext>,
) -> Result<AnnounceActivity, LemmyError> { ) -> Result<AnnounceActivity, LemmyError> {
let inner_kind = object
.other
.get("type")
.and_then(serde_json::Value::as_str)
.unwrap_or("other");
let id =
generate_announce_activity_id(inner_kind, &context.settings().get_protocol_and_hostname())?;
Ok(AnnounceActivity { Ok(AnnounceActivity {
actor: community.id().into(), actor: community.id().into(),
to: vec![public()], to: vec![public()],
object: IdOrNestedObject::NestedObject(object), object: IdOrNestedObject::NestedObject(object),
cc: vec![community.followers_url.clone().into()], cc: vec![community.followers_url.clone().into()],
kind: AnnounceType::Announce, kind: AnnounceType::Announce,
id: generate_activity_id( id,
&AnnounceType::Announce,
&context.settings().get_protocol_and_hostname(),
)?,
}) })
} }

View file

@ -28,12 +28,15 @@ use crate::{
use activitypub_federation::{ use activitypub_federation::{
config::Data, config::Data,
fetch::object_id::ObjectId, fetch::object_id::ObjectId,
kinds::public, kinds::{activity::AnnounceType, public},
protocol::context::WithContext, protocol::context::WithContext,
traits::{ActivityHandler, Actor}, traits::{ActivityHandler, Actor},
}; };
use anyhow::anyhow; use anyhow::anyhow;
use lemmy_api_common::{context::LemmyContext, send_activity::SendActivityData}; use lemmy_api_common::{
context::LemmyContext,
send_activity::{ActivityChannel, SendActivityData},
};
use lemmy_db_schema::{ use lemmy_db_schema::{
newtypes::CommunityId, newtypes::CommunityId,
source::{ source::{
@ -42,10 +45,7 @@ use lemmy_db_schema::{
}, },
}; };
use lemmy_db_views_actor::structs::{CommunityPersonBanView, CommunityView}; use lemmy_db_views_actor::structs::{CommunityPersonBanView, CommunityView};
use lemmy_utils::{ use lemmy_utils::error::{LemmyError, LemmyErrorExt, LemmyErrorType, LemmyResult};
error::{LemmyError, LemmyErrorExt, LemmyErrorType, LemmyResult},
spawn_try_task,
};
use serde::Serialize; use serde::Serialize;
use std::{ops::Deref, time::Duration}; use std::{ops::Deref, time::Duration};
use tracing::info; use tracing::info;
@ -181,6 +181,21 @@ where
Url::parse(&id) Url::parse(&id)
} }
/// like generate_activity_id but also add the inner kind for easier debugging
fn generate_announce_activity_id(
inner_kind: &str,
protocol_and_hostname: &str,
) -> Result<Url, ParseError> {
let id = format!(
"{}/activities/{}/{}/{}",
protocol_and_hostname,
AnnounceType::Announce.to_string().to_lowercase(),
inner_kind.to_lowercase(),
Uuid::new_v4()
);
Url::parse(&id)
}
pub(crate) trait GetActorType { pub(crate) trait GetActorType {
fn actor_type(&self) -> ActorType; fn actor_type(&self) -> ActorType;
} }
@ -198,12 +213,12 @@ where
ActorT: Actor + GetActorType, ActorT: Actor + GetActorType,
Activity: ActivityHandler<Error = LemmyError>, Activity: ActivityHandler<Error = LemmyError>,
{ {
info!("Sending activity {}", activity.id().to_string()); info!("Saving outgoing activity to queue {}", activity.id());
let activity = WithContext::new(activity, CONTEXT.deref().clone()); let activity = WithContext::new(activity, CONTEXT.deref().clone());
let form = SentActivityForm { let form = SentActivityForm {
ap_id: activity.id().clone().into(), ap_id: activity.id().clone().into(),
data: serde_json::to_value(activity.clone())?, data: serde_json::to_value(activity)?,
sensitive, sensitive,
send_inboxes: send_targets send_inboxes: send_targets
.inboxes .inboxes
@ -220,6 +235,13 @@ where
Ok(()) Ok(())
} }
pub async fn handle_outgoing_activities(context: Data<LemmyContext>) -> LemmyResult<()> {
while let Some(data) = ActivityChannel::retrieve_activity().await {
match_outgoing_activities(data, &context.reset_request_count()).await?
}
Ok(())
}
pub async fn match_outgoing_activities( pub async fn match_outgoing_activities(
data: SendActivityData, data: SendActivityData,
context: &Data<LemmyContext>, context: &Data<LemmyContext>,
@ -324,6 +346,6 @@ pub async fn match_outgoing_activities(
} }
} }
}; };
spawn_try_task(fed_task); fed_task.await?;
Ok(()) Ok(())
} }

View file

@ -1,8 +1,5 @@
use anyhow::{anyhow, Context, Result}; use anyhow::{anyhow, Context, Result};
use diesel::{ use diesel::prelude::*;
prelude::*,
sql_types::{Bool, Int8},
};
use diesel_async::RunQueryDsl; use diesel_async::RunQueryDsl;
use lemmy_apub::{ use lemmy_apub::{
activity_lists::SharedInboxActivities, activity_lists::SharedInboxActivities,
@ -31,6 +28,26 @@ use std::{
use tokio::{task::JoinHandle, time::sleep}; use tokio::{task::JoinHandle, time::sleep};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
/// Decrease the delays of the federation queue.
/// Should only be used for federation tests since it significantly increases CPU and DB load of the federation queue.
pub(crate) static LEMMY_TEST_FAST_FEDERATION: Lazy<bool> = Lazy::new(|| {
std::env::var("LEMMY_TEST_FAST_FEDERATION")
.map(|s| !s.is_empty())
.unwrap_or(false)
});
/// Recheck for new federation work every n seconds.
///
/// When the queue is processed faster than new activities are added and it reaches the current time with an empty batch,
/// this is the delay the queue waits before it checks if new activities have been added to the sent_activities table.
/// This delay is only applied if no federated activity happens during sending activities of the last batch.
pub(crate) static WORK_FINISHED_RECHECK_DELAY: Lazy<Duration> = Lazy::new(|| {
if *LEMMY_TEST_FAST_FEDERATION {
Duration::from_millis(100)
} else {
Duration::from_secs(30)
}
});
pub struct CancellableTask<R: Send + 'static> { pub struct CancellableTask<R: Send + 'static> {
f: Pin<Box<dyn Future<Output = Result<R, anyhow::Error>> + Send + 'static>>, f: Pin<Box<dyn Future<Output = Result<R, anyhow::Error>> + Send + 'static>>,
ended: Arc<RwLock<bool>>, ended: Arc<RwLock<bool>>,
@ -162,22 +179,20 @@ pub(crate) async fn get_activity_cached(
pub(crate) async fn get_latest_activity_id(pool: &mut DbPool<'_>) -> Result<ActivityId> { pub(crate) async fn get_latest_activity_id(pool: &mut DbPool<'_>) -> Result<ActivityId> {
static CACHE: Lazy<Cache<(), ActivityId>> = Lazy::new(|| { static CACHE: Lazy<Cache<(), ActivityId>> = Lazy::new(|| {
Cache::builder() Cache::builder()
.time_to_live(Duration::from_secs(1)) .time_to_live(if *LEMMY_TEST_FAST_FEDERATION {
*WORK_FINISHED_RECHECK_DELAY
} else {
Duration::from_secs(1)
})
.build() .build()
}); });
CACHE CACHE
.try_get_with((), async { .try_get_with((), async {
use diesel::dsl::max;
use lemmy_db_schema::schema::sent_activity::dsl::{id, sent_activity};
let conn = &mut get_conn(pool).await?; let conn = &mut get_conn(pool).await?;
let seq: Sequence = let seq: Option<ActivityId> = sent_activity.select(max(id)).get_result(conn).await?;
diesel::sql_query("select last_value, is_called from sent_activity_id_seq") let latest_id = seq.unwrap_or(0);
.get_result(conn)
.await?;
let latest_id = if seq.is_called {
seq.last_value as ActivityId
} else {
// if a PG sequence has never been used, last_value will actually be next_value
(seq.last_value - 1) as ActivityId
};
anyhow::Result::<_, anyhow::Error>::Ok(latest_id as ActivityId) anyhow::Result::<_, anyhow::Error>::Ok(latest_id as ActivityId)
}) })
.await .await
@ -188,11 +203,3 @@ pub(crate) async fn get_latest_activity_id(pool: &mut DbPool<'_>) -> Result<Acti
pub(crate) fn retry_sleep_duration(retry_count: i32) -> Duration { pub(crate) fn retry_sleep_duration(retry_count: i32) -> Duration {
Duration::from_secs_f64(10.0 * 2.0_f64.powf(f64::from(retry_count))) Duration::from_secs_f64(10.0 * 2.0_f64.powf(f64::from(retry_count)))
} }
#[derive(QueryableByName)]
struct Sequence {
#[diesel(sql_type = Int8)]
last_value: i64, // this value is bigint for some reason even if sequence is int4
#[diesel(sql_type = Bool)]
is_called: bool,
}

View file

@ -1,6 +1,13 @@
use crate::{ use crate::{
federation_queue_state::FederationQueueState, federation_queue_state::FederationQueueState,
util::{get_activity_cached, get_actor_cached, get_latest_activity_id, retry_sleep_duration}, util::{
get_activity_cached,
get_actor_cached,
get_latest_activity_id,
retry_sleep_duration,
LEMMY_TEST_FAST_FEDERATION,
WORK_FINISHED_RECHECK_DELAY,
},
}; };
use activitypub_federation::{activity_sending::SendActivityTask, config::Data}; use activitypub_federation::{activity_sending::SendActivityTask, config::Data};
use anyhow::{Context, Result}; use anyhow::{Context, Result};
@ -22,20 +29,27 @@ use std::{
}; };
use tokio::{sync::mpsc::UnboundedSender, time::sleep}; use tokio::{sync::mpsc::UnboundedSender, time::sleep};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
/// save state to db every n sends if there's no failures (otherwise state is saved after every attempt)
/// Check whether to save state to db every n sends if there's no failures (during failures state is saved after every attempt)
/// This determines the batch size for loop_batch. After a batch ends and SAVE_STATE_EVERY_TIME has passed, the federation_queue_state is updated in the DB.
static CHECK_SAVE_STATE_EVERY_IT: i64 = 100; static CHECK_SAVE_STATE_EVERY_IT: i64 = 100;
/// Save state to db after this time has passed since the last state (so if the server crashes or is SIGKILLed, less than X seconds of activities are resent)
static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(60); static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(60);
/// recheck for new federation work every n seconds /// interval with which new additions to community_followers are queried.
#[cfg(debug_assertions)] ///
static WORK_FINISHED_RECHECK_DELAY: Duration = Duration::from_secs(1); /// The first time some user on an instance follows a specific remote community (or, more precisely: the first time a (followed_community_id, follower_inbox_url) tuple appears),
#[cfg(not(debug_assertions))] /// this delay limits the maximum time until the follow actually results in activities from that community id being sent to that inbox url.
static WORK_FINISHED_RECHECK_DELAY: Duration = Duration::from_secs(30); /// This delay currently needs to not be too small because the DB load is currently fairly high because of the current structure of storing inboxes for every person, not having a separate list of shared_inboxes, and the architecture of having every instance queue be fully separate.
#[cfg(debug_assertions)] /// (see https://github.com/LemmyNet/lemmy/issues/3958)
static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy<chrono::Duration> = static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy<chrono::Duration> = Lazy::new(|| {
Lazy::new(|| chrono::Duration::seconds(1)); if *LEMMY_TEST_FAST_FEDERATION {
#[cfg(not(debug_assertions))] chrono::Duration::seconds(1)
static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy<chrono::Duration> = } else {
Lazy::new(|| chrono::Duration::minutes(1)); chrono::Duration::minutes(2)
}
});
/// The same as FOLLOW_ADDITIONS_RECHECK_DELAY, but triggering when the last person on an instance unfollows a specific remote community.
/// This is expected to happen pretty rarely and updating it in a timely manner is not too important.
static FOLLOW_REMOVALS_RECHECK_DELAY: Lazy<chrono::Duration> = static FOLLOW_REMOVALS_RECHECK_DELAY: Lazy<chrono::Duration> =
Lazy::new(|| chrono::Duration::hours(1)); Lazy::new(|| chrono::Duration::hours(1));
pub(crate) struct InstanceWorker { pub(crate) struct InstanceWorker {
@ -121,6 +135,7 @@ impl InstanceWorker {
} }
Ok(()) Ok(())
} }
/// send out a batch of CHECK_SAVE_STATE_EVERY_IT activities
async fn loop_batch(&mut self, pool: &mut DbPool<'_>) -> Result<()> { async fn loop_batch(&mut self, pool: &mut DbPool<'_>) -> Result<()> {
let latest_id = get_latest_activity_id(pool).await?; let latest_id = get_latest_activity_id(pool).await?;
if self.state.last_successful_id == -1 { if self.state.last_successful_id == -1 {
@ -134,7 +149,7 @@ impl InstanceWorker {
if id == latest_id { if id == latest_id {
// no more work to be done, wait before rechecking // no more work to be done, wait before rechecking
tokio::select! { tokio::select! {
() = sleep(WORK_FINISHED_RECHECK_DELAY) => {}, () = sleep(*WORK_FINISHED_RECHECK_DELAY) => {},
() = self.stop.cancelled() => {} () = self.stop.cancelled() => {}
} }
return Ok(()); return Ok(());
@ -254,7 +269,8 @@ impl InstanceWorker {
.send_inboxes .send_inboxes
.iter() .iter()
.filter_map(std::option::Option::as_ref) .filter_map(std::option::Option::as_ref)
.filter_map(|u| (u.domain() == Some(&self.instance.domain)).then(|| u.inner().clone())), .filter(|&u| (u.domain() == Some(&self.instance.domain)))
.map(|u| u.inner().clone()),
); );
Ok(inbox_urls) Ok(inbox_urls)
} }
@ -263,7 +279,7 @@ impl InstanceWorker {
if (Utc::now() - self.last_full_communities_fetch) > *FOLLOW_REMOVALS_RECHECK_DELAY { if (Utc::now() - self.last_full_communities_fetch) > *FOLLOW_REMOVALS_RECHECK_DELAY {
// process removals every hour // process removals every hour
(self.followed_communities, self.last_full_communities_fetch) = self (self.followed_communities, self.last_full_communities_fetch) = self
.get_communities(pool, self.instance.id, self.last_full_communities_fetch) .get_communities(pool, self.instance.id, Utc.timestamp_nanos(0))
.await?; .await?;
self.last_incremental_communities_fetch = self.last_full_communities_fetch; self.last_incremental_communities_fetch = self.last_full_communities_fetch;
} }
@ -289,13 +305,13 @@ impl InstanceWorker {
instance_id: InstanceId, instance_id: InstanceId,
last_fetch: DateTime<Utc>, last_fetch: DateTime<Utc>,
) -> Result<(HashMap<CommunityId, HashSet<Url>>, DateTime<Utc>)> { ) -> Result<(HashMap<CommunityId, HashSet<Url>>, DateTime<Utc>)> {
let new_last_fetch = Utc::now(); // update to time before fetch to ensure overlap let new_last_fetch = Utc::now() - chrono::Duration::seconds(10); // update to time before fetch to ensure overlap. subtract 10s to ensure overlap even if published date is not exact
Ok(( Ok((
CommunityFollowerView::get_instance_followed_community_inboxes(pool, instance_id, last_fetch) CommunityFollowerView::get_instance_followed_community_inboxes(pool, instance_id, last_fetch)
.await? .await?
.into_iter() .into_iter()
.fold(HashMap::new(), |mut map, (c, u)| { .fold(HashMap::new(), |mut map, (c, u)| {
map.entry(c).or_insert_with(HashSet::new).insert(u.into()); map.entry(c).or_default().insert(u.into());
map map
}), }),
new_last_fetch, new_last_fetch,

View file

@ -0,0 +1,2 @@
DROP INDEX idx_person_local_instance;

View file

@ -0,0 +1,2 @@
CREATE INDEX idx_person_local_instance ON person (local DESC, instance_id);

View file

@ -28,14 +28,14 @@ use lemmy_api_common::{
context::LemmyContext, context::LemmyContext,
lemmy_db_views::structs::SiteView, lemmy_db_views::structs::SiteView,
request::build_user_agent, request::build_user_agent,
send_activity::MATCH_OUTGOING_ACTIVITIES, send_activity::{ActivityChannel, MATCH_OUTGOING_ACTIVITIES},
utils::{ utils::{
check_private_instance_and_federation_enabled, check_private_instance_and_federation_enabled,
local_site_rate_limit_to_rate_limit_config, local_site_rate_limit_to_rate_limit_config,
}, },
}; };
use lemmy_apub::{ use lemmy_apub::{
activities::match_outgoing_activities, activities::{handle_outgoing_activities, match_outgoing_activities},
VerifyUrlData, VerifyUrlData,
FEDERATION_HTTP_FETCH_LIMIT, FEDERATION_HTTP_FETCH_LIMIT,
}; };
@ -203,6 +203,8 @@ pub async fn start_lemmy_server(args: CmdArgs) -> Result<(), LemmyError> {
Box::pin(match_outgoing_activities(d, c)) Box::pin(match_outgoing_activities(d, c))
})) }))
.expect("set function pointer"); .expect("set function pointer");
let request_data = federation_config.to_request_data();
let outgoing_activities_task = tokio::task::spawn(handle_outgoing_activities(request_data));
let server = if args.http_server { let server = if args.http_server {
Some(create_http_server( Some(create_http_server(
@ -245,6 +247,9 @@ pub async fn start_lemmy_server(args: CmdArgs) -> Result<(), LemmyError> {
federate.cancel().await?; federate.cancel().await?;
} }
// Wait for outgoing apub sends to complete
ActivityChannel::close(outgoing_activities_task).await?;
Ok(()) Ok(())
} }