From 678e1fa9273d77360086701180b97a3f72602191 Mon Sep 17 00:00:00 2001 From: Felix Ableitner Date: Fri, 24 Jul 2020 17:07:33 +0200 Subject: [PATCH] Add community outbox (ref #1018) --- server/lemmy_db/src/post.rs | 3 + server/src/apub/community.rs | 36 ++++++++- server/src/apub/fetcher.rs | 131 ++++++++++++++++++++------------ server/src/apub/mod.rs | 8 +- server/src/apub/post.rs | 3 +- server/src/apub/user.rs | 4 +- server/src/routes/federation.rs | 9 +-- 7 files changed, 129 insertions(+), 65 deletions(-) diff --git a/server/lemmy_db/src/post.rs b/server/lemmy_db/src/post.rs index d466778973..5eb9a47230 100644 --- a/server/lemmy_db/src/post.rs +++ b/server/lemmy_db/src/post.rs @@ -76,6 +76,9 @@ impl Post { use crate::schema::post::dsl::*; post .filter(community_id.eq(the_community_id)) + .then_order_by(published.desc()) + .then_order_by(stickied.desc()) + .limit(20) .load::(conn) } diff --git a/server/src/apub/community.rs b/server/src/apub/community.rs index 112b6e8516..584ef310c0 100644 --- a/server/src/apub/community.rs +++ b/server/src/apub/community.rs @@ -31,7 +31,7 @@ use activitystreams_new::{ }, actor::{kind::GroupType, ApActor, Endpoints, Group}, base::{AnyBase, BaseExt}, - collection::UnorderedCollection, + collection::{OrderedCollection, UnorderedCollection}, context, object::Tombstone, prelude::*, @@ -43,6 +43,7 @@ use lemmy_db::{ community::{Community, CommunityForm}, community_view::{CommunityFollowerView, CommunityModeratorView}, naive_now, + post::Post, user::User_, }; use lemmy_utils::convert_datetime; @@ -88,10 +89,10 @@ impl ToApub for Community { group.set_content(d); } - let mut ap_actor = ApActor::new(self.get_inbox_url().parse()?, group); + let mut ap_actor = ApActor::new(self.get_inbox_url()?, group); ap_actor .set_preferred_username(self.title.to_owned()) - .set_outbox(self.get_outbox_url().parse()?) + .set_outbox(self.get_outbox_url()?) .set_followers(self.get_followers_url().parse()?) .set_following(self.get_following_url().parse()?) .set_liked(self.get_liked_url().parse()?) @@ -411,6 +412,35 @@ pub async fn get_apub_community_followers( Ok(create_apub_response(&collection)) } +pub async fn get_apub_community_outbox( + info: web::Path, + db: DbPoolParam, +) -> Result, LemmyError> { + let community = blocking(&db, move |conn| { + Community::read_from_name(&conn, &info.community_name) + }) + .await??; + + let community_id = community.id; + let posts = blocking(&db, move |conn| { + Post::list_for_community(conn, community_id) + }) + .await??; + + let mut pages: Vec = vec![]; + for p in posts { + pages.push(p.to_apub(&db).await?.into_any_base()?); + } + + let len = pages.len(); + let mut collection = OrderedCollection::new(pages); + collection + .set_context(context()) + .set_id(community.get_outbox_url()?) + .set_total_items(len as u64); + Ok(create_apub_response(&collection)) +} + pub async fn do_announce( activity: AnyBase, community: &Community, diff --git a/server/src/apub/fetcher.rs b/server/src/apub/fetcher.rs index c10426d14f..e2d505df7a 100644 --- a/server/src/apub/fetcher.rs +++ b/server/src/apub/fetcher.rs @@ -15,7 +15,7 @@ use crate::{ DbPool, LemmyError, }; -use activitystreams_new::{base::BaseExt, object::Note, prelude::*}; +use activitystreams_new::{base::BaseExt, collection::OrderedCollection, object::Note, prelude::*}; use actix_web::client::Client; use chrono::NaiveDateTime; use diesel::{result::Error::NotFound, PgConnection}; @@ -40,6 +40,7 @@ use std::{fmt::Debug, time::Duration}; use url::Url; static ACTOR_REFETCH_INTERVAL_SECONDS: i64 = 24 * 60 * 60; +static ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG: i64 = 10; // Fetch nodeinfo metadata from a remote instance. async fn _fetch_node_info(client: &Client, domain: &str) -> Result { @@ -257,12 +258,13 @@ pub async fn get_or_fetch_and_upsert_user( /// TODO it won't pick up new avatars, summaries etc until a day after. /// Actors need an "update" activity pushed to other servers to fix this. fn should_refetch_actor(last_refreshed: NaiveDateTime) -> bool { - if cfg!(debug_assertions) { - true + let update_interval = if cfg!(debug_assertions) { + // avoid infinite loop when fetching community outbox + chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG) } else { - let update_interval = chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS); - last_refreshed.lt(&(naive_now() - update_interval)) - } + chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS) + }; + last_refreshed.lt(&(naive_now() - update_interval)) } /// Check if a remote community exists, create if not found, if its too old update it.Fetch a community, insert/update it in the database and return the community. @@ -280,59 +282,88 @@ pub async fn get_or_fetch_and_upsert_community( match community { Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => { debug!("Fetching and updating from remote community: {}", apub_id); - let group = fetch_remote_object::(client, apub_id).await?; - - let mut cf = CommunityForm::from_apub(&group, client, pool).await?; - cf.last_refreshed_at = Some(naive_now()); - let community = blocking(pool, move |conn| Community::update(conn, c.id, &cf)).await??; - - Ok(community) + fetch_remote_community(apub_id, client, pool, Some(c.id)).await } Ok(c) => Ok(c), Err(NotFound {}) => { debug!("Fetching and creating remote community: {}", apub_id); - let group = fetch_remote_object::(client, apub_id).await?; - - let cf = CommunityForm::from_apub(&group, client, pool).await?; - let community = blocking(pool, move |conn| Community::create(conn, &cf)).await??; - - // Also add the community moderators too - let attributed_to = group.inner.attributed_to().unwrap(); - let creator_and_moderator_uris: Vec<&Url> = attributed_to - .as_many() - .unwrap() - .iter() - .map(|a| a.as_xsd_any_uri().unwrap()) - .collect(); - - let mut creator_and_moderators = Vec::new(); - - for uri in creator_and_moderator_uris { - let c_or_m = get_or_fetch_and_upsert_user(uri, client, pool).await?; - - creator_and_moderators.push(c_or_m); - } - - let community_id = community.id; - blocking(pool, move |conn| { - for mod_ in creator_and_moderators { - let community_moderator_form = CommunityModeratorForm { - community_id, - user_id: mod_.id, - }; - - CommunityModerator::join(conn, &community_moderator_form)?; - } - Ok(()) as Result<(), LemmyError> - }) - .await??; - - Ok(community) + fetch_remote_community(apub_id, client, pool, None).await } Err(e) => Err(e.into()), } } +async fn fetch_remote_community( + apub_id: &Url, + client: &Client, + pool: &DbPool, + community_id: Option, +) -> Result { + let group = fetch_remote_object::(client, apub_id).await?; + + let cf = CommunityForm::from_apub(&group, client, pool).await?; + let community = blocking(pool, move |conn| { + if let Some(cid) = community_id { + Community::update(conn, cid, &cf) + } else { + Community::create(conn, &cf) + } + }) + .await??; + + // Also add the community moderators too + let attributed_to = group.inner.attributed_to().unwrap(); + let creator_and_moderator_uris: Vec<&Url> = attributed_to + .as_many() + .unwrap() + .iter() + .map(|a| a.as_xsd_any_uri().unwrap()) + .collect(); + + let mut creator_and_moderators = Vec::new(); + + for uri in creator_and_moderator_uris { + let c_or_m = get_or_fetch_and_upsert_user(uri, client, pool).await?; + + creator_and_moderators.push(c_or_m); + } + + // TODO: need to make this work to update mods of existing communities + if community_id.is_none() { + let community_id = community.id; + blocking(pool, move |conn| { + for mod_ in creator_and_moderators { + let community_moderator_form = CommunityModeratorForm { + community_id, + user_id: mod_.id, + }; + + CommunityModerator::join(conn, &community_moderator_form)?; + } + Ok(()) as Result<(), LemmyError> + }) + .await??; + } + + // fetch outbox (maybe make this conditional) + let outbox = + fetch_remote_object::(client, &community.get_outbox_url()?).await?; + let outbox_items = outbox.items().clone(); + for o in outbox_items.many().unwrap() { + let page = PageExt::from_any_base(o)?.unwrap(); + let post = PostForm::from_apub(&page, client, pool).await?; + let post_ap_id = post.ap_id.clone(); + // Check whether the post already exists in the local db + let existing = blocking(pool, move |conn| Post::read_from_apub_id(conn, &post_ap_id)).await?; + match existing { + Ok(e) => blocking(pool, move |conn| Post::update(conn, e.id, &post)).await??, + Err(_) => blocking(pool, move |conn| Post::create(conn, &post)).await??, + }; + } + + Ok(community) +} + fn upsert_post(post_form: &PostForm, conn: &PgConnection) -> Result { let existing = Post::read_from_apub_id(conn, &post_form.ap_id); match existing { diff --git a/server/src/apub/mod.rs b/server/src/apub/mod.rs index 28eb86ac2c..feb1f30fc1 100644 --- a/server/src/apub/mod.rs +++ b/server/src/apub/mod.rs @@ -280,8 +280,8 @@ pub trait ActorType { } // TODO move these to the db rows - fn get_inbox_url(&self) -> String { - format!("{}/inbox", &self.actor_id_str()) + fn get_inbox_url(&self) -> Result { + Url::parse(&format!("{}/inbox", &self.actor_id_str())) } // TODO: make this return `Result @@ -289,8 +289,8 @@ pub trait ActorType { get_shared_inbox(&self.actor_id().unwrap()) } - fn get_outbox_url(&self) -> String { - format!("{}/outbox", &self.actor_id_str()) + fn get_outbox_url(&self) -> Result { + Url::parse(&format!("{}/outbox", &self.actor_id_str())) } fn get_followers_url(&self) -> String { diff --git a/server/src/apub/post.rs b/server/src/apub/post.rs index f71f49b597..39e4faf341 100644 --- a/server/src/apub/post.rs +++ b/server/src/apub/post.rs @@ -15,7 +15,8 @@ use crate::{ }, blocking, routes::DbPoolParam, - DbPool, LemmyError, + DbPool, + LemmyError, }; use activitystreams_ext::Ext1; use activitystreams_new::{ diff --git a/server/src/apub/user.rs b/server/src/apub/user.rs index 0e90941d6d..4632086039 100644 --- a/server/src/apub/user.rs +++ b/server/src/apub/user.rs @@ -63,9 +63,9 @@ impl ToApub for User_ { person.set_icon(image.into_any_base()?); } - let mut ap_actor = ApActor::new(self.get_inbox_url().parse()?, person); + let mut ap_actor = ApActor::new(self.get_inbox_url()?, person); ap_actor - .set_outbox(self.get_outbox_url().parse()?) + .set_outbox(self.get_outbox_url()?) .set_followers(self.get_followers_url().parse()?) .set_following(self.get_following_url().parse()?) .set_liked(self.get_liked_url().parse()?) diff --git a/server/src/routes/federation.rs b/server/src/routes/federation.rs index 93aaac1c18..2a0c81b235 100644 --- a/server/src/routes/federation.rs +++ b/server/src/routes/federation.rs @@ -28,11 +28,10 @@ pub fn config(cfg: &mut web::ServiceConfig) { "/c/{community_name}/followers", web::get().to(get_apub_community_followers), ) - // TODO This is only useful for history which we aren't doing right now - // .route( - // "/c/{community_name}/outbox", - // web::get().to(get_apub_community_outbox), - // ) + .route( + "/c/{community_name}/outbox", + web::get().to(get_apub_community_outbox), + ) .route("/u/{user_name}", web::get().to(get_apub_user_http)) .route("/post/{post_id}", web::get().to(get_apub_post)) .route("/comment/{comment_id}", web::get().to(get_apub_comment)),