Merge remote-tracking branch 'yerba/outbox-activities'

This commit is contained in:
Dessalines 2021-01-29 09:17:14 -05:00
commit 62a145d8b3
6 changed files with 79 additions and 40 deletions

12
Cargo.lock generated
View File

@ -1023,6 +1023,17 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "diesel_json"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2812f0f63b6d3508fb7bfdb872c2dc2321ba938f5e0f4cb9751ec899e8b297c9"
dependencies = [
"diesel",
"serde 1.0.118",
"serde_json",
]
[[package]] [[package]]
name = "diesel_migrations" name = "diesel_migrations"
version = "1.4.0" version = "1.4.0"
@ -1794,6 +1805,7 @@ dependencies = [
"bcrypt", "bcrypt",
"chrono", "chrono",
"diesel", "diesel",
"diesel_json",
"diesel_migrations", "diesel_migrations",
"lazy_static", "lazy_static",
"lemmy_db_schema", "lemmy_db_schema",

View File

@ -1,28 +1,23 @@
use crate::{ use crate::{
check_is_apub_id_valid,
fetcher::{ fetcher::{
fetch::fetch_remote_object, fetch::fetch_remote_object,
get_or_fetch_and_upsert_user, get_or_fetch_and_upsert_user,
is_deleted, is_deleted,
should_refetch_actor, should_refetch_actor,
}, },
inbox::user_inbox::receive_announce,
objects::FromApub, objects::FromApub,
ActorType, ActorType,
GroupExt, GroupExt,
PageExt,
}; };
use activitystreams::{ use activitystreams::{
base::{BaseExt, ExtendsExt},
collection::{CollectionExt, OrderedCollection}, collection::{CollectionExt, OrderedCollection},
object::ObjectExt, object::ObjectExt,
}; };
use anyhow::Context; use anyhow::Context;
use diesel::result::Error::NotFound; use diesel::result::Error::NotFound;
use lemmy_db_queries::{source::community::Community_, ApubObject, Joinable}; use lemmy_db_queries::{source::community::Community_, ApubObject, Joinable};
use lemmy_db_schema::source::{ use lemmy_db_schema::source::community::{Community, CommunityModerator, CommunityModeratorForm};
community::{Community, CommunityModerator, CommunityModeratorForm},
post::Post,
};
use lemmy_structs::blocking; use lemmy_structs::blocking;
use lemmy_utils::{location_info, LemmyError}; use lemmy_utils::{location_info, LemmyError};
use lemmy_websocket::LemmyContext; use lemmy_websocket::LemmyContext;
@ -119,29 +114,34 @@ async fn fetch_remote_community(
.await??; .await??;
} }
// fetch outbox (maybe make this conditional) // only fetch outbox for new communities, otherwise this can create an infinite loop
if old_community.is_none() {
fetch_community_outbox(context, &community, recursion_counter).await?
}
Ok(community)
}
async fn fetch_community_outbox(
context: &LemmyContext,
community: &Community,
recursion_counter: &mut i32,
) -> Result<(), LemmyError> {
let outbox = fetch_remote_object::<OrderedCollection>( let outbox = fetch_remote_object::<OrderedCollection>(
context.client(), context.client(),
&community.get_outbox_url()?, &community.get_outbox_url()?,
recursion_counter, recursion_counter,
) )
.await?; .await?;
let outbox_items = outbox.items().context(location_info!())?.clone(); let outbox_activities = outbox.items().context(location_info!())?.clone();
let mut outbox_items = outbox_items.many().context(location_info!())?; let mut outbox_activities = outbox_activities.many().context(location_info!())?;
if outbox_items.len() > 20 { if outbox_activities.len() > 20 {
outbox_items = outbox_items[0..20].to_vec(); outbox_activities = outbox_activities[0..20].to_vec();
}
for o in outbox_items {
let page = PageExt::from_any_base(o)?.context(location_info!())?;
let page_id = page.id_unchecked().context(location_info!())?;
// The post creator may be from a blocked instance, if it errors, then skip it
if check_is_apub_id_valid(page_id).is_err() {
continue;
}
Post::from_apub(&page, context, page_id.to_owned(), recursion_counter).await?;
// TODO: we need to send a websocket update here
} }
Ok(community) for activity in outbox_activities {
receive_announce(context, activity, community, recursion_counter).await?;
}
Ok(())
} }

View File

