diff --git a/server/src/api/community.rs b/server/src/api/community.rs index 3edecb4f..4e39dabf 100644 --- a/server/src/api/community.rs +++ b/server/src/api/community.rs @@ -3,6 +3,7 @@ use crate::apub::{format_community_name, gen_keypair_str, make_apub_endpoint, En use diesel::PgConnection; use std::str::FromStr; use url::Url; +use crate::apub::activities::follow_community; #[derive(Serialize, Deserialize)] pub struct GetCommunity { @@ -401,21 +402,28 @@ impl Perform for Oper { let user_id = claims.id; - let community_follower_form = CommunityFollowerForm { - community_id: data.community_id, - user_id, - }; + let community = Community::read(conn, data.community_id)?; + if community.local { + let community_follower_form = CommunityFollowerForm { + community_id: data.community_id, + user_id, + }; - if data.follow { - match CommunityFollower::follow(&conn, &community_follower_form) { - Ok(user) => user, - Err(_e) => return Err(APIError::err("community_follower_already_exists").into()), - }; + if data.follow { + match CommunityFollower::follow(&conn, &community_follower_form) { + Ok(user) => user, + Err(_e) => return Err(APIError::err("community_follower_already_exists").into()), + }; + } else { + match CommunityFollower::ignore(&conn, &community_follower_form) { + Ok(user) => user, + Err(_e) => return Err(APIError::err("community_follower_already_exists").into()), + }; + } } else { - match CommunityFollower::ignore(&conn, &community_follower_form) { - Ok(user) => user, - Err(_e) => return Err(APIError::err("community_follower_already_exists").into()), - }; + let user = User_::read(conn, user_id)?; + follow_community(&community, &user, conn)?; + // TODO: this needs to return a "pending" state, until Accept is received from the remote server } let community_view = CommunityView::read(&conn, data.community_id, Some(user_id))?; diff --git a/server/src/apub/activities.rs b/server/src/apub/activities.rs index 0c1a1901..3b302f1b 100644 --- a/server/src/apub/activities.rs +++ b/server/src/apub/activities.rs @@ -3,7 +3,7 @@ use crate::db::community::Community; use crate::db::post::Post; use crate::db::user::User_; use crate::db::Crud; -use activitystreams::activity::{Create, Update}; +use activitystreams::activity::{Create, Update, Follow, Accept}; use activitystreams::object::properties::ObjectProperties; use activitystreams::{context, public}; use diesel::PgConnection; @@ -11,6 +11,7 @@ use failure::Error; use failure::_core::fmt::Debug; use isahc::prelude::*; use serde::Serialize; +use activitystreams::BaseBox; fn populate_object_props( props: &mut ObjectProperties, @@ -28,19 +29,13 @@ fn populate_object_props( Ok(()) } -fn send_activity(activity: &A) -> Result<(), Error> +fn send_activity(activity: &A, to: Vec) -> Result<(), Error> where A: Serialize + Debug, { let json = serde_json::to_string(&activity)?; - for i in get_following_instances() { - // TODO: need to send this to the inbox of following users - let inbox = format!( - "{}://{}/federation/inbox", - get_apub_protocol_string(), - i.domain - ); - let res = Request::post(inbox) + for t in to { + let res = Request::post(t) .header("Content-Type", "application/json") .body(json.to_owned())? .send()?; @@ -49,6 +44,15 @@ where Ok(()) } +fn get_followers(_community: &Community) -> Vec { + // TODO: this is wrong, needs to go to the (non-local) followers of the community + get_following_instances().iter().map(|i| format!( + "{}://{}/federation/inbox", + get_apub_protocol_string(), + i.domain + )).collect() +} + pub fn post_create(post: &Post, creator: &User_, conn: &PgConnection) -> Result<(), Error> { let page = post.as_page(conn)?; let community = Community::read(conn, post.community_id)?; @@ -62,7 +66,7 @@ pub fn post_create(post: &Post, creator: &User_, conn: &PgConnection) -> Result< .create_props .set_actor_xsd_any_uri(creator.actor_id.to_owned())? .set_object_base_box(page)?; - send_activity(&create)?; + send_activity(&create, get_followers(&community))?; Ok(()) } @@ -79,6 +83,36 @@ pub fn post_update(post: &Post, creator: &User_, conn: &PgConnection) -> Result< .update_props .set_actor_xsd_any_uri(creator.actor_id.to_owned())? .set_object_base_box(page)?; - send_activity(&update)?; + send_activity(&update, get_followers(&community))?; + Ok(()) +} + +pub fn follow_community(community: &Community, user: &User_, _conn: &PgConnection) -> Result<(), Error> { + let mut follow = Follow::new(); + follow.object_props + .set_context_xsd_any_uri(context())? + // TODO: needs proper id + .set_id(user.actor_id.clone())?; + follow.follow_props + .set_actor_xsd_any_uri(user.actor_id.clone())? + .set_object_xsd_any_uri(community.actor_id.clone())?; + println!("follow community: {:?}", &follow); + let to = format!("{}/inbox", community.actor_id); + println!("send follow to: {}", &to); + send_activity(&follow, vec!(to))?; + Ok(()) +} + +pub fn accept_follow(follow: &Follow) -> Result<(), Error> { + let mut accept = Accept::new(); + accept.object_props + .set_context_xsd_any_uri(context())? + // TODO: needs proper id + .set_id(follow.follow_props.get_actor_xsd_any_uri().unwrap().to_string())?; + accept.accept_props.set_object_base_box(BaseBox::from_concrete(follow.clone())?)?; + println!("accept follow: {:?}", &accept); + let to = format!("{}/inbox", follow.follow_props.get_actor_xsd_any_uri().unwrap().to_string()); + println!("send accept to: {}", &to); + send_activity(&follow, vec!(to))?; Ok(()) } diff --git a/server/src/apub/community.rs b/server/src/apub/community.rs index a56d81d0..025858f6 100644 --- a/server/src/apub/community.rs +++ b/server/src/apub/community.rs @@ -64,7 +64,7 @@ impl Community { .set_id(self.actor_id.to_owned())? .set_name_xsd_string(self.name.to_owned())? .set_published(convert_datetime(self.published))? - .set_attributed_to_xsd_any_uri(make_apub_endpoint(EndpointType::User, &creator.name))?; + .set_attributed_to_xsd_any_uri(make_apub_endpoint(EndpointType::Community, &creator.name))?; if let Some(u) = self.updated.to_owned() { oprops.set_updated(convert_datetime(u))?; diff --git a/server/src/apub/inbox.rs b/server/src/apub/inbox.rs index cc844224..14c41497 100644 --- a/server/src/apub/inbox.rs +++ b/server/src/apub/inbox.rs @@ -1,16 +1,18 @@ use crate::db::post::{Post, PostForm}; use crate::db::Crud; use activitystreams::object::Page; -use activitystreams::{ - object::{Object, ObjectBox}, - primitives::XsdAnyUri, - Base, BaseBox, PropRefs, -}; +use activitystreams::{object::{Object, ObjectBox}, primitives::XsdAnyUri, Base, BaseBox, PropRefs}; use actix_web::{web, HttpResponse}; use diesel::r2d2::{ConnectionManager, Pool}; use diesel::PgConnection; use failure::Error; use std::collections::HashMap; +use crate::apub::fetcher::fetch_remote_user; +use url::Url; +use crate::db::community::{Community, CommunityFollower, CommunityFollowerForm}; +use crate::apub::activities::accept_follow; +use crate::db::Followable; +use activitystreams::activity::Follow; // TODO: need a proper actor that has this inbox @@ -23,19 +25,27 @@ pub async fn inbox( match input.kind { ValidTypes::Create => handle_create(&input, conn), ValidTypes::Update => handle_update(&input, conn), + ValidTypes::Follow => handle_follow(&input, conn), + ValidTypes::Accept => handle_accept(&input, conn), } } -fn handle_create(create: &AcceptedObjects, conn: &PgConnection) -> Result { - let page = create.object.to_owned().to_concrete::()?; +fn handle_create(input: &AcceptedObjects, conn: &PgConnection) -> Result { + let page = match &input.object { + ValidObjects::Id(url) => return Err(format_err!("Invalid object for create activity: {}", url)), + ValidObjects::Object(obj) => obj.to_owned().to_concrete::()?, + }; let post = PostForm::from_page(&page, conn)?; Post::create(conn, &post)?; // TODO: send the new post out via websocket Ok(HttpResponse::Ok().finish()) } -fn handle_update(update: &AcceptedObjects, conn: &PgConnection) -> Result { - let page = update.object.to_owned().to_concrete::()?; +fn handle_update(input: &AcceptedObjects, conn: &PgConnection) -> Result { + let page = match &input.object { + ValidObjects::Id(url) => return Err(format_err!("Invalid object for update activity: {}", url)), + ValidObjects::Object(obj) => obj.to_owned().to_concrete::()?, + }; let post = PostForm::from_page(&page, conn)?; let id = Post::read_from_apub_id(conn, &post.ap_id)?.id; Post::update(conn, id, &post)?; @@ -43,6 +53,41 @@ fn handle_update(update: &AcceptedObjects, conn: &PgConnection) -> Result Result { + println!("received follow: {:?}", &input); + // TODO: we shouldnt have to rebuild the follow object from scratch + let mut follow = Follow::new(); + follow.object_props + .set_id(input.id.to_owned())?; + follow.follow_props + .set_actor_xsd_any_uri(input.actor.clone())?; + match &input.object { + ValidObjects::Id(url) => { + follow.follow_props + .set_object_xsd_any_uri(url.clone())?; + }, + ValidObjects::Object(obj) => return Err(format_err!("Invalid object for update activity: {:?}", obj)), + }; + + // TODO: make sure this is a local community + let community = Community::read_from_actor_id(conn, &follow.follow_props.get_object_xsd_any_uri().unwrap().to_string())?; + let user = fetch_remote_user(&Url::parse(&input.actor.to_string())?, conn)?; + // TODO: insert ID of the user into follows of the community + let community_follower_form = CommunityFollowerForm { + community_id: community.id, + user_id: user.id, + }; + CommunityFollower::follow(&conn, &community_follower_form)?; + accept_follow(&follow)?; + Ok(HttpResponse::Ok().finish()) +} + +fn handle_accept(_input: &AcceptedObjects, _conn: &PgConnection) -> Result { + println!("received accept: {:?}", &_input); + // TODO: at this point, indicate to the user that they are following the community + Ok(HttpResponse::Ok().finish()) +} + #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[serde(rename_all = "camelCase")] pub struct AcceptedObjects { @@ -53,7 +98,7 @@ pub struct AcceptedObjects { pub actor: XsdAnyUri, - pub object: BaseBox, + pub object: ValidObjects, #[serde(flatten)] ext: HashMap, @@ -64,6 +109,8 @@ pub struct AcceptedObjects { pub enum ValidTypes { Create, Update, + Follow, + Accept, } #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] @@ -71,7 +118,7 @@ pub enum ValidTypes { #[serde(rename_all = "camelCase")] pub enum ValidObjects { Id(XsdAnyUri), - Object(AnyExistingObject), + Object(BaseBox), } #[derive(Clone, Debug, Default, serde::Deserialize, serde::Serialize, PropRefs)] diff --git a/server/src/apub/post.rs b/server/src/apub/post.rs index e8f53904..27501d50 100644 --- a/server/src/apub/post.rs +++ b/server/src/apub/post.rs @@ -46,7 +46,7 @@ impl Post { .set_summary_xsd_string(self.name.to_owned())? .set_published(convert_datetime(self.published))? .set_to_xsd_any_uri(community.actor_id)? - .set_attributed_to_xsd_any_uri(make_apub_endpoint(EndpointType::User, &creator.name))?; + .set_attributed_to_xsd_any_uri(make_apub_endpoint(EndpointType::Post, &creator.name))?; if let Some(body) = &self.body { oprops.set_content_xsd_string(body.to_owned())?; diff --git a/server/src/main.rs b/server/src/main.rs index 9f78d43e..59dc2cb7 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -39,8 +39,6 @@ async fn main() -> Result<(), Error> { // Set up websocket server let server = ChatServer::startup(pool.clone()).start(); - // TODO: its probably failing because the other instance is not up yet - // need to make a new thread and wait a bit before fetching thread::spawn(move || { // some work here sleep(Duration::from_secs(5)); diff --git a/server/src/routes/federation.rs b/server/src/routes/federation.rs index 100e548f..7f834777 100644 --- a/server/src/routes/federation.rs +++ b/server/src/routes/federation.rs @@ -12,7 +12,8 @@ pub fn config(cfg: &mut web::ServiceConfig) { ) // TODO: this needs to be moved to the actors (eg /federation/u/{}/inbox) .route("/federation/inbox", web::post().to(apub::inbox::inbox)) - .route("/federation/inbox", web::post().to(apub::inbox::inbox)) + .route("/federation/c/{community_name}/inbox", web::post().to(apub::inbox::inbox)) + .route("/federation/u/{user_name}/inbox", web::post().to(apub::inbox::inbox)) .route( "/federation/c/{community_name}", web::get().to(apub::community::get_apub_community_http),