Serve activities in community outbox (fixes #1216)
This commit is contained in:
parent
c51f750831
commit
c09c462a6e
6 changed files with 79 additions and 40 deletions
12
Cargo.lock
generated
12
Cargo.lock
generated
|
@ -1023,6 +1023,17 @@ dependencies = [
|
||||||
"syn",
|
"syn",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "diesel_json"
|
||||||
|
version = "0.1.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "2812f0f63b6d3508fb7bfdb872c2dc2321ba938f5e0f4cb9751ec899e8b297c9"
|
||||||
|
dependencies = [
|
||||||
|
"diesel",
|
||||||
|
"serde 1.0.118",
|
||||||
|
"serde_json",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "diesel_migrations"
|
name = "diesel_migrations"
|
||||||
version = "1.4.0"
|
version = "1.4.0"
|
||||||
|
@ -1794,6 +1805,7 @@ dependencies = [
|
||||||
"bcrypt",
|
"bcrypt",
|
||||||
"chrono",
|
"chrono",
|
||||||
"diesel",
|
"diesel",
|
||||||
|
"diesel_json",
|
||||||
"diesel_migrations",
|
"diesel_migrations",
|
||||||
"lazy_static",
|
"lazy_static",
|
||||||
"lemmy_db_schema",
|
"lemmy_db_schema",
|
||||||
|
|
|
@ -1,28 +1,23 @@
|
||||||
use crate::{
|
use crate::{
|
||||||
check_is_apub_id_valid,
|
|
||||||
fetcher::{
|
fetcher::{
|
||||||
fetch::fetch_remote_object,
|
fetch::fetch_remote_object,
|
||||||
get_or_fetch_and_upsert_user,
|
get_or_fetch_and_upsert_user,
|
||||||
is_deleted,
|
is_deleted,
|
||||||
should_refetch_actor,
|
should_refetch_actor,
|
||||||
},
|
},
|
||||||
|
inbox::user_inbox::receive_announce,
|
||||||
objects::FromApub,
|
objects::FromApub,
|
||||||
ActorType,
|
ActorType,
|
||||||
GroupExt,
|
GroupExt,
|
||||||
PageExt,
|
|
||||||
};
|
};
|
||||||
use activitystreams::{
|
use activitystreams::{
|
||||||
base::{BaseExt, ExtendsExt},
|
|
||||||
collection::{CollectionExt, OrderedCollection},
|
collection::{CollectionExt, OrderedCollection},
|
||||||
object::ObjectExt,
|
object::ObjectExt,
|
||||||
};
|
};
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use diesel::result::Error::NotFound;
|
use diesel::result::Error::NotFound;
|
||||||
use lemmy_db_queries::{source::community::Community_, ApubObject, Joinable};
|
use lemmy_db_queries::{source::community::Community_, ApubObject, Joinable};
|
||||||
use lemmy_db_schema::source::{
|
use lemmy_db_schema::source::community::{Community, CommunityModerator, CommunityModeratorForm};
|
||||||
community::{Community, CommunityModerator, CommunityModeratorForm},
|
|
||||||
post::Post,
|
|
||||||
};
|
|
||||||
use lemmy_structs::blocking;
|
use lemmy_structs::blocking;
|
||||||
use lemmy_utils::{location_info, LemmyError};
|
use lemmy_utils::{location_info, LemmyError};
|
||||||
use lemmy_websocket::LemmyContext;
|
use lemmy_websocket::LemmyContext;
|
||||||
|
@ -119,29 +114,34 @@ async fn fetch_remote_community(
|
||||||
.await??;
|
.await??;
|
||||||
}
|
}
|
||||||
|
|
||||||
// fetch outbox (maybe make this conditional)
|
// only fetch outbox for new communities, otherwise this can create an infinite loop
|
||||||
|
if old_community.is_none() {
|
||||||
|
fetch_community_outbox(context, &community, recursion_counter).await?
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(community)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn fetch_community_outbox(
|
||||||
|
context: &LemmyContext,
|
||||||
|
community: &Community,
|
||||||
|
recursion_counter: &mut i32,
|
||||||
|
) -> Result<(), LemmyError> {
|
||||||
let outbox = fetch_remote_object::<OrderedCollection>(
|
let outbox = fetch_remote_object::<OrderedCollection>(
|
||||||
context.client(),
|
context.client(),
|
||||||
&community.get_outbox_url()?,
|
&community.get_outbox_url()?,
|
||||||
recursion_counter,
|
recursion_counter,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
let outbox_items = outbox.items().context(location_info!())?.clone();
|
let outbox_activities = outbox.items().context(location_info!())?.clone();
|
||||||
let mut outbox_items = outbox_items.many().context(location_info!())?;
|
let mut outbox_activities = outbox_activities.many().context(location_info!())?;
|
||||||
if outbox_items.len() > 20 {
|
if outbox_activities.len() > 20 {
|
||||||
outbox_items = outbox_items[0..20].to_vec();
|
outbox_activities = outbox_activities[0..20].to_vec();
|
||||||
}
|
|
||||||
for o in outbox_items {
|
|
||||||
let page = PageExt::from_any_base(o)?.context(location_info!())?;
|
|
||||||
let page_id = page.id_unchecked().context(location_info!())?;
|
|
||||||
|
|
||||||
// The post creator may be from a blocked instance, if it errors, then skip it
|
|
||||||
if check_is_apub_id_valid(page_id).is_err() {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
Post::from_apub(&page, context, page_id.to_owned(), recursion_counter).await?;
|
|
||||||
// TODO: we need to send a websocket update here
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(community)
|
for activity in outbox_activities {
|
||||||
|
receive_announce(context, activity, community, recursion_counter).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,12 +5,12 @@ use crate::{
|
||||||
ActorType,
|
ActorType,
|
||||||
};
|
};
|
||||||
use activitystreams::{
|
use activitystreams::{
|
||||||
base::{AnyBase, BaseExt, ExtendsExt},
|
base::{AnyBase, BaseExt},
|
||||||
collection::{CollectionExt, OrderedCollection, UnorderedCollection},
|
collection::{CollectionExt, OrderedCollection, UnorderedCollection},
|
||||||
};
|
};
|
||||||
use actix_web::{body::Body, web, HttpResponse};
|
use actix_web::{body::Body, web, HttpResponse};
|
||||||
use lemmy_db_queries::source::{community::Community_, post::Post_};
|
use lemmy_db_queries::source::{activity::Activity_, community::Community_};
|
||||||
use lemmy_db_schema::source::{community::Community, post::Post};
|
use lemmy_db_schema::source::{activity::Activity, community::Community};
|
||||||
use lemmy_db_views_actor::community_follower_view::CommunityFollowerView;
|
use lemmy_db_views_actor::community_follower_view::CommunityFollowerView;
|
||||||
use lemmy_structs::blocking;
|
use lemmy_structs::blocking;
|
||||||
use lemmy_utils::LemmyError;
|
use lemmy_utils::LemmyError;
|
||||||
|
@ -76,21 +76,20 @@ pub async fn get_apub_community_outbox(
|
||||||
})
|
})
|
||||||
.await??;
|
.await??;
|
||||||
|
|
||||||
let community_id = community.id;
|
let community_actor_id = community.actor_id.to_owned();
|
||||||
let posts = blocking(context.pool(), move |conn| {
|
let activities = blocking(context.pool(), move |conn| {
|
||||||
Post::list_for_community(conn, community_id)
|
Activity::read_community_outbox(conn, &community_actor_id)
|
||||||
})
|
})
|
||||||
.await??;
|
.await??;
|
||||||
|
|
||||||
let mut pages: Vec<AnyBase> = vec![];
|
let activities = activities
|
||||||
for p in posts {
|
.iter()
|
||||||
pages.push(p.to_apub(context.pool()).await?.into_any_base()?);
|
.map(AnyBase::from_arbitrary_json)
|
||||||
}
|
.collect::<Result<Vec<AnyBase>, serde_json::Error>>()?;
|
||||||
|
let len = activities.len();
|
||||||
let len = pages.len();
|
|
||||||
let mut collection = OrderedCollection::new();
|
let mut collection = OrderedCollection::new();
|
||||||
collection
|
collection
|
||||||
.set_many_items(pages)
|
.set_many_items(activities)
|
||||||
.set_many_contexts(lemmy_context()?)
|
.set_many_contexts(lemmy_context()?)
|
||||||
.set_id(community.get_outbox_url()?)
|
.set_id(community.get_outbox_url()?)
|
||||||
.set_total_items(len as u64);
|
.set_total_items(len as u64);
|
||||||
|
|
|
@ -236,7 +236,7 @@ async fn receive_accept(
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Takes an announce and passes the inner activity to the appropriate handler.
|
/// Takes an announce and passes the inner activity to the appropriate handler.
|
||||||
async fn receive_announce(
|
pub async fn receive_announce(
|
||||||
context: &LemmyContext,
|
context: &LemmyContext,
|
||||||
activity: AnyBase,
|
activity: AnyBase,
|
||||||
actor: &dyn ActorType,
|
actor: &dyn ActorType,
|
||||||
|
|
|
@ -23,3 +23,4 @@ url = { version = "2.2.0", features = ["serde"] }
|
||||||
lazy_static = "1.4.0"
|
lazy_static = "1.4.0"
|
||||||
regex = "1.4.2"
|
regex = "1.4.2"
|
||||||
bcrypt = "0.9.0"
|
bcrypt = "0.9.0"
|
||||||
|
diesel_json = "0.1.1"
|
||||||
|
|
|
@ -1,8 +1,9 @@
|
||||||
use crate::Crud;
|
use crate::Crud;
|
||||||
use diesel::{dsl::*, result::Error, *};
|
use diesel::{dsl::*, result::Error, sql_types::Text, *};
|
||||||
use lemmy_db_schema::source::activity::*;
|
use lemmy_db_schema::{source::activity::*, Url};
|
||||||
use log::debug;
|
use log::debug;
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
|
use serde_json::Value;
|
||||||
use std::{
|
use std::{
|
||||||
fmt::Debug,
|
fmt::Debug,
|
||||||
io::{Error as IoError, ErrorKind},
|
io::{Error as IoError, ErrorKind},
|
||||||
|
@ -47,7 +48,14 @@ pub trait Activity_ {
|
||||||
) -> Result<Activity, IoError>
|
) -> Result<Activity, IoError>
|
||||||
where
|
where
|
||||||
T: Serialize + Debug;
|
T: Serialize + Debug;
|
||||||
|
|
||||||
fn read_from_apub_id(conn: &PgConnection, object_id: &str) -> Result<Activity, Error>;
|
fn read_from_apub_id(conn: &PgConnection, object_id: &str) -> Result<Activity, Error>;
|
||||||
|
|
||||||
|
/// Returns up to 20 activities of type `Announce/Create/Page` from the community
|
||||||
|
fn read_community_outbox(
|
||||||
|
conn: &PgConnection,
|
||||||
|
community_actor_id: &Url,
|
||||||
|
) -> Result<Vec<Value>, Error>;
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Activity_ for Activity {
|
impl Activity_ for Activity {
|
||||||
|
@ -83,6 +91,25 @@ impl Activity_ for Activity {
|
||||||
use lemmy_db_schema::schema::activity::dsl::*;
|
use lemmy_db_schema::schema::activity::dsl::*;
|
||||||
activity.filter(ap_id.eq(object_id)).first::<Self>(conn)
|
activity.filter(ap_id.eq(object_id)).first::<Self>(conn)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn read_community_outbox(
|
||||||
|
conn: &PgConnection,
|
||||||
|
community_actor_id: &Url,
|
||||||
|
) -> Result<Vec<Value>, Error> {
|
||||||
|
use lemmy_db_schema::schema::activity::dsl::*;
|
||||||
|
let res: Vec<Value> = activity
|
||||||
|
.select(data)
|
||||||
|
.filter(
|
||||||
|
sql("activity.data ->> 'type' = 'Announce'")
|
||||||
|
.sql(" AND activity.data -> 'object' ->> 'type' = 'Create'")
|
||||||
|
.sql(" AND activity.data -> 'object' -> 'object' ->> 'type' = 'Page'")
|
||||||
|
.sql(" AND activity.data ->> 'actor' = ")
|
||||||
|
.bind::<Text, _>(community_actor_id),
|
||||||
|
)
|
||||||
|
.limit(20)
|
||||||
|
.get_results(conn)?;
|
||||||
|
Ok(res)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|
Loading…
Reference in a new issue