@ -5,12 +5,12 @@ use crate::{
ActorType, ActorType,
}; };
use activitystreams::{ use activitystreams::{
base::{AnyBase, BaseExt, ExtendsExt}, base::{AnyBase, BaseExt},
collection::{CollectionExt, OrderedCollection, UnorderedCollection}, collection::{CollectionExt, OrderedCollection, UnorderedCollection},
}; };
use actix_web::{body::Body, web, HttpResponse}; use actix_web::{body::Body, web, HttpResponse};
use lemmy_db_queries::source::{community::Community_, post::Post_}; use lemmy_db_queries::source::{activity::Activity_, community::Community_};
use lemmy_db_schema::source::{community::Community, post::Post}; use lemmy_db_schema::source::{activity::Activity, community::Community};
use lemmy_db_views_actor::community_follower_view::CommunityFollowerView; use lemmy_db_views_actor::community_follower_view::CommunityFollowerView;
use lemmy_structs::blocking; use lemmy_structs::blocking;
use lemmy_utils::LemmyError; use lemmy_utils::LemmyError;
@ -76,21 +76,20 @@ pub async fn get_apub_community_outbox(
}) })
.await??; .await??;
let community_id = community.id; let community_actor_id = community.actor_id.to_owned();
let posts = blocking(context.pool(), move |conn| { let activities = blocking(context.pool(), move |conn| {
Post::list_for_community(conn, community_id) Activity::read_community_outbox(conn, &community_actor_id)
}) })
.await??; .await??;
let mut pages: Vec<AnyBase> = vec![]; let activities = activities
for p in posts { .iter()
pages.push(p.to_apub(context.pool()).await?.into_any_base()?); .map(AnyBase::from_arbitrary_json)
} .collect::<Result<Vec<AnyBase>, serde_json::Error>>()?;
let len = activities.len();
let len = pages.len();
let mut collection = OrderedCollection::new(); let mut collection = OrderedCollection::new();
collection collection
.set_many_items(pages) .set_many_items(activities)
.set_many_contexts(lemmy_context()?) .set_many_contexts(lemmy_context()?)
.set_id(community.get_outbox_url()?) .set_id(community.get_outbox_url()?)
.set_total_items(len as u64); .set_total_items(len as u64);

View File

@ -236,7 +236,7 @@ async fn receive_accept(
} }
/// Takes an announce and passes the inner activity to the appropriate handler. /// Takes an announce and passes the inner activity to the appropriate handler.
async fn receive_announce( pub async fn receive_announce(
context: &LemmyContext, context: &LemmyContext,
activity: AnyBase, activity: AnyBase,
actor: &dyn ActorType, actor: &dyn ActorType,

View File

@ -23,3 +23,4 @@ url = { version = "2.2.0", features = ["serde"] }
lazy_static = "1.4.0" lazy_static = "1.4.0"
regex = "1.4.2" regex = "1.4.2"
bcrypt = "0.9.0" bcrypt = "0.9.0"
diesel_json = "0.1.1"

View File

@ -1,8 +1,9 @@
use crate::Crud; use crate::Crud;
use diesel::{dsl::*, result::Error, *}; use diesel::{dsl::*, result::Error, sql_types::Text, *};
use lemmy_db_schema::source::activity::*; use lemmy_db_schema::{source::activity::*, Url};
use log::debug; use log::debug;
use serde::Serialize; use serde::Serialize;
use serde_json::Value;
use std::{ use std::{
fmt::Debug, fmt::Debug,
io::{Error as IoError, ErrorKind}, io::{Error as IoError, ErrorKind},
@ -47,7 +48,14 @@ pub trait Activity_ {
) -> Result<Activity, IoError> ) -> Result<Activity, IoError>
where where
T: Serialize + Debug; T: Serialize + Debug;
fn read_from_apub_id(conn: &PgConnection, object_id: &str) -> Result<Activity, Error>; fn read_from_apub_id(conn: &PgConnection, object_id: &str) -> Result<Activity, Error>;
/// Returns up to 20 activities of type `Announce/Create/Page` from the community
fn read_community_outbox(
conn: &PgConnection,
community_actor_id: &Url,
) -> Result<Vec<Value>, Error>;
} }
impl Activity_ for Activity { impl Activity_ for Activity {
@ -83,6 +91,25 @@ impl Activity_ for Activity {
use lemmy_db_schema::schema::activity::dsl::*; use lemmy_db_schema::schema::activity::dsl::*;
activity.filter(ap_id.eq(object_id)).first::<Self>(conn) activity.filter(ap_id.eq(object_id)).first::<Self>(conn)
} }
fn read_community_outbox(
conn: &PgConnection,
community_actor_id: &Url,
) -> Result<Vec<Value>, Error> {
use lemmy_db_schema::schema::activity::dsl::*;
let res: Vec<Value> = activity
.select(data)
.filter(
sql("activity.data ->> 'type' = 'Announce'")
.sql(" AND activity.data -> 'object' ->> 'type' = 'Create'")
.sql(" AND activity.data -> 'object' -> 'object' ->> 'type' = 'Page'")
.sql(" AND activity.data ->> 'actor' = ")
.bind::<Text, _>(community_actor_id),
)
.limit(20)
.get_results(conn)?;
Ok(res)
}
} }
#[cfg(test)] #[cfg(test)]