WIP: try to fix fetch_community_outbox (fixes #1582)

This commit is contained in:
Felix Ableitner 2021-04-20 17:07:28 +02:00
parent 65a11a7239
commit 2c745e6cd2
6 changed files with 58 additions and 13 deletions

View file

@ -3,7 +3,11 @@ use crate::{
activity_queue::{send_activity_single_dest, send_to_community, send_to_community_followers}, activity_queue::{send_activity_single_dest, send_to_community, send_to_community_followers},
check_is_apub_id_valid, check_is_apub_id_valid,
extensions::context::lemmy_context, extensions::context::lemmy_context,
fetcher::{get_or_fetch_and_upsert_actor, person::get_or_fetch_and_upsert_person}, fetcher::{
community::ReceiveAnnounceFunction,
get_or_fetch_and_upsert_actor,
person::get_or_fetch_and_upsert_person,
},
generate_moderators_url, generate_moderators_url,
insert_activity, insert_activity,
objects::ToApub, objects::ToApub,
@ -244,6 +248,7 @@ impl CommunityType for Community {
activity: AnyBase, activity: AnyBase,
object_actor: Option<Url>, object_actor: Option<Url>,
context: &LemmyContext, context: &LemmyContext,
receive_announce: ReceiveAnnounceFunction<'_>,
) -> Result<(), LemmyError> { ) -> Result<(), LemmyError> {
let inner_id = activity.id().context(location_info!())?; let inner_id = activity.id().context(location_info!())?;
if inner_id.domain() == Some(&Settings::get().get_hostname_without_port()?) { if inner_id.domain() == Some(&Settings::get().get_hostname_without_port()?) {
@ -255,7 +260,7 @@ impl CommunityType for Community {
if let Some(actor_id) = object_actor { if let Some(actor_id) = object_actor {
// Ignore errors, maybe its not actually an actor // Ignore errors, maybe its not actually an actor
// TODO: should pass the actual request counter in, but that seems complicated // TODO: should pass the actual request counter in, but that seems complicated
let actor = get_or_fetch_and_upsert_actor(&actor_id, context, &mut 0) let actor = get_or_fetch_and_upsert_actor(&actor_id, context, &mut 0, receive_announce)
.await .await
.ok(); .ok();
if let Some(actor) = actor { if let Some(actor) = actor {

View file

@ -1,6 +1,7 @@
use crate::{ use crate::{
check_is_apub_id_valid, check_is_apub_id_valid,
extensions::signatures::sign_and_send, extensions::signatures::sign_and_send,
fetcher::community::ReceiveAnnounceFunction,
insert_activity, insert_activity,
ActorType, ActorType,
CommunityType, CommunityType,
@ -111,6 +112,7 @@ pub(crate) async fn send_to_community<T, Kind>(
community: &Community, community: &Community,
object_actor: Option<Url>, object_actor: Option<Url>,
context: &LemmyContext, context: &LemmyContext,
receive_announce: ReceiveAnnounceFunction<'_>,
) -> Result<(), LemmyError> ) -> Result<(), LemmyError>
where where
T: AsObject<Kind> + Extends<Kind> + Debug + BaseExt<Kind>, T: AsObject<Kind> + Extends<Kind> + Debug + BaseExt<Kind>,
@ -120,7 +122,12 @@ where
// if this is a local community, we need to do an announce from the community instead // if this is a local community, we need to do an announce from the community instead
if community.local { if community.local {
community community
.send_announce(activity.into_any_base()?, object_actor, context) .send_announce(
activity.into_any_base()?,
object_actor,
context,
receive_announce,
)
.await?; .await?;
} else { } else {
let inbox = community.get_shared_inbox_or_inbox_url(); let inbox = community.get_shared_inbox_or_inbox_url();

View file

@ -6,10 +6,12 @@ use crate::{
should_refetch_actor, should_refetch_actor,
}, },
objects::FromApub, objects::FromApub,
ActorType,
GroupExt, GroupExt,
}; };
use activitystreams::{ use activitystreams::{
actor::ApActorExt, actor::ApActorExt,
base::AnyBase,
collection::{CollectionExt, OrderedCollection}, collection::{CollectionExt, OrderedCollection},
}; };
use anyhow::Context; use anyhow::Context;
@ -24,8 +26,17 @@ use lemmy_db_views_actor::community_moderator_view::CommunityModeratorView;
use lemmy_utils::{location_info, LemmyError}; use lemmy_utils::{location_info, LemmyError};
use lemmy_websocket::LemmyContext; use lemmy_websocket::LemmyContext;
use log::debug; use log::debug;
use std::{future::Future, pin::Pin};
use url::Url; use url::Url;
pub(crate) type ReceiveAnnounceFunction<'a> =
fn(
&LemmyContext,
AnyBase,
&dyn ActorType,
&mut i32,
) -> Pin<Box<dyn Future<Output = Result<(), LemmyError>> + 'a>>;
/// Get a community from its apub ID. /// Get a community from its apub ID.
/// ///
/// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database. /// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
@ -34,6 +45,7 @@ pub async fn get_or_fetch_and_upsert_community(
apub_id: &Url, apub_id: &Url,
context: &LemmyContext, context: &LemmyContext,
recursion_counter: &mut i32, recursion_counter: &mut i32,
receive_announce: ReceiveAnnounceFunction<'_>,
) -> Result<Community, LemmyError> { ) -> Result<Community, LemmyError> {
let apub_id_owned = apub_id.to_owned(); let apub_id_owned = apub_id.to_owned();
let community = blocking(context.pool(), move |conn| { let community = blocking(context.pool(), move |conn| {
@ -44,12 +56,19 @@ pub async fn get_or_fetch_and_upsert_community(
match community { match community {
Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => { Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => {
debug!("Fetching and updating from remote community: {}", apub_id); debug!("Fetching and updating from remote community: {}", apub_id);
fetch_remote_community(apub_id, context, Some(c), recursion_counter).await fetch_remote_community(
apub_id,
context,
Some(c),
recursion_counter,
receive_announce,
)
.await
} }
Ok(c) => Ok(c), Ok(c) => Ok(c),
Err(NotFound {}) => { Err(NotFound {}) => {
debug!("Fetching and creating remote community: {}", apub_id); debug!("Fetching and creating remote community: {}", apub_id);
fetch_remote_community(apub_id, context, None, recursion_counter).await fetch_remote_community(apub_id, context, None, recursion_counter, receive_announce).await
} }
Err(e) => Err(e.into()), Err(e) => Err(e.into()),
} }
@ -63,6 +82,7 @@ async fn fetch_remote_community(
context: &LemmyContext, context: &LemmyContext,
old_community: Option<Community>, old_community: Option<Community>,
request_counter: &mut i32, request_counter: &mut i32,
receive_announce: ReceiveAnnounceFunction<'_>,
) -> Result<Community, LemmyError> { ) -> Result<Community, LemmyError> {
let group = fetch_remote_object::<GroupExt>(context.client(), apub_id, request_counter).await; let group = fetch_remote_object::<GroupExt>(context.client(), apub_id, request_counter).await;
@ -87,7 +107,14 @@ async fn fetch_remote_community(
// only fetch outbox for new communities, otherwise this can create an infinite loop // only fetch outbox for new communities, otherwise this can create an infinite loop
if old_community.is_none() { if old_community.is_none() {
let outbox = group.inner.outbox()?.context(location_info!())?; let outbox = group.inner.outbox()?.context(location_info!())?;
fetch_community_outbox(context, outbox, &community, request_counter).await? fetch_community_outbox(
context,
outbox,
&community,
request_counter,
receive_announce,
)
.await?
} }
Ok(community) Ok(community)
@ -147,6 +174,7 @@ async fn fetch_community_outbox(
outbox: &Url, outbox: &Url,
community: &Community, community: &Community,
recursion_counter: &mut i32, recursion_counter: &mut i32,
receive_announce: ReceiveAnnounceFunction<'_>,
) -> Result<(), LemmyError> { ) -> Result<(), LemmyError> {
let outbox = let outbox =
fetch_remote_object::<OrderedCollection>(context.client(), outbox, recursion_counter).await?; fetch_remote_object::<OrderedCollection>(context.client(), outbox, recursion_counter).await?;
@ -157,8 +185,7 @@ async fn fetch_community_outbox(
} }
for activity in outbox_activities { for activity in outbox_activities {
todo!("{:?} {:?} {:?}", activity, community, recursion_counter); receive_announce(context, activity, community, recursion_counter).await?;
//receive_announce(context, activity, community, recursion_counter).await?;
} }
Ok(()) Ok(())

View file

@ -6,7 +6,7 @@ pub mod search;
use crate::{ use crate::{
fetcher::{ fetcher::{
community::get_or_fetch_and_upsert_community, community::{get_or_fetch_and_upsert_community, ReceiveAnnounceFunction},
fetch::FetchError, fetch::FetchError,
person::get_or_fetch_and_upsert_person, person::get_or_fetch_and_upsert_person,
}, },
@ -46,8 +46,10 @@ pub async fn get_or_fetch_and_upsert_actor(
apub_id: &Url, apub_id: &Url,
context: &LemmyContext, context: &LemmyContext,
recursion_counter: &mut i32, recursion_counter: &mut i32,
receive_announce: ReceiveAnnounceFunction<'_>,
) -> Result<Box<dyn ActorType>, LemmyError> { ) -> Result<Box<dyn ActorType>, LemmyError> {
let community = get_or_fetch_and_upsert_community(apub_id, context, recursion_counter).await; let community =
get_or_fetch_and_upsert_community(apub_id, context, recursion_counter, receive_announce).await;
let actor: Box<dyn ActorType> = match community { let actor: Box<dyn ActorType> = match community {
Ok(c) => Box::new(c), Ok(c) => Box::new(c),
Err(_) => Box::new(get_or_fetch_and_upsert_person(apub_id, context, recursion_counter).await?), Err(_) => Box::new(get_or_fetch_and_upsert_person(apub_id, context, recursion_counter).await?),

View file

@ -14,7 +14,7 @@ use crate::{
person_extension::PersonExtension, person_extension::PersonExtension,
signatures::{PublicKey, PublicKeyExtension}, signatures::{PublicKey, PublicKeyExtension},
}, },
fetcher::community::get_or_fetch_and_upsert_community, fetcher::community::{get_or_fetch_and_upsert_community, ReceiveAnnounceFunction},
}; };
use activitystreams::{ use activitystreams::{
activity::Follow, activity::Follow,
@ -217,6 +217,7 @@ pub trait CommunityType {
activity: AnyBase, activity: AnyBase,
object: Option<Url>, object: Option<Url>,
context: &LemmyContext, context: &LemmyContext,
receive_announce: ReceiveAnnounceFunction<'_>,
) -> Result<(), LemmyError>; ) -> Result<(), LemmyError>;
async fn send_add_mod( async fn send_add_mod(
@ -469,12 +470,14 @@ pub async fn get_community_from_to_or_cc<T, Kind>(
activity: &T, activity: &T,
context: &LemmyContext, context: &LemmyContext,
request_counter: &mut i32, request_counter: &mut i32,
receive_announce: ReceiveAnnounceFunction<'_>,
) -> Result<Community, LemmyError> ) -> Result<Community, LemmyError>
where where
T: AsObject<Kind>, T: AsObject<Kind>,
{ {
for cid in get_activity_to_and_cc(activity) { for cid in get_activity_to_and_cc(activity) {
let community = get_or_fetch_and_upsert_community(&cid, context, request_counter).await; let community =
get_or_fetch_and_upsert_community(&cid, context, request_counter, receive_announce).await;
if community.is_ok() { if community.is_ok() {
return community; return community;
} }

View file

@ -254,7 +254,8 @@ async fn receive_accept(
.context(location_info!())?; .context(location_info!())?;
let community = let community =
get_or_fetch_and_upsert_community(&community_uri, context, request_counter).await?; get_or_fetch_and_upsert_community(&community_uri, context, request_counter, receive_announce)
.await?;
let community_id = community.id; let community_id = community.id;
let person_id = person.id; let person_id = person.id;