* Read community follower count from home instance (fixes #1440) * fmt * prettier * fix tests * fmt * rename fn * fmt * Run prettier * increase timeout * ci --------- Co-authored-by: Dessalines <dessalines@users.noreply.github.com> Co-authored-by: Dessalines <tyhou13@gmx.com>
This commit is contained in:
parent
332e698336
commit
56322c75f0
11 changed files with 223 additions and 41 deletions
|
@ -27,8 +27,11 @@ import {
|
||||||
waitUntil,
|
waitUntil,
|
||||||
delay,
|
delay,
|
||||||
alphaUrl,
|
alphaUrl,
|
||||||
|
delta,
|
||||||
betaAllowedInstances,
|
betaAllowedInstances,
|
||||||
searchPostLocal,
|
searchPostLocal,
|
||||||
|
resolveBetaCommunity,
|
||||||
|
longDelay,
|
||||||
} from "./shared";
|
} from "./shared";
|
||||||
import { EditSite, LemmyHttp } from "lemmy-js-client";
|
import { EditSite, LemmyHttp } from "lemmy-js-client";
|
||||||
|
|
||||||
|
@ -378,6 +381,59 @@ test("User blocks instance, communities are hidden", async () => {
|
||||||
expect(listing_ids3).toContain(postRes.post_view.post.ap_id);
|
expect(listing_ids3).toContain(postRes.post_view.post.ap_id);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
test("Community follower count is federated", async () => {
|
||||||
|
// Follow the beta community from alpha
|
||||||
|
let resolved = await resolveBetaCommunity(alpha);
|
||||||
|
if (!resolved.community) {
|
||||||
|
throw "Missing beta community";
|
||||||
|
}
|
||||||
|
|
||||||
|
await followCommunity(alpha, true, resolved.community.community.id);
|
||||||
|
let followed = (
|
||||||
|
await waitUntil(
|
||||||
|
() => resolveBetaCommunity(alpha),
|
||||||
|
c => c.community?.subscribed === "Subscribed",
|
||||||
|
)
|
||||||
|
).community;
|
||||||
|
|
||||||
|
// Make sure there is 1 subscriber
|
||||||
|
expect(followed?.counts.subscribers).toBe(1);
|
||||||
|
|
||||||
|
// Follow the community from gamma
|
||||||
|
resolved = await resolveBetaCommunity(gamma);
|
||||||
|
if (!resolved.community) {
|
||||||
|
throw "Missing beta community";
|
||||||
|
}
|
||||||
|
|
||||||
|
await followCommunity(gamma, true, resolved.community.community.id);
|
||||||
|
followed = (
|
||||||
|
await waitUntil(
|
||||||
|
() => resolveBetaCommunity(gamma),
|
||||||
|
c => c.community?.subscribed === "Subscribed",
|
||||||
|
)
|
||||||
|
).community;
|
||||||
|
|
||||||
|
// Make sure there are 2 subscribers
|
||||||
|
expect(followed?.counts?.subscribers).toBe(2);
|
||||||
|
|
||||||
|
// Follow the community from delta
|
||||||
|
resolved = await resolveBetaCommunity(delta);
|
||||||
|
if (!resolved.community) {
|
||||||
|
throw "Missing beta community";
|
||||||
|
}
|
||||||
|
|
||||||
|
await followCommunity(delta, true, resolved.community.community.id);
|
||||||
|
followed = (
|
||||||
|
await waitUntil(
|
||||||
|
() => resolveBetaCommunity(delta),
|
||||||
|
c => c.community?.subscribed === "Subscribed",
|
||||||
|
)
|
||||||
|
).community;
|
||||||
|
|
||||||
|
// Make sure there are 3 subscribers
|
||||||
|
expect(followed?.counts?.subscribers).toBe(3);
|
||||||
|
});
|
||||||
|
|
||||||
test("Dont receive community activities after unsubscribe", async () => {
|
test("Dont receive community activities after unsubscribe", async () => {
|
||||||
let communityRes = await createCommunity(alpha);
|
let communityRes = await createCommunity(alpha);
|
||||||
expect(communityRes.community_view.community.name).toBeDefined();
|
expect(communityRes.community_view.community.name).toBeDefined();
|
||||||
|
@ -402,7 +458,7 @@ test("Dont receive community activities after unsubscribe", async () => {
|
||||||
let editSiteForm: EditSite = {};
|
let editSiteForm: EditSite = {};
|
||||||
editSiteForm.allowed_instances = ["lemmy-epsilon"];
|
editSiteForm.allowed_instances = ["lemmy-epsilon"];
|
||||||
await beta.editSite(editSiteForm);
|
await beta.editSite(editSiteForm);
|
||||||
await delay(2000);
|
await longDelay();
|
||||||
|
|
||||||
// unfollow
|
// unfollow
|
||||||
await followCommunity(beta, false, betaCommunity!.community.id);
|
await followCommunity(beta, false, betaCommunity!.community.id);
|
||||||
|
@ -417,7 +473,7 @@ test("Dont receive community activities after unsubscribe", async () => {
|
||||||
// unblock alpha
|
// unblock alpha
|
||||||
editSiteForm.allowed_instances = betaAllowedInstances;
|
editSiteForm.allowed_instances = betaAllowedInstances;
|
||||||
await beta.editSite(editSiteForm);
|
await beta.editSite(editSiteForm);
|
||||||
await delay(2000);
|
await longDelay();
|
||||||
|
|
||||||
// create a post, it shouldnt reach beta
|
// create a post, it shouldnt reach beta
|
||||||
let postRes = await createPost(
|
let postRes = await createPost(
|
||||||
|
@ -425,7 +481,7 @@ test("Dont receive community activities after unsubscribe", async () => {
|
||||||
communityRes.community_view.community.id,
|
communityRes.community_view.community.id,
|
||||||
);
|
);
|
||||||
expect(postRes.post_view.post.id).toBeDefined();
|
expect(postRes.post_view.post.id).toBeDefined();
|
||||||
await delay(2000);
|
// await longDelay();
|
||||||
|
|
||||||
let postResBeta = searchPostLocal(beta, postRes.post_view.post);
|
let postResBeta = searchPostLocal(beta, postRes.post_view.post);
|
||||||
expect((await postResBeta).posts.length).toBe(0);
|
expect((await postResBeta).posts.length).toBe(0);
|
||||||
|
|
|
@ -375,7 +375,7 @@ mod tests {
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
// wait for background task to finish
|
// wait for background task to finish
|
||||||
sleep(Duration::from_millis(100)).await;
|
sleep(Duration::from_millis(1000)).await;
|
||||||
|
|
||||||
let import_user_updated = LocalUserView::read(&mut context.pool(), import_user.local_user.id)
|
let import_user_updated = LocalUserView::read(&mut context.pool(), import_user.local_user.id)
|
||||||
.await
|
.await
|
||||||
|
|
66
crates/apub/src/collections/community_follower.rs
Normal file
66
crates/apub/src/collections/community_follower.rs
Normal file
|
@ -0,0 +1,66 @@
|
||||||
|
use crate::{
|
||||||
|
objects::community::ApubCommunity,
|
||||||
|
protocol::collections::group_followers::GroupFollowers,
|
||||||
|
};
|
||||||
|
use activitypub_federation::{
|
||||||
|
config::Data,
|
||||||
|
kinds::collection::CollectionType,
|
||||||
|
protocol::verification::verify_domains_match,
|
||||||
|
traits::Collection,
|
||||||
|
};
|
||||||
|
use lemmy_api_common::{context::LemmyContext, utils::generate_followers_url};
|
||||||
|
use lemmy_db_schema::aggregates::structs::CommunityAggregates;
|
||||||
|
use lemmy_db_views_actor::structs::CommunityFollowerView;
|
||||||
|
use lemmy_utils::error::LemmyError;
|
||||||
|
use url::Url;
|
||||||
|
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
pub(crate) struct ApubCommunityFollower(Vec<()>);
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl Collection for ApubCommunityFollower {
|
||||||
|
type Owner = ApubCommunity;
|
||||||
|
type DataType = LemmyContext;
|
||||||
|
type Kind = GroupFollowers;
|
||||||
|
type Error = LemmyError;
|
||||||
|
|
||||||
|
async fn read_local(
|
||||||
|
community: &Self::Owner,
|
||||||
|
context: &Data<Self::DataType>,
|
||||||
|
) -> Result<Self::Kind, Self::Error> {
|
||||||
|
let community_id = community.id;
|
||||||
|
let community_followers =
|
||||||
|
CommunityFollowerView::count_community_followers(&mut context.pool(), community_id).await?;
|
||||||
|
|
||||||
|
Ok(GroupFollowers {
|
||||||
|
id: generate_followers_url(&community.actor_id)?.into(),
|
||||||
|
r#type: CollectionType::Collection,
|
||||||
|
total_items: community_followers as i32,
|
||||||
|
items: vec![],
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn verify(
|
||||||
|
json: &Self::Kind,
|
||||||
|
expected_domain: &Url,
|
||||||
|
_data: &Data<Self::DataType>,
|
||||||
|
) -> Result<(), Self::Error> {
|
||||||
|
verify_domains_match(expected_domain, &json.id)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn from_json(
|
||||||
|
json: Self::Kind,
|
||||||
|
community: &Self::Owner,
|
||||||
|
context: &Data<Self::DataType>,
|
||||||
|
) -> Result<Self, Self::Error> {
|
||||||
|
CommunityAggregates::update_federated_followers(
|
||||||
|
&mut context.pool(),
|
||||||
|
community.id,
|
||||||
|
json.total_items,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(ApubCommunityFollower(Vec::new()))
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,3 +1,4 @@
|
||||||
pub(crate) mod community_featured;
|
pub(crate) mod community_featured;
|
||||||
|
pub(crate) mod community_follower;
|
||||||
pub(crate) mod community_moderators;
|
pub(crate) mod community_moderators;
|
||||||
pub(crate) mod community_outbox;
|
pub(crate) mod community_outbox;
|
||||||
|
|
|
@ -2,12 +2,12 @@ use crate::{
|
||||||
activity_lists::GroupInboxActivities,
|
activity_lists::GroupInboxActivities,
|
||||||
collections::{
|
collections::{
|
||||||
community_featured::ApubCommunityFeatured,
|
community_featured::ApubCommunityFeatured,
|
||||||
|
community_follower::ApubCommunityFollower,
|
||||||
community_moderators::ApubCommunityModerators,
|
community_moderators::ApubCommunityModerators,
|
||||||
community_outbox::ApubCommunityOutbox,
|
community_outbox::ApubCommunityOutbox,
|
||||||
},
|
},
|
||||||
http::{create_apub_response, create_apub_tombstone_response},
|
http::{create_apub_response, create_apub_tombstone_response},
|
||||||
objects::{community::ApubCommunity, person::ApubPerson},
|
objects::{community::ApubCommunity, person::ApubPerson},
|
||||||
protocol::collections::group_followers::GroupFollowers,
|
|
||||||
};
|
};
|
||||||
use activitypub_federation::{
|
use activitypub_federation::{
|
||||||
actix_web::inbox::receive_activity,
|
actix_web::inbox::receive_activity,
|
||||||
|
@ -66,7 +66,7 @@ pub(crate) async fn get_apub_community_followers(
|
||||||
) -> Result<HttpResponse, LemmyError> {
|
) -> Result<HttpResponse, LemmyError> {
|
||||||
let community =
|
let community =
|
||||||
Community::read_from_name(&mut context.pool(), &info.community_name, false).await?;
|
Community::read_from_name(&mut context.pool(), &info.community_name, false).await?;
|
||||||
let followers = GroupFollowers::new(community, &context).await?;
|
let followers = ApubCommunityFollower::read_local(&community.into(), &context).await?;
|
||||||
create_apub_response(&followers)
|
create_apub_response(&followers)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -146,15 +146,19 @@ impl Object for ApubCommunity {
|
||||||
// Fetching mods and outbox is not necessary for Lemmy to work, so ignore errors. Besides,
|
// Fetching mods and outbox is not necessary for Lemmy to work, so ignore errors. Besides,
|
||||||
// we need to ignore these errors so that tests can work entirely offline.
|
// we need to ignore these errors so that tests can work entirely offline.
|
||||||
let fetch_outbox = group.outbox.dereference(&community, context);
|
let fetch_outbox = group.outbox.dereference(&community, context);
|
||||||
|
let fetch_followers = group.followers.dereference(&community, context);
|
||||||
|
|
||||||
if let Some(moderators) = group.attributed_to {
|
if let Some(moderators) = group.attributed_to {
|
||||||
let fetch_moderators = moderators.dereference(&community, context);
|
let fetch_moderators = moderators.dereference(&community, context);
|
||||||
// Fetch mods and outbox in parallel
|
// Fetch mods, outbox and followers in parallel
|
||||||
let res = tokio::join!(fetch_outbox, fetch_moderators);
|
let res = tokio::join!(fetch_outbox, fetch_moderators, fetch_followers);
|
||||||
res.0.map_err(|e| debug!("{}", e)).ok();
|
res.0.map_err(|e| debug!("{}", e)).ok();
|
||||||
res.1.map_err(|e| debug!("{}", e)).ok();
|
res.1.map_err(|e| debug!("{}", e)).ok();
|
||||||
|
res.2.map_err(|e| debug!("{}", e)).ok();
|
||||||
} else {
|
} else {
|
||||||
fetch_outbox.await.map_err(|e| debug!("{}", e)).ok();
|
let res = tokio::join!(fetch_outbox, fetch_followers);
|
||||||
|
res.0.map_err(|e| debug!("{}", e)).ok();
|
||||||
|
res.1.map_err(|e| debug!("{}", e)).ok();
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(community)
|
Ok(community)
|
||||||
|
@ -235,12 +239,14 @@ pub(crate) mod tests {
|
||||||
json.attributed_to = None;
|
json.attributed_to = None;
|
||||||
json.outbox =
|
json.outbox =
|
||||||
CollectionId::parse("https://enterprise.lemmy.ml/c/tenforward/not_outbox").unwrap();
|
CollectionId::parse("https://enterprise.lemmy.ml/c/tenforward/not_outbox").unwrap();
|
||||||
|
json.followers =
|
||||||
|
CollectionId::parse("https://enterprise.lemmy.ml/c/tenforward/not_followers").unwrap();
|
||||||
|
|
||||||
let url = Url::parse("https://enterprise.lemmy.ml/c/tenforward").unwrap();
|
let url = Url::parse("https://enterprise.lemmy.ml/c/tenforward").unwrap();
|
||||||
ApubCommunity::verify(&json, &url, &context2).await.unwrap();
|
ApubCommunity::verify(&json, &url, &context2).await.unwrap();
|
||||||
let community = ApubCommunity::from_json(json, &context2).await.unwrap();
|
let community = ApubCommunity::from_json(json, &context2).await.unwrap();
|
||||||
// this makes one requests to the (intentionally broken) outbox collection
|
// this makes requests to the (intentionally broken) outbox and followers collections
|
||||||
assert_eq!(context2.request_count(), 1);
|
assert_eq!(context2.request_count(), 2);
|
||||||
community
|
community
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,34 +1,12 @@
|
||||||
use activitypub_federation::kinds::collection::CollectionType;
|
use activitypub_federation::kinds::collection::CollectionType;
|
||||||
use lemmy_api_common::{context::LemmyContext, utils::generate_followers_url};
|
|
||||||
use lemmy_db_schema::source::community::Community;
|
|
||||||
use lemmy_db_views_actor::structs::CommunityFollowerView;
|
|
||||||
use lemmy_utils::error::LemmyError;
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use url::Url;
|
use url::Url;
|
||||||
|
|
||||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
pub(crate) struct GroupFollowers {
|
pub(crate) struct GroupFollowers {
|
||||||
id: Url,
|
pub(crate) id: Url,
|
||||||
r#type: CollectionType,
|
pub(crate) r#type: CollectionType,
|
||||||
total_items: i32,
|
pub(crate) total_items: i32,
|
||||||
items: Vec<()>,
|
pub(crate) items: Vec<()>,
|
||||||
}
|
|
||||||
|
|
||||||
impl GroupFollowers {
|
|
||||||
pub(crate) async fn new(
|
|
||||||
community: Community,
|
|
||||||
context: &LemmyContext,
|
|
||||||
) -> Result<GroupFollowers, LemmyError> {
|
|
||||||
let community_id = community.id;
|
|
||||||
let community_followers =
|
|
||||||
CommunityFollowerView::count_community_followers(&mut context.pool(), community_id).await?;
|
|
||||||
|
|
||||||
Ok(GroupFollowers {
|
|
||||||
id: generate_followers_url(&community.actor_id)?.into(),
|
|
||||||
r#type: CollectionType::Collection,
|
|
||||||
total_items: community_followers as i32,
|
|
||||||
items: vec![],
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,6 +2,7 @@ use crate::{
|
||||||
check_apub_id_valid_with_strictness,
|
check_apub_id_valid_with_strictness,
|
||||||
collections::{
|
collections::{
|
||||||
community_featured::ApubCommunityFeatured,
|
community_featured::ApubCommunityFeatured,
|
||||||
|
community_follower::ApubCommunityFollower,
|
||||||
community_moderators::ApubCommunityModerators,
|
community_moderators::ApubCommunityModerators,
|
||||||
community_outbox::ApubCommunityOutbox,
|
community_outbox::ApubCommunityOutbox,
|
||||||
},
|
},
|
||||||
|
@ -48,7 +49,7 @@ pub struct Group {
|
||||||
/// username, set at account creation and usually fixed after that
|
/// username, set at account creation and usually fixed after that
|
||||||
pub(crate) preferred_username: String,
|
pub(crate) preferred_username: String,
|
||||||
pub(crate) inbox: Url,
|
pub(crate) inbox: Url,
|
||||||
pub(crate) followers: Url,
|
pub(crate) followers: CollectionId<ApubCommunityFollower>,
|
||||||
pub(crate) public_key: PublicKey,
|
pub(crate) public_key: PublicKey,
|
||||||
|
|
||||||
/// title
|
/// title
|
||||||
|
|
|
@ -1,20 +1,36 @@
|
||||||
use crate::{
|
use crate::{
|
||||||
aggregates::structs::CommunityAggregates,
|
aggregates::structs::CommunityAggregates,
|
||||||
newtypes::CommunityId,
|
newtypes::CommunityId,
|
||||||
schema::community_aggregates,
|
schema::{
|
||||||
|
community_aggregates,
|
||||||
|
community_aggregates::{community_id, subscribers},
|
||||||
|
},
|
||||||
utils::{get_conn, DbPool},
|
utils::{get_conn, DbPool},
|
||||||
};
|
};
|
||||||
use diesel::{result::Error, ExpressionMethods, QueryDsl};
|
use diesel::{result::Error, ExpressionMethods, QueryDsl};
|
||||||
use diesel_async::RunQueryDsl;
|
use diesel_async::RunQueryDsl;
|
||||||
|
|
||||||
impl CommunityAggregates {
|
impl CommunityAggregates {
|
||||||
pub async fn read(pool: &mut DbPool<'_>, community_id: CommunityId) -> Result<Self, Error> {
|
pub async fn read(pool: &mut DbPool<'_>, for_community_id: CommunityId) -> Result<Self, Error> {
|
||||||
let conn = &mut get_conn(pool).await?;
|
let conn = &mut get_conn(pool).await?;
|
||||||
community_aggregates::table
|
community_aggregates::table
|
||||||
.filter(community_aggregates::community_id.eq(community_id))
|
.filter(community_id.eq(for_community_id))
|
||||||
.first::<Self>(conn)
|
.first::<Self>(conn)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn update_federated_followers(
|
||||||
|
pool: &mut DbPool<'_>,
|
||||||
|
for_community_id: CommunityId,
|
||||||
|
new_subscribers: i32,
|
||||||
|
) -> Result<Self, Error> {
|
||||||
|
let conn = &mut get_conn(pool).await?;
|
||||||
|
let new_subscribers: i64 = new_subscribers.into();
|
||||||
|
diesel::update(community_aggregates::table.filter(community_id.eq(for_community_id)))
|
||||||
|
.set(subscribers.eq(new_subscribers))
|
||||||
|
.get_result::<Self>(conn)
|
||||||
|
.await
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|
|
@ -0,0 +1,24 @@
|
||||||
|
CREATE OR REPLACE FUNCTION community_aggregates_subscriber_count ()
|
||||||
|
RETURNS TRIGGER
|
||||||
|
LANGUAGE plpgsql
|
||||||
|
AS $$
|
||||||
|
BEGIN
|
||||||
|
IF (TG_OP = 'INSERT') THEN
|
||||||
|
UPDATE
|
||||||
|
community_aggregates
|
||||||
|
SET
|
||||||
|
subscribers = subscribers + 1
|
||||||
|
WHERE
|
||||||
|
community_id = NEW.community_id;
|
||||||
|
ELSIF (TG_OP = 'DELETE') THEN
|
||||||
|
UPDATE
|
||||||
|
community_aggregates
|
||||||
|
SET
|
||||||
|
subscribers = subscribers - 1
|
||||||
|
WHERE
|
||||||
|
community_id = OLD.community_id;
|
||||||
|
END IF;
|
||||||
|
RETURN NULL;
|
||||||
|
END
|
||||||
|
$$;
|
||||||
|
|
|
@ -0,0 +1,34 @@
|
||||||
|
-- The subscriber count should only be updated for local communities. For remote
|
||||||
|
-- communities it is read over federation from the origin instance.
|
||||||
|
CREATE OR REPLACE FUNCTION community_aggregates_subscriber_count ()
|
||||||
|
RETURNS TRIGGER
|
||||||
|
LANGUAGE plpgsql
|
||||||
|
AS $$
|
||||||
|
BEGIN
|
||||||
|
IF (TG_OP = 'INSERT') THEN
|
||||||
|
UPDATE
|
||||||
|
community_aggregates
|
||||||
|
SET
|
||||||
|
subscribers = subscribers + 1
|
||||||
|
FROM
|
||||||
|
community
|
||||||
|
WHERE
|
||||||
|
community.id = community_id
|
||||||
|
AND community.local
|
||||||
|
AND community_id = NEW.community_id;
|
||||||
|
ELSIF (TG_OP = 'DELETE') THEN
|
||||||
|
UPDATE
|
||||||
|
community_aggregates
|
||||||
|
SET
|
||||||
|
subscribers = subscribers - 1
|
||||||
|
FROM
|
||||||
|
community
|
||||||
|
WHERE
|
||||||
|
community.id = community_id
|
||||||
|
AND community.local
|
||||||
|
AND community_id = OLD.community_id;
|
||||||
|
END IF;
|
||||||
|
RETURN NULL;
|
||||||
|
END
|
||||||
|
$$;
|
||||||
|
|
Loading…
Reference in a new issue