Add community outbox (ref #1018)
This commit is contained in:
parent
705e74f4b9
commit
678e1fa927
7 changed files with 129 additions and 65 deletions
|
@ -76,6 +76,9 @@ impl Post {
|
|||
use crate::schema::post::dsl::*;
|
||||
post
|
||||
.filter(community_id.eq(the_community_id))
|
||||
.then_order_by(published.desc())
|
||||
.then_order_by(stickied.desc())
|
||||
.limit(20)
|
||||
.load::<Self>(conn)
|
||||
}
|
||||
|
||||
|
|
|
@ -31,7 +31,7 @@ use activitystreams_new::{
|
|||
},
|
||||
actor::{kind::GroupType, ApActor, Endpoints, Group},
|
||||
base::{AnyBase, BaseExt},
|
||||
collection::UnorderedCollection,
|
||||
collection::{OrderedCollection, UnorderedCollection},
|
||||
context,
|
||||
object::Tombstone,
|
||||
prelude::*,
|
||||
|
@ -43,6 +43,7 @@ use lemmy_db::{
|
|||
community::{Community, CommunityForm},
|
||||
community_view::{CommunityFollowerView, CommunityModeratorView},
|
||||
naive_now,
|
||||
post::Post,
|
||||
user::User_,
|
||||
};
|
||||
use lemmy_utils::convert_datetime;
|
||||
|
@ -88,10 +89,10 @@ impl ToApub for Community {
|
|||
group.set_content(d);
|
||||
}
|
||||
|
||||
let mut ap_actor = ApActor::new(self.get_inbox_url().parse()?, group);
|
||||
let mut ap_actor = ApActor::new(self.get_inbox_url()?, group);
|
||||
ap_actor
|
||||
.set_preferred_username(self.title.to_owned())
|
||||
.set_outbox(self.get_outbox_url().parse()?)
|
||||
.set_outbox(self.get_outbox_url()?)
|
||||
.set_followers(self.get_followers_url().parse()?)
|
||||
.set_following(self.get_following_url().parse()?)
|
||||
.set_liked(self.get_liked_url().parse()?)
|
||||
|
@ -411,6 +412,35 @@ pub async fn get_apub_community_followers(
|
|||
Ok(create_apub_response(&collection))
|
||||
}
|
||||
|
||||
pub async fn get_apub_community_outbox(
|
||||
info: web::Path<CommunityQuery>,
|
||||
db: DbPoolParam,
|
||||
) -> Result<HttpResponse<Body>, LemmyError> {
|
||||
let community = blocking(&db, move |conn| {
|
||||
Community::read_from_name(&conn, &info.community_name)
|
||||
})
|
||||
.await??;
|
||||
|
||||
let community_id = community.id;
|
||||
let posts = blocking(&db, move |conn| {
|
||||
Post::list_for_community(conn, community_id)
|
||||
})
|
||||
.await??;
|
||||
|
||||
let mut pages: Vec<AnyBase> = vec![];
|
||||
for p in posts {
|
||||
pages.push(p.to_apub(&db).await?.into_any_base()?);
|
||||
}
|
||||
|
||||
let len = pages.len();
|
||||
let mut collection = OrderedCollection::new(pages);
|
||||
collection
|
||||
.set_context(context())
|
||||
.set_id(community.get_outbox_url()?)
|
||||
.set_total_items(len as u64);
|
||||
Ok(create_apub_response(&collection))
|
||||
}
|
||||
|
||||
pub async fn do_announce(
|
||||
activity: AnyBase,
|
||||
community: &Community,
|
||||
|
|
|
@ -15,7 +15,7 @@ use crate::{
|
|||
DbPool,
|
||||
LemmyError,
|
||||
};
|
||||
use activitystreams_new::{base::BaseExt, object::Note, prelude::*};
|
||||
use activitystreams_new::{base::BaseExt, collection::OrderedCollection, object::Note, prelude::*};
|
||||
use actix_web::client::Client;
|
||||
use chrono::NaiveDateTime;
|
||||
use diesel::{result::Error::NotFound, PgConnection};
|
||||
|
@ -40,6 +40,7 @@ use std::{fmt::Debug, time::Duration};
|
|||
use url::Url;
|
||||
|
||||
static ACTOR_REFETCH_INTERVAL_SECONDS: i64 = 24 * 60 * 60;
|
||||
static ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG: i64 = 10;
|
||||
|
||||
// Fetch nodeinfo metadata from a remote instance.
|
||||
async fn _fetch_node_info(client: &Client, domain: &str) -> Result<NodeInfo, LemmyError> {
|
||||
|
@ -257,12 +258,13 @@ pub async fn get_or_fetch_and_upsert_user(
|
|||
/// TODO it won't pick up new avatars, summaries etc until a day after.
|
||||
/// Actors need an "update" activity pushed to other servers to fix this.
|
||||
fn should_refetch_actor(last_refreshed: NaiveDateTime) -> bool {
|
||||
if cfg!(debug_assertions) {
|
||||
true
|
||||
let update_interval = if cfg!(debug_assertions) {
|
||||
// avoid infinite loop when fetching community outbox
|
||||
chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG)
|
||||
} else {
|
||||
let update_interval = chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS);
|
||||
last_refreshed.lt(&(naive_now() - update_interval))
|
||||
}
|
||||
chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS)
|
||||
};
|
||||
last_refreshed.lt(&(naive_now() - update_interval))
|
||||
}
|
||||
|
||||
/// Check if a remote community exists, create if not found, if its too old update it.Fetch a community, insert/update it in the database and return the community.
|
||||
|
@ -280,59 +282,88 @@ pub async fn get_or_fetch_and_upsert_community(
|
|||
match community {
|
||||
Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => {
|
||||
debug!("Fetching and updating from remote community: {}", apub_id);
|
||||
let group = fetch_remote_object::<GroupExt>(client, apub_id).await?;
|
||||
|
||||
let mut cf = CommunityForm::from_apub(&group, client, pool).await?;
|
||||
cf.last_refreshed_at = Some(naive_now());
|
||||
let community = blocking(pool, move |conn| Community::update(conn, c.id, &cf)).await??;
|
||||
|
||||
Ok(community)
|
||||
fetch_remote_community(apub_id, client, pool, Some(c.id)).await
|
||||
}
|
||||
Ok(c) => Ok(c),
|
||||
Err(NotFound {}) => {
|
||||
debug!("Fetching and creating remote community: {}", apub_id);
|
||||
let group = fetch_remote_object::<GroupExt>(client, apub_id).await?;
|
||||
|
||||
let cf = CommunityForm::from_apub(&group, client, pool).await?;
|
||||
let community = blocking(pool, move |conn| Community::create(conn, &cf)).await??;
|
||||
|
||||
// Also add the community moderators too
|
||||
let attributed_to = group.inner.attributed_to().unwrap();
|
||||
let creator_and_moderator_uris: Vec<&Url> = attributed_to
|
||||
.as_many()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.map(|a| a.as_xsd_any_uri().unwrap())
|
||||
.collect();
|
||||
|
||||
let mut creator_and_moderators = Vec::new();
|
||||
|
||||
for uri in creator_and_moderator_uris {
|
||||
let c_or_m = get_or_fetch_and_upsert_user(uri, client, pool).await?;
|
||||
|
||||
creator_and_moderators.push(c_or_m);
|
||||
}
|
||||
|
||||
let community_id = community.id;
|
||||
blocking(pool, move |conn| {
|
||||
for mod_ in creator_and_moderators {
|
||||
let community_moderator_form = CommunityModeratorForm {
|
||||
community_id,
|
||||
user_id: mod_.id,
|
||||
};
|
||||
|
||||
CommunityModerator::join(conn, &community_moderator_form)?;
|
||||
}
|
||||
Ok(()) as Result<(), LemmyError>
|
||||
})
|
||||
.await??;
|
||||
|
||||
Ok(community)
|
||||
fetch_remote_community(apub_id, client, pool, None).await
|
||||
}
|
||||
Err(e) => Err(e.into()),
|
||||
}
|
||||
}
|
||||
|
||||
async fn fetch_remote_community(
|
||||
apub_id: &Url,
|
||||
client: &Client,
|
||||
pool: &DbPool,
|
||||
community_id: Option<i32>,
|
||||
) -> Result<Community, LemmyError> {
|
||||
let group = fetch_remote_object::<GroupExt>(client, apub_id).await?;
|
||||
|
||||
let cf = CommunityForm::from_apub(&group, client, pool).await?;
|
||||
let community = blocking(pool, move |conn| {
|
||||
if let Some(cid) = community_id {
|
||||
Community::update(conn, cid, &cf)
|
||||
} else {
|
||||
Community::create(conn, &cf)
|
||||
}
|
||||
})
|
||||
.await??;
|
||||
|
||||
// Also add the community moderators too
|
||||
let attributed_to = group.inner.attributed_to().unwrap();
|
||||
let creator_and_moderator_uris: Vec<&Url> = attributed_to
|
||||
.as_many()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.map(|a| a.as_xsd_any_uri().unwrap())
|
||||
.collect();
|
||||
|
||||
let mut creator_and_moderators = Vec::new();
|
||||
|
||||
for uri in creator_and_moderator_uris {
|
||||
let c_or_m = get_or_fetch_and_upsert_user(uri, client, pool).await?;
|
||||
|
||||
creator_and_moderators.push(c_or_m);
|
||||
}
|
||||
|
||||
// TODO: need to make this work to update mods of existing communities
|
||||
if community_id.is_none() {
|
||||
let community_id = community.id;
|
||||
blocking(pool, move |conn| {
|
||||
for mod_ in creator_and_moderators {
|
||||
let community_moderator_form = CommunityModeratorForm {
|
||||
community_id,
|
||||
user_id: mod_.id,
|
||||
};
|
||||
|
||||
CommunityModerator::join(conn, &community_moderator_form)?;
|
||||
}
|
||||
Ok(()) as Result<(), LemmyError>
|
||||
})
|
||||
.await??;
|
||||
}
|
||||
|
||||
// fetch outbox (maybe make this conditional)
|
||||
let outbox =
|
||||
fetch_remote_object::<OrderedCollection>(client, &community.get_outbox_url()?).await?;
|
||||
let outbox_items = outbox.items().clone();
|
||||
for o in outbox_items.many().unwrap() {
|
||||
let page = PageExt::from_any_base(o)?.unwrap();
|
||||
let post = PostForm::from_apub(&page, client, pool).await?;
|
||||
let post_ap_id = post.ap_id.clone();
|
||||
// Check whether the post already exists in the local db
|
||||
let existing = blocking(pool, move |conn| Post::read_from_apub_id(conn, &post_ap_id)).await?;
|
||||
match existing {
|
||||
Ok(e) => blocking(pool, move |conn| Post::update(conn, e.id, &post)).await??,
|
||||
Err(_) => blocking(pool, move |conn| Post::create(conn, &post)).await??,
|
||||
};
|
||||
}
|
||||
|
||||
Ok(community)
|
||||
}
|
||||
|
||||
fn upsert_post(post_form: &PostForm, conn: &PgConnection) -> Result<Post, LemmyError> {
|
||||
let existing = Post::read_from_apub_id(conn, &post_form.ap_id);
|
||||
match existing {
|
||||
|
|
|
@ -280,8 +280,8 @@ pub trait ActorType {
|
|||
}
|
||||
|
||||
// TODO move these to the db rows
|
||||
fn get_inbox_url(&self) -> String {
|
||||
format!("{}/inbox", &self.actor_id_str())
|
||||
fn get_inbox_url(&self) -> Result<Url, ParseError> {
|
||||
Url::parse(&format!("{}/inbox", &self.actor_id_str()))
|
||||
}
|
||||
|
||||
// TODO: make this return `Result<Url, ParseError>
|
||||
|
@ -289,8 +289,8 @@ pub trait ActorType {
|
|||
get_shared_inbox(&self.actor_id().unwrap())
|
||||
}
|
||||
|
||||
fn get_outbox_url(&self) -> String {
|
||||
format!("{}/outbox", &self.actor_id_str())
|
||||
fn get_outbox_url(&self) -> Result<Url, ParseError> {
|
||||
Url::parse(&format!("{}/outbox", &self.actor_id_str()))
|
||||
}
|
||||
|
||||
fn get_followers_url(&self) -> String {
|
||||
|
|
|
@ -15,7 +15,8 @@ use crate::{
|
|||
},
|
||||
blocking,
|
||||
routes::DbPoolParam,
|
||||
DbPool, LemmyError,
|
||||
DbPool,
|
||||
LemmyError,
|
||||
};
|
||||
use activitystreams_ext::Ext1;
|
||||
use activitystreams_new::{
|
||||
|
|
|
@ -63,9 +63,9 @@ impl ToApub for User_ {
|
|||
person.set_icon(image.into_any_base()?);
|
||||
}
|
||||
|
||||
let mut ap_actor = ApActor::new(self.get_inbox_url().parse()?, person);
|
||||
let mut ap_actor = ApActor::new(self.get_inbox_url()?, person);
|
||||
ap_actor
|
||||
.set_outbox(self.get_outbox_url().parse()?)
|
||||
.set_outbox(self.get_outbox_url()?)
|
||||
.set_followers(self.get_followers_url().parse()?)
|
||||
.set_following(self.get_following_url().parse()?)
|
||||
.set_liked(self.get_liked_url().parse()?)
|
||||
|
|
|
@ -28,11 +28,10 @@ pub fn config(cfg: &mut web::ServiceConfig) {
|
|||
"/c/{community_name}/followers",
|
||||
web::get().to(get_apub_community_followers),
|
||||
)
|
||||
// TODO This is only useful for history which we aren't doing right now
|
||||
// .route(
|
||||
// "/c/{community_name}/outbox",
|
||||
// web::get().to(get_apub_community_outbox),
|
||||
// )
|
||||
.route(
|
||||
"/c/{community_name}/outbox",
|
||||
web::get().to(get_apub_community_outbox),
|
||||
)
|
||||
.route("/u/{user_name}", web::get().to(get_apub_user_http))
|
||||
.route("/post/{post_id}", web::get().to(get_apub_post))
|
||||
.route("/comment/{comment_id}", web::get().to(get_apub_comment)),
|
||||
|
|
Loading…
Reference in a new issue