diff --git a/Cargo.lock b/Cargo.lock index 555c956..e569898 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5,7 +5,6 @@ version = 3 [[package]] name = "activitypub_federation" version = "0.5.0-beta.5" -source = "git+https://github.com/LemmyNet/activitypub-federation-rust.git?branch=diesel-feature#ca42d891b10888c0dcc666140385d380c664a978" dependencies = [ "activitystreams-kinds", "async-trait", @@ -488,6 +487,7 @@ checksum = "62c6fcf842f17f8c78ecf7c81d75c5ce84436b41ee07e03f490fbb5f5a8731d8" dependencies = [ "bitflags 2.4.1", "byteorder", + "chrono", "diesel_derives", "itoa", "pq-sys", diff --git a/Cargo.toml b/Cargo.toml index 47d36e9..6d41432 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,13 +4,13 @@ version = "0.1.0" edition = "2021" [dependencies] -activitypub_federation = { git = "https://github.com/LemmyNet/activitypub-federation-rust.git", branch = "diesel-feature", features = ["axum", "diesel"], default-features = false } +activitypub_federation = { path = "../lemmy/activitypub-federation-rust", features = ["axum", "diesel"], default-features = false } anyhow = "1.0.75" async-trait = "0.1.74" axum = "0.6.20" axum-macros = "0.3.8" chrono = { version = "0.4.31", features = ["serde"] } -diesel = {version = "2.1.4", features = ["postgres"] } +diesel = {version = "2.1.4", features = ["postgres", "chrono"] } diesel-derive-newtype = "2.1.0" diesel_migrations = "2.1.0" diffy = "0.3.0" diff --git a/migrations/2023-11-28-150402_article/down.sql b/migrations/2023-11-28-150402_article/down.sql index f5b240a..d01733f 100644 --- a/migrations/2023-11-28-150402_article/down.sql +++ b/migrations/2023-11-28-150402_article/down.sql @@ -1,2 +1,4 @@ drop table edit; -drop table article; \ No newline at end of file +drop table article; +drop table instance_follow; +drop table instance; \ No newline at end of file diff --git a/migrations/2023-11-28-150402_article/up.sql b/migrations/2023-11-28-150402_article/up.sql index bc988c8..d4a8d7d 100644 --- a/migrations/2023-11-28-150402_article/up.sql +++ b/migrations/2023-11-28-150402_article/up.sql @@ -1,9 +1,27 @@ +create table instance ( + id serial primary key, + ap_id varchar(255) not null unique, + inbox_url text not null, + articles_url varchar(255) not null unique, + public_key text not null, + private_key text, + last_refreshed_at timestamptz not null default now(), + local bool not null +); + +create table instance_follow ( + id serial primary key, + follower_id int REFERENCES instance ON UPDATE CASCADE ON DELETE CASCADE NOT NULL, + followed_id int REFERENCES instance ON UPDATE CASCADE ON DELETE CASCADE NOT NULL, + pending boolean not null + +); create table article ( id serial primary key, title text not null, text text not null, ap_id varchar(255) not null unique, - instance_id varchar(255) not null, + instance_id int REFERENCES instance ON UPDATE CASCADE ON DELETE CASCADE NOT NULL, local bool not null ); diff --git a/src/api.rs b/src/api.rs index 27da587..2895ba2 100644 --- a/src/api.rs +++ b/src/api.rs @@ -1,10 +1,11 @@ use crate::database::article::{ArticleView, DbArticle, DbArticleForm}; use crate::database::edit::{DbEdit, EditVersion}; +use crate::database::instance::{DbInstance, InstanceView}; use crate::database::{DbConflict, MyDataHandle}; use crate::error::MyResult; use crate::federation::activities::create_article::CreateArticle; +use crate::federation::activities::follow::Follow; use crate::federation::activities::submit_article_update; -use crate::federation::objects::instance::DbInstance; use crate::utils::generate_article_version; use activitypub_federation::config::Data; use activitypub_federation::fetch::object_id::ObjectId; @@ -45,23 +46,18 @@ async fn create_article( data: Data, Form(create_article): Form, ) -> MyResult> { - let existing_article = DbArticle::read_local_title(&create_article.title, &data.db_connection); - if existing_article.is_ok() { - return Err(anyhow!("A local article with this title already exists").into()); - } - - let instance_id = data.local_instance().ap_id; + let local_instance = DbInstance::read_local_instance(&data.db_connection)?; let ap_id = ObjectId::parse(&format!( "http://{}:{}/article/{}", - instance_id.inner().domain().unwrap(), - instance_id.inner().port().unwrap(), + local_instance.ap_id.inner().domain().unwrap(), + local_instance.ap_id.inner().port().unwrap(), create_article.title ))?; let form = DbArticleForm { title: create_article.title, text: String::new(), ap_id, - instance_id, + instance_id: local_instance.id, local: true, }; let article = DbArticle::create(&form, &data.db_connection)?; @@ -201,13 +197,14 @@ async fn resolve_article( /// Retrieve the local instance info. #[debug_handler] -async fn get_local_instance(data: Data) -> MyResult> { - Ok(Json(data.local_instance())) +async fn get_local_instance(data: Data) -> MyResult> { + let local_instance = DbInstance::read_local_view(&data.db_connection)?; + Ok(Json(local_instance)) } #[derive(Deserialize, Serialize, Debug)] pub struct FollowInstance { - pub instance_id: ObjectId, + pub id: i32, } /// Make the local instance follow a given remote instance, to receive activities about new and @@ -217,8 +214,12 @@ async fn follow_instance( data: Data, Form(query): Form, ) -> MyResult<()> { - let instance = query.instance_id.dereference(&data).await?; - data.local_instance().follow(&instance, &data).await?; + let local_instance = DbInstance::read_local_instance(&data.db_connection)?; + let target = DbInstance::read(query.id, &data.db_connection)?; + let pending = !target.local; + DbInstance::follow(local_instance.id, target.id, pending, &data)?; + let instance = DbInstance::read(query.id, &data.db_connection)?; + Follow::send(local_instance, instance, &data).await?; Ok(()) } @@ -274,29 +275,25 @@ async fn fork_article( ) -> MyResult> { // TODO: lots of code duplicated from create_article(), can move it into helper let original_article = DbArticle::read(fork_form.article_id, &data.db_connection)?; - let existing_article = - DbArticle::read_local_title(&original_article.title, &data.db_connection); - if existing_article.is_ok() { - return Err(anyhow!("A local article with this title already exists").into()); - } - let instance_id = data.local_instance().ap_id; + let local_instance = DbInstance::read_local_instance(&data.db_connection)?; let ap_id = ObjectId::parse(&format!( "http://{}:{}/article/{}", - instance_id.inner().domain().unwrap(), - instance_id.inner().port().unwrap(), + local_instance.ap_id.inner().domain().unwrap(), + local_instance.ap_id.inner().port().unwrap(), original_article.title ))?; let form = DbArticleForm { title: original_article.title.clone(), text: original_article.text.clone(), ap_id, - instance_id, + instance_id: local_instance.id, local: true, }; let article = DbArticle::create(&form, &data.db_connection)?; // copy edits to new article + // TODO: convert to sql let edits = DbEdit::for_article(&original_article, &data.db_connection)?; for e in edits { let form = e.copy_to_local_fork(&article)?; diff --git a/src/database/article.rs b/src/database/article.rs index 8874b7c..ef82824 100644 --- a/src/database/article.rs +++ b/src/database/article.rs @@ -1,8 +1,8 @@ use crate::database::edit::{DbEdit, EditVersion}; +use crate::database::instance::DbInstance; use crate::database::schema::article; use crate::error::MyResult; use crate::federation::objects::edits_collection::DbEditCollection; -use crate::federation::objects::instance::DbInstance; use activitypub_federation::fetch::collection_id::CollectionId; use activitypub_federation::fetch::object_id::ObjectId; use diesel::pg::PgConnection; @@ -18,13 +18,13 @@ use std::ops::DerefMut; use std::sync::Mutex; #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Queryable, Selectable, Identifiable)] -#[diesel(table_name = article, check_for_backend(diesel::pg::Pg))] +#[diesel(table_name = article, check_for_backend(diesel::pg::Pg), belongs_to(DbInstance, foreign_key = instance_id))] pub struct DbArticle { pub id: i32, pub title: String, pub text: String, pub ap_id: ObjectId, - pub instance_id: ObjectId, + pub instance_id: i32, pub local: bool, } @@ -42,8 +42,7 @@ pub struct DbArticleForm { pub title: String, pub text: String, pub ap_id: ObjectId, - // TODO: change to foreign key - pub instance_id: ObjectId, + pub instance_id: i32, pub local: bool, } diff --git a/src/database/instance.rs b/src/database/instance.rs new file mode 100644 index 0000000..e2f26e5 --- /dev/null +++ b/src/database/instance.rs @@ -0,0 +1,201 @@ +use crate::database::article::DbArticle; +use crate::database::schema::{instance, instance_follow}; +use crate::database::MyDataHandle; +use crate::error::{Error, MyResult}; +use crate::federation::activities::follow::Follow; +use crate::federation::objects::articles_collection::DbArticleCollection; +use activitypub_federation::activity_sending::SendActivityTask; +use activitypub_federation::config::Data; +use activitypub_federation::fetch::collection_id::CollectionId; +use activitypub_federation::fetch::object_id::ObjectId; +use activitypub_federation::protocol::context::WithContext; +use activitypub_federation::traits::{ActivityHandler, Actor}; +use chrono::{DateTime, Utc}; +use diesel::ExpressionMethods; +use diesel::{ + insert_into, update, AsChangeset, Identifiable, Insertable, JoinOnDsl, PgConnection, QueryDsl, + Queryable, RunQueryDsl, Selectable, +}; +use serde::{Deserialize, Serialize}; +use std::fmt::Debug; +use std::ops::DerefMut; +use std::sync::Mutex; +use tracing::warn; +use url::Url; + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Queryable, Selectable, Identifiable)] +#[diesel(table_name = instance, check_for_backend(diesel::pg::Pg))] +pub struct DbInstance { + pub id: i32, + pub ap_id: ObjectId, + pub articles_url: CollectionId, + pub inbox_url: String, + #[serde(skip)] + pub(crate) public_key: String, + #[serde(skip)] + pub(crate) private_key: Option, + #[serde(skip)] + pub(crate) last_refreshed_at: DateTime, + pub local: bool, +} + +#[derive(Debug, Clone, Insertable, AsChangeset)] +#[diesel(table_name = instance, check_for_backend(diesel::pg::Pg))] +pub struct DbInstanceForm { + pub ap_id: ObjectId, + pub articles_url: CollectionId, + pub inbox_url: String, + pub(crate) public_key: String, + pub(crate) private_key: Option, + pub(crate) last_refreshed_at: DateTime, + pub local: bool, +} + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Queryable)] +#[diesel(table_name = article, check_for_backend(diesel::pg::Pg))] +pub struct InstanceView { + pub instance: DbInstance, + pub followers: Vec, + pub followed: Vec, +} + +impl DbInstance { + pub fn followers_url(&self) -> MyResult { + Ok(Url::parse(&format!("{}/followers", self.ap_id.inner()))?) + } + + pub fn follower_ids(&self, data: &Data) -> MyResult> { + Ok(DbInstance::read_followers(self.id, &data.db_connection)? + .into_iter() + .map(|f| f.ap_id.into()) + .collect()) + } + + pub async fn send_to_followers( + &self, + activity: Activity, + extra_recipients: Vec, + data: &Data, + ) -> Result<(), ::Error> + where + Activity: ActivityHandler + Serialize + Debug + Send + Sync, + ::Error: From, + ::Error: From, + { + let mut inboxes: Vec<_> = DbInstance::read_followers(self.id, &data.db_connection)? + .iter() + .map(|f| Url::parse(&f.inbox_url).unwrap()) + .collect(); + inboxes.extend( + extra_recipients + .into_iter() + .map(|i| Url::parse(&i.inbox_url).unwrap()), + ); + self.send(activity, inboxes, data).await?; + Ok(()) + } + + pub async fn send( + &self, + activity: Activity, + recipients: Vec, + data: &Data, + ) -> Result<(), ::Error> + where + Activity: ActivityHandler + Serialize + Debug + Send + Sync, + ::Error: From, + { + let activity = WithContext::new_default(activity); + let sends = SendActivityTask::prepare(&activity, self, recipients, data).await?; + for send in sends { + let send = send.sign_and_send(data).await; + if let Err(e) = send { + warn!("Failed to send activity {:?}: {e}", activity); + } + } + Ok(()) + } + + pub fn create(form: &DbInstanceForm, conn: &Mutex) -> MyResult { + let mut conn = conn.lock().unwrap(); + Ok(insert_into(instance::table) + .values(form) + .on_conflict(instance::dsl::ap_id) + .do_update() + .set(form) + .get_result(conn.deref_mut())?) + } + + pub fn read(id: i32, conn: &Mutex) -> MyResult { + let mut conn = conn.lock().unwrap(); + Ok(instance::table.find(id).get_result(conn.deref_mut())?) + } + + pub fn read_from_ap_id( + ap_id: &ObjectId, + data: &Data, + ) -> MyResult { + let mut conn = data.db_connection.lock().unwrap(); + Ok(instance::table + .filter(instance::dsl::ap_id.eq(ap_id)) + .get_result(conn.deref_mut())?) + } + + pub fn read_local_instance(conn: &Mutex) -> MyResult { + let mut conn = conn.lock().unwrap(); + Ok(instance::table + .filter(instance::dsl::local.eq(true)) + .get_result(conn.deref_mut())?) + } + + pub fn read_local_view(conn: &Mutex) -> MyResult { + let instance = DbInstance::read_local_instance(conn)?; + let followers = DbInstance::read_followers(instance.id, conn)?; + let followed = DbInstance::read_followed(instance.id, conn)?; + + Ok(InstanceView { + instance, + followers, + followed, + }) + } + + pub fn follow( + follower_id_: i32, + followed_id_: i32, + pending_: bool, + data: &Data, + ) -> MyResult<()> { + use instance_follow::dsl::{followed_id, follower_id, pending}; + let mut conn = data.db_connection.lock().unwrap(); + insert_into(instance_follow::table) + .values(( + follower_id.eq(follower_id_), + followed_id.eq(followed_id_), + pending.eq(pending_), + )) + .execute(conn.deref_mut())?; + Ok(()) + } + + pub fn read_followers(id_: i32, conn: &Mutex) -> MyResult> { + use instance_follow::dsl::{followed_id, id}; + let mut conn = conn.lock().unwrap(); + Ok(instance_follow::table + .inner_join(instance::table.on(id.eq(instance::dsl::id))) + .filter(followed_id.eq(id_)) + .select(instance::all_columns) + .get_results(conn.deref_mut())?) + } + + pub fn read_followed(id_: i32, conn: &Mutex) -> MyResult> { + // TODO: is this correct? + use instance_follow::dsl::{follower_id, id}; + let mut conn = conn.lock().unwrap(); + Ok(instance_follow::table + .inner_join(instance::table.on(id.eq(instance::dsl::id))) + .filter(follower_id.eq(id_)) + .select(instance::all_columns) + .get_results(conn.deref_mut())?) + } +} diff --git a/src/database/mod.rs b/src/database/mod.rs index bb40d1a..c29cfd5 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -3,13 +3,13 @@ use crate::database::article::DbArticle; use crate::database::edit::DbEdit; use crate::error::MyResult; use crate::federation::activities::submit_article_update; -use crate::federation::objects::instance::DbInstance; use crate::utils::generate_article_version; use activitypub_federation::config::Data; use activitypub_federation::fetch::object_id::ObjectId; use diesel::PgConnection; use diffy::{apply, merge, Patch}; use edit::EditVersion; +use instance::DbInstance; use std::collections::HashMap; use std::ops::Deref; use std::sync::{Arc, Mutex}; @@ -17,6 +17,7 @@ use url::Url; pub mod article; pub mod edit; +pub mod instance; mod schema; #[derive(Clone)] @@ -35,17 +36,9 @@ impl Deref for MyData { pub type MyDataHandle = MyData; pub struct FakeDatabase { - pub instances: Mutex>, pub conflicts: Mutex>, } -impl FakeDatabase { - pub fn local_instance(&self) -> DbInstance { - let lock = self.instances.lock().unwrap(); - lock.iter().find(|i| i.1.local).unwrap().1.clone() - } -} - #[derive(Clone, Debug)] pub struct DbConflict { pub id: i32, diff --git a/src/database/schema.rs b/src/database/schema.rs index 7adad37..4f6a09e 100644 --- a/src/database/schema.rs +++ b/src/database/schema.rs @@ -7,8 +7,7 @@ diesel::table! { text -> Text, #[max_length = 255] ap_id -> Varchar, - #[max_length = 255] - instance_id -> Varchar, + instance_id -> Int4, local -> Bool, } } @@ -25,6 +24,31 @@ diesel::table! { } } +diesel::table! { + instance (id) { + id -> Int4, + #[max_length = 255] + ap_id -> Varchar, + inbox_url -> Text, + #[max_length = 255] + articles_url -> Varchar, + public_key -> Text, + private_key -> Nullable, + last_refreshed_at -> Timestamptz, + local -> Bool, + } +} + +diesel::table! { + instance_follow (id) { + id -> Int4, + follower_id -> Int4, + followed_id -> Int4, + pending -> Bool, + } +} + +diesel::joinable!(article -> instance (instance_id)); diesel::joinable!(edit -> article (article_id)); -diesel::allow_tables_to_appear_in_same_query!(article, edit,); +diesel::allow_tables_to_appear_in_same_query!(article, edit, instance, instance_follow,); diff --git a/src/federation/activities/accept.rs b/src/federation/activities/accept.rs index b422dc1..eeffd6f 100644 --- a/src/federation/activities/accept.rs +++ b/src/federation/activities/accept.rs @@ -1,5 +1,5 @@ +use crate::database::instance::DbInstance; use crate::error::MyResult; -use crate::federation::objects::instance::DbInstance; use crate::utils::generate_activity_id; use crate::{database::MyDataHandle, federation::activities::follow::Follow}; use activitypub_federation::{ @@ -49,9 +49,9 @@ impl ActivityHandler for Accept { async fn receive(self, data: &Data) -> Result<(), Self::Error> { // add to follows - let mut lock = data.instances.lock().unwrap(); - let local_instance = lock.iter_mut().find(|i| i.1.local).unwrap().1; - local_instance.follows.push(self.actor.inner().clone()); + let local_instance = DbInstance::read_local_instance(&data.db_connection)?; + let actor = self.actor.dereference(data).await?; + DbInstance::follow(local_instance.id, actor.id, false, &data)?; Ok(()) } } diff --git a/src/federation/activities/create_article.rs b/src/federation/activities/create_article.rs index d36d99b..f03e6dd 100644 --- a/src/federation/activities/create_article.rs +++ b/src/federation/activities/create_article.rs @@ -1,7 +1,7 @@ +use crate::database::instance::DbInstance; use crate::database::{article::DbArticle, MyDataHandle}; use crate::error::MyResult; use crate::federation::objects::article::ApubArticle; -use crate::federation::objects::instance::DbInstance; use crate::utils::generate_activity_id; use activitypub_federation::kinds::activity::CreateType; use activitypub_federation::{ @@ -27,12 +27,13 @@ pub struct CreateArticle { impl CreateArticle { pub async fn send_to_followers(article: DbArticle, data: &Data) -> MyResult<()> { - let local_instance = data.local_instance(); + let local_instance = DbInstance::read_local_instance(&data.db_connection)?; let object = article.clone().into_json(data).await?; let id = generate_activity_id(local_instance.ap_id.inner())?; + let to = local_instance.follower_ids(&data)?; let create = CreateArticle { actor: local_instance.ap_id.clone(), - to: local_instance.follower_ids(), + to, object, kind: Default::default(), id, @@ -63,9 +64,8 @@ impl ActivityHandler for CreateArticle { async fn receive(self, data: &Data) -> Result<(), Self::Error> { let article = DbArticle::from_json(self.object.clone(), data).await?; if article.local { - data.local_instance() - .send_to_followers(self, vec![], data) - .await?; + let local_instance = DbInstance::read_local_instance(&data.db_connection)?; + local_instance.send_to_followers(self, vec![], data).await?; } Ok(()) } diff --git a/src/federation/activities/follow.rs b/src/federation/activities/follow.rs index f5c5095..bb90d8d 100644 --- a/src/federation/activities/follow.rs +++ b/src/federation/activities/follow.rs @@ -1,5 +1,5 @@ +use crate::database::instance::DbInstance; use crate::error::MyResult; -use crate::federation::objects::instance::DbInstance; use crate::{database::MyDataHandle, federation::activities::accept::Accept, generate_activity_id}; use activitypub_federation::{ config::Data, @@ -21,14 +21,22 @@ pub struct Follow { } impl Follow { - pub fn new(actor: ObjectId, object: ObjectId) -> MyResult { - let id = generate_activity_id(actor.inner())?; - Ok(Follow { - actor, - object, + pub async fn send( + local_instance: DbInstance, + to: DbInstance, + data: &Data, + ) -> MyResult<()> { + let id = generate_activity_id(local_instance.ap_id.inner())?; + let follow = Follow { + actor: local_instance.ap_id.clone(), + object: to.ap_id.clone(), kind: Default::default(), id, - }) + }; + local_instance + .send(follow, vec![to.shared_inbox_or_inbox()], data) + .await?; + Ok(()) } } @@ -51,13 +59,8 @@ impl ActivityHandler for Follow { async fn receive(self, data: &Data) -> Result<(), Self::Error> { let actor = self.actor.dereference(data).await?; - // add to followers - let local_instance = { - let mut lock = data.instances.lock().unwrap(); - let local_instance = lock.iter_mut().find(|i| i.1.local).unwrap().1; - local_instance.followers.push(actor); - local_instance.clone() - }; + let local_instance = DbInstance::read_local_instance(&data.db_connection)?; + DbInstance::follow(actor.id, local_instance.id, false, &data)?; // send back an accept let follower = self.actor.dereference(data).await?; diff --git a/src/federation/activities/mod.rs b/src/federation/activities/mod.rs index 8e73229..626d2ba 100644 --- a/src/federation/activities/mod.rs +++ b/src/federation/activities/mod.rs @@ -1,10 +1,10 @@ use crate::database::article::DbArticle; use crate::database::edit::{DbEdit, DbEditForm, EditVersion}; +use crate::database::instance::DbInstance; use crate::database::MyDataHandle; use crate::error::Error; use crate::federation::activities::update_local_article::UpdateLocalArticle; use crate::federation::activities::update_remote_article::UpdateRemoteArticle; -use crate::federation::objects::instance::DbInstance; use activitypub_federation::config::Data; pub mod accept; @@ -37,11 +37,7 @@ pub async fn submit_article_update( version: form.version, previous_version: form.previous_version, }; - let instance: DbInstance = original_article - .instance_id - .clone() - .dereference(data) - .await?; + let instance = DbInstance::read(original_article.instance_id, &data.db_connection)?; UpdateRemoteArticle::send(edit, instance, data).await?; } Ok(()) diff --git a/src/federation/activities/reject.rs b/src/federation/activities/reject.rs index 43cc044..5327a97 100644 --- a/src/federation/activities/reject.rs +++ b/src/federation/activities/reject.rs @@ -1,7 +1,7 @@ +use crate::database::instance::DbInstance; use crate::database::MyDataHandle; use crate::error::MyResult; use crate::federation::objects::edit::ApubEdit; -use crate::federation::objects::instance::DbInstance; use crate::utils::generate_activity_id; use activitypub_federation::kinds::activity::RejectType; use activitypub_federation::{ @@ -32,7 +32,7 @@ impl RejectEdit { user_instance: DbInstance, data: &Data, ) -> MyResult<()> { - let local_instance = data.local_instance(); + let local_instance = DbInstance::read_local_instance(&data.db_connection)?; let id = generate_activity_id(local_instance.ap_id.inner())?; let reject = RejectEdit { actor: local_instance.ap_id.clone(), @@ -42,7 +42,7 @@ impl RejectEdit { id, }; local_instance - .send(reject, vec![user_instance.inbox], data) + .send(reject, vec![Url::parse(&user_instance.inbox_url)?], data) .await?; Ok(()) } diff --git a/src/federation/activities/update_local_article.rs b/src/federation/activities/update_local_article.rs index 5e3ee58..70f67e4 100644 --- a/src/federation/activities/update_local_article.rs +++ b/src/federation/activities/update_local_article.rs @@ -2,7 +2,7 @@ use crate::database::{article::DbArticle, MyDataHandle}; use crate::error::MyResult; use crate::federation::objects::article::ApubArticle; -use crate::federation::objects::instance::DbInstance; +use crate::database::instance::DbInstance; use crate::utils::generate_activity_id; use activitypub_federation::kinds::activity::UpdateType; use activitypub_federation::{ @@ -35,9 +35,9 @@ impl UpdateLocalArticle { data: &Data, ) -> MyResult<()> { debug_assert!(article.local); - let local_instance = data.local_instance(); + let local_instance = DbInstance::read_local_instance(&data.db_connection)?; let id = generate_activity_id(local_instance.ap_id.inner())?; - let mut to = local_instance.follower_ids(); + let mut to = local_instance.follower_ids(&data)?; to.extend(extra_recipients.iter().map(|i| i.ap_id.inner().clone())); let update = UpdateLocalArticle { actor: local_instance.ap_id.clone(), diff --git a/src/federation/activities/update_remote_article.rs b/src/federation/activities/update_remote_article.rs index 683d7be..90529a7 100644 --- a/src/federation/activities/update_remote_article.rs +++ b/src/federation/activities/update_remote_article.rs @@ -3,10 +3,10 @@ use crate::error::MyResult; use crate::database::article::DbArticle; use crate::database::edit::DbEdit; +use crate::database::instance::DbInstance; use crate::federation::activities::reject::RejectEdit; use crate::federation::activities::update_local_article::UpdateLocalArticle; use crate::federation::objects::edit::ApubEdit; -use crate::federation::objects::instance::DbInstance; use crate::utils::generate_activity_id; use activitypub_federation::kinds::activity::UpdateType; use activitypub_federation::{ @@ -38,7 +38,7 @@ impl UpdateRemoteArticle { article_instance: DbInstance, data: &Data, ) -> MyResult<()> { - let local_instance = data.local_instance(); + let local_instance = DbInstance::read_local_instance(&data.db_connection)?; let id = generate_activity_id(local_instance.ap_id.inner())?; let update = UpdateRemoteArticle { actor: local_instance.ap_id.clone(), @@ -51,7 +51,7 @@ impl UpdateRemoteArticle { // or put previous_version in DbEdit dbg!(&update.object.previous_version); local_instance - .send(update, vec![article_instance.inbox], data) + .send(update, vec![Url::parse(&article_instance.inbox_url)?], data) .await?; Ok(()) } diff --git a/src/federation/mod.rs b/src/federation/mod.rs index 6b73d00..67e6c0f 100644 --- a/src/federation/mod.rs +++ b/src/federation/mod.rs @@ -1,39 +1,3 @@ -use crate::database::FakeDatabase; -use crate::error::Error; -use crate::federation::objects::instance::DbInstance; -use activitypub_federation::fetch::collection_id::CollectionId; -use activitypub_federation::http_signatures::generate_actor_keypair; -use chrono::Local; -use std::collections::HashMap; -use std::sync::{Arc, Mutex}; -use url::Url; - pub mod activities; pub mod objects; pub mod routes; - -pub async fn create_fake_db(hostname: &str) -> Result, Error> { - let ap_id = Url::parse(&format!("http://{}", hostname))?; - let articles_id = CollectionId::parse(&format!("http://{}/all_articles", hostname))?; - let inbox = Url::parse(&format!("http://{}/inbox", hostname))?; - let keypair = generate_actor_keypair()?; - let local_instance = DbInstance { - ap_id: ap_id.into(), - articles_id, - inbox, - public_key: keypair.public_key, - private_key: Some(keypair.private_key), - last_refreshed_at: Local::now().into(), - followers: vec![], - follows: vec![], - local: true, - }; - let fake_db = Arc::new(FakeDatabase { - instances: Mutex::new(HashMap::from([( - local_instance.ap_id.inner().clone(), - local_instance, - )])), - conflicts: Mutex::new(vec![]), - }); - Ok(fake_db) -} diff --git a/src/federation/objects/article.rs b/src/federation/objects/article.rs index 2073ea5..9cec8ca 100644 --- a/src/federation/objects/article.rs +++ b/src/federation/objects/article.rs @@ -1,9 +1,9 @@ use crate::database::article::DbArticleForm; use crate::database::edit::EditVersion; +use crate::database::instance::DbInstance; use crate::database::{article::DbArticle, MyDataHandle}; use crate::error::Error; use crate::federation::objects::edits_collection::DbEditCollection; -use crate::federation::objects::instance::DbInstance; use activitypub_federation::config::Data; use activitypub_federation::fetch::collection_id::CollectionId; use activitypub_federation::kinds::object::ArticleType; @@ -45,12 +45,12 @@ impl Object for DbArticle { } async fn into_json(self, data: &Data) -> Result { - let instance: DbInstance = self.instance_id.clone().dereference_local(data).await?; + let local_instance = DbInstance::read_local_instance(&data.db_connection)?; Ok(ApubArticle { kind: Default::default(), id: self.ap_id.clone(), - attributed_to: instance.ap_id.clone(), - to: vec![public(), instance.followers_url()?], + attributed_to: local_instance.ap_id.clone(), + to: vec![public(), local_instance.followers_url()?], edits: self.edits_id()?, latest_version: self.latest_edit_version(&data.db_connection)?, content: self.text, @@ -68,12 +68,13 @@ impl Object for DbArticle { } async fn from_json(json: Self::Kind, data: &Data) -> Result { + let instance = json.attributed_to.dereference(data).await?; let form = DbArticleForm { title: json.name, text: json.content, ap_id: json.id, local: false, - instance_id: json.attributed_to, + instance_id: instance.id, }; let article = DbArticle::create(&form, &data.db_connection)?; diff --git a/src/federation/objects/articles_collection.rs b/src/federation/objects/articles_collection.rs index 06d0d9a..d4efeb8 100644 --- a/src/federation/objects/articles_collection.rs +++ b/src/federation/objects/articles_collection.rs @@ -1,7 +1,7 @@ +use crate::database::instance::DbInstance; use crate::database::{article::DbArticle, MyDataHandle}; use crate::error::Error; use crate::federation::objects::article::ApubArticle; -use crate::federation::objects::instance::DbInstance; use activitypub_federation::kinds::collection::CollectionType; use activitypub_federation::{ @@ -33,7 +33,7 @@ impl Collection for DbArticleCollection { type Error = Error; async fn read_local( - _owner: &Self::Owner, + owner: &Self::Owner, data: &Data, ) -> Result { let local_articles = DbArticle::read_all_local(&data.db_connection)?; @@ -46,7 +46,7 @@ impl Collection for DbArticleCollection { .await?; let collection = ArticleCollection { r#type: Default::default(), - id: data.local_instance().articles_id.into(), + id: owner.articles_url.clone().into(), total_items: articles.len() as i32, items: articles, }; diff --git a/src/federation/objects/edits_collection.rs b/src/federation/objects/edits_collection.rs index 29b32dd..02a007f 100644 --- a/src/federation/objects/edits_collection.rs +++ b/src/federation/objects/edits_collection.rs @@ -1,9 +1,10 @@ -use crate::database::article::DbArticle; +use crate::database::article::{ArticleView, DbArticle}; use crate::database::MyDataHandle; use crate::error::Error; use crate::federation::objects::edit::ApubEdit; use crate::database::edit::DbEdit; +use crate::database::instance::DbInstance; use activitypub_federation::kinds::collection::OrderedCollectionType; use activitypub_federation::{ config::Data, @@ -37,19 +38,20 @@ impl Collection for DbEditCollection { owner: &Self::Owner, data: &Data, ) -> Result { - let edits = DbEditCollection(DbEdit::for_article(owner, &data.db_connection)?); + let article = DbArticle::read_view(owner.id, &data.db_connection)?; let edits = future::try_join_all( - edits - .0 + article + .edits .into_iter() .map(|a| a.into_json(data)) .collect::>(), ) .await?; + let local_instance = DbInstance::read_local_instance(&data.db_connection)?; let collection = ApubEditCollection { r#type: Default::default(), - id: Url::from(data.local_instance().articles_id), + id: Url::from(local_instance.articles_url), total_items: edits.len() as i32, items: edits, }; diff --git a/src/federation/objects/instance.rs b/src/federation/objects/instance.rs index 16b5a65..e4592d1 100644 --- a/src/federation/objects/instance.rs +++ b/src/federation/objects/instance.rs @@ -1,36 +1,19 @@ -use crate::error::{Error, MyResult}; +use crate::database::instance::{DbInstance, DbInstanceForm}; +use crate::database::MyDataHandle; +use crate::error::Error; use crate::federation::objects::articles_collection::DbArticleCollection; -use crate::{database::MyDataHandle, federation::activities::follow::Follow}; -use activitypub_federation::activity_sending::SendActivityTask; use activitypub_federation::fetch::collection_id::CollectionId; use activitypub_federation::kinds::actor::ServiceType; use activitypub_federation::{ config::Data, fetch::object_id::ObjectId, - protocol::{context::WithContext, public_key::PublicKey, verification::verify_domains_match}, + protocol::{public_key::PublicKey, verification::verify_domains_match}, traits::{ActivityHandler, Actor, Object}, }; use chrono::{DateTime, Local, Utc}; use serde::{Deserialize, Serialize}; use std::fmt::Debug; -use tracing::warn; -use url::Url; - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct DbInstance { - pub ap_id: ObjectId, - pub articles_id: CollectionId, - pub inbox: Url, - #[serde(skip)] - pub(crate) public_key: String, - #[serde(skip)] - pub(crate) private_key: Option, - #[serde(skip)] - pub(crate) last_refreshed_at: DateTime, - pub followers: Vec, - pub follows: Vec, - pub local: bool, -} +use url::{ParseError, Url}; #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] @@ -43,68 +26,6 @@ pub struct ApubInstance { public_key: PublicKey, } -impl DbInstance { - pub fn followers_url(&self) -> MyResult { - Ok(Url::parse(&format!("{}/followers", self.ap_id.inner()))?) - } - - pub fn follower_ids(&self) -> Vec { - self.followers - .iter() - .map(|f| f.ap_id.inner().clone()) - .collect() - } - - pub async fn follow(&self, other: &DbInstance, data: &Data) -> Result<(), Error> { - let follow = Follow::new(self.ap_id.clone(), other.ap_id.clone())?; - self.send(follow, vec![other.shared_inbox_or_inbox()], data) - .await?; - Ok(()) - } - - pub async fn send_to_followers( - &self, - activity: Activity, - extra_recipients: Vec, - data: &Data, - ) -> Result<(), ::Error> - where - Activity: ActivityHandler + Serialize + Debug + Send + Sync, - ::Error: From, - { - let local_instance = data.local_instance(); - let mut inboxes: Vec<_> = local_instance - .followers - .iter() - .map(|f| f.inbox.clone()) - .collect(); - inboxes.extend(extra_recipients.into_iter().map(|i| i.inbox)); - local_instance.send(activity, inboxes, data).await?; - Ok(()) - } - - pub async fn send( - &self, - activity: Activity, - recipients: Vec, - data: &Data, - ) -> Result<(), ::Error> - where - Activity: ActivityHandler + Serialize + Debug + Send + Sync, - ::Error: From, - { - let activity = WithContext::new_default(activity); - let sends = SendActivityTask::prepare(&activity, self, recipients, data).await?; - for send in sends { - let send = send.sign_and_send(data).await; - if let Err(e) = send { - warn!("Failed to send activity {:?}: {e}", activity); - } - } - Ok(()) - } -} - #[async_trait::async_trait] impl Object for DbInstance { type DataType = MyDataHandle; @@ -119,21 +40,15 @@ impl Object for DbInstance { object_id: Url, data: &Data, ) -> Result, Self::Error> { - let users = data.instances.lock().unwrap(); - let res = users - .clone() - .into_iter() - .map(|u| u.1) - .find(|u| u.ap_id.inner() == &object_id); - Ok(res) + Ok(DbInstance::read_from_ap_id(&object_id.into(), &data).ok()) } async fn into_json(self, _data: &Data) -> Result { Ok(ApubInstance { kind: Default::default(), id: self.ap_id.clone(), - articles: self.articles_id.clone(), - inbox: self.inbox.clone(), + articles: self.articles_url.clone(), + inbox: Url::parse(&self.inbox_url)?, public_key: self.public_key(), }) } @@ -148,21 +63,18 @@ impl Object for DbInstance { } async fn from_json(json: Self::Kind, data: &Data) -> Result { - let instance = DbInstance { + let form = DbInstanceForm { ap_id: json.id, - articles_id: json.articles, - inbox: json.inbox, + articles_url: json.articles, + inbox_url: json.inbox.to_string(), public_key: json.public_key.public_key_pem, private_key: None, last_refreshed_at: Local::now().into(), - followers: vec![], - follows: vec![], local: false, }; + let instance = DbInstance::create(&form, &data.db_connection)?; // TODO: very inefficient to sync all articles every time - instance.articles_id.dereference(&instance, data).await?; - let mut mutex = data.instances.lock().unwrap(); - mutex.insert(instance.ap_id.inner().clone(), instance.clone()); + instance.articles_url.dereference(&instance, data).await?; Ok(instance) } } @@ -181,6 +93,6 @@ impl Actor for DbInstance { } fn inbox(&self) -> Url { - self.inbox.clone() + Url::parse(&self.inbox_url).unwrap() } } diff --git a/src/federation/routes.rs b/src/federation/routes.rs index c5299f3..2666e01 100644 --- a/src/federation/routes.rs +++ b/src/federation/routes.rs @@ -1,9 +1,17 @@ +use crate::database::article::DbArticle; +use crate::database::instance::DbInstance; use crate::database::MyDataHandle; use crate::error::MyResult; use crate::federation::activities::accept::Accept; +use crate::federation::activities::create_article::CreateArticle; use crate::federation::activities::follow::Follow; -use crate::federation::objects::instance::{ApubInstance, DbInstance}; - +use crate::federation::activities::reject::RejectEdit; +use crate::federation::activities::update_local_article::UpdateLocalArticle; +use crate::federation::activities::update_remote_article::UpdateRemoteArticle; +use crate::federation::objects::article::ApubArticle; +use crate::federation::objects::articles_collection::{ArticleCollection, DbArticleCollection}; +use crate::federation::objects::edits_collection::{ApubEditCollection, DbEditCollection}; +use crate::federation::objects::instance::ApubInstance; use activitypub_federation::axum::inbox::{receive_activity, ActivityData}; use activitypub_federation::axum::json::FederationJson; use activitypub_federation::config::Data; @@ -11,15 +19,6 @@ use activitypub_federation::protocol::context::WithContext; use activitypub_federation::traits::Object; use activitypub_federation::traits::{ActivityHandler, Collection}; use axum::extract::Path; - -use crate::database::article::DbArticle; -use crate::federation::activities::create_article::CreateArticle; -use crate::federation::activities::reject::RejectEdit; -use crate::federation::activities::update_local_article::UpdateLocalArticle; -use crate::federation::activities::update_remote_article::UpdateRemoteArticle; -use crate::federation::objects::article::ApubArticle; -use crate::federation::objects::articles_collection::{ArticleCollection, DbArticleCollection}; -use crate::federation::objects::edits_collection::{ApubEditCollection, DbEditCollection}; use axum::response::IntoResponse; use axum::routing::{get, post}; use axum::Router; @@ -40,8 +39,8 @@ pub fn federation_routes() -> Router { async fn http_get_instance( data: Data, ) -> MyResult>> { - let db_instance = data.local_instance(); - let json_instance = db_instance.into_json(&data).await?; + let local_instance = DbInstance::read_local_instance(&data.db_connection)?; + let json_instance = local_instance.into_json(&data).await?; Ok(FederationJson(WithContext::new_default(json_instance))) } @@ -49,7 +48,8 @@ async fn http_get_instance( async fn http_get_all_articles( data: Data, ) -> MyResult>> { - let collection = DbArticleCollection::read_local(&data.local_instance(), &data).await?; + let local_instance = DbInstance::read_local_instance(&data.db_connection)?; + let collection = DbArticleCollection::read_local(&local_instance, &data).await?; Ok(FederationJson(WithContext::new_default(collection))) } diff --git a/src/lib.rs b/src/lib.rs index bb80116..72e292a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,19 +1,24 @@ use crate::api::api_routes; -use crate::database::MyData; +use crate::database::instance::{DbInstance, DbInstanceForm}; +use crate::database::{FakeDatabase, MyData}; use crate::error::MyResult; use crate::federation::routes::federation_routes; use crate::utils::generate_activity_id; use activitypub_federation::config::{FederationConfig, FederationMiddleware}; +use activitypub_federation::fetch::collection_id::CollectionId; +use activitypub_federation::fetch::object_id::ObjectId; +use activitypub_federation::http_signatures::generate_actor_keypair; use axum::{Router, Server}; +use chrono::Local; use diesel::Connection; use diesel::PgConnection; use diesel_migrations::embed_migrations; use diesel_migrations::EmbeddedMigrations; use diesel_migrations::MigrationHarness; -use federation::create_fake_db; use std::net::ToSocketAddrs; use std::sync::{Arc, Mutex}; use tracing::info; +use url::Url; pub mod api; pub mod database; @@ -24,7 +29,9 @@ mod utils; const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations"); pub async fn start(hostname: &str, database_url: &str) -> MyResult<()> { - let fake_db = create_fake_db(hostname).await?; + let fake_db = Arc::new(FakeDatabase { + conflicts: Mutex::new(vec![]), + }); let db_connection = Arc::new(Mutex::new(PgConnection::establish(database_url)?)); db_connection @@ -44,6 +51,22 @@ pub async fn start(hostname: &str, database_url: &str) -> MyResult<()> { .build() .await?; + // TODO: Move this into setup api call + let ap_id = ObjectId::parse(&format!("http://{}", hostname))?; + let articles_url = CollectionId::parse(&format!("http://{}/all_articles", hostname))?; + let inbox_url = format!("http://{}/inbox", hostname); + let keypair = generate_actor_keypair()?; + let form = DbInstanceForm { + ap_id, + articles_url, + inbox_url, + public_key: keypair.public_key, + private_key: Some(keypair.private_key), + last_refreshed_at: Local::now().into(), + local: true, + }; + DbInstance::create(&form, &config.db_connection)?; + info!("Listening with axum on {hostname}"); let config = config.clone(); let app = Router::new() diff --git a/tests/common.rs b/tests/common.rs index 1cb3848..ab306a1 100644 --- a/tests/common.rs +++ b/tests/common.rs @@ -2,8 +2,8 @@ use fediwiki::api::{ ApiConflict, CreateArticleData, EditArticleData, FollowInstance, GetArticleData, ResolveObject, }; use fediwiki::database::article::ArticleView; +use fediwiki::database::instance::DbInstance; use fediwiki::error::MyResult; -use fediwiki::federation::objects::instance::DbInstance; use fediwiki::start; use once_cell::sync::Lazy; use reqwest::Client; @@ -221,7 +221,7 @@ pub async fn follow_instance(follow_instance: &str, followed_instance: &str) -> // send follow let follow_form = FollowInstance { - instance_id: instance_resolved.ap_id, + id: instance_resolved.id, }; // cant use post helper because follow doesnt return json CLIENT diff --git a/tests/test.rs b/tests/test.rs index 3204724..ead95d2 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -13,10 +13,11 @@ use fediwiki::api::{ use fediwiki::database::article::{ArticleView, DbArticle}; use fediwiki::error::MyResult; -use fediwiki::federation::objects::instance::DbInstance; +use fediwiki::database::instance::{DbInstance, InstanceView}; use pretty_assertions::{assert_eq, assert_ne}; use url::Url; +// TODO: can run tests in parallel if we use different ports #[tokio::test] async fn test_create_read_and_edit_article() -> MyResult<()> { let data = TestData::start(); @@ -80,19 +81,21 @@ async fn test_follow_instance() -> MyResult<()> { let data = TestData::start(); // check initial state - let alpha_instance: DbInstance = get(&data.alpha.hostname, "instance").await?; - assert_eq!(0, alpha_instance.follows.len()); - let beta_instance: DbInstance = get(&data.beta.hostname, "instance").await?; + let alpha_instance: InstanceView = get(&data.alpha.hostname, "instance").await?; + assert_eq!(0, alpha_instance.followers.len()); + assert_eq!(0, alpha_instance.followed.len()); + let beta_instance: InstanceView = get(&data.beta.hostname, "instance").await?; assert_eq!(0, beta_instance.followers.len()); + assert_eq!(0, beta_instance.followed.len()); follow_instance(&data.alpha.hostname, &data.beta.hostname).await?; // check that follow was federated - let beta_instance: DbInstance = get(&data.beta.hostname, "instance").await?; + let beta_instance: InstanceView = get(&data.beta.hostname, "instance").await?; assert_eq!(1, beta_instance.followers.len()); - let alpha_instance: DbInstance = get(&data.alpha.hostname, "instance").await?; - assert_eq!(1, alpha_instance.follows.len()); + let alpha_instance: InstanceView = get(&data.alpha.hostname, "instance").await?; + assert_eq!(1, alpha_instance.followed.len()); data.stop() } @@ -154,6 +157,7 @@ async fn test_edit_local_article() -> MyResult<()> { let create_res = create_article(&data.beta.hostname, title.clone()).await?; assert_eq!(title, create_res.article.title); assert!(create_res.article.local); + dbg!(1); // article should be federated to alpha let get_res = get_article(&data.alpha.hostname, create_res.article.id).await?; @@ -161,6 +165,7 @@ async fn test_edit_local_article() -> MyResult<()> { assert_eq!(1, get_res.edits.len()); assert!(!get_res.article.local); assert_eq!(create_res.article.text, get_res.article.text); + dbg!(2); // edit the article let edit_form = EditArticleData { @@ -170,6 +175,7 @@ async fn test_edit_local_article() -> MyResult<()> { resolve_conflict_id: None, }; let edit_res = edit_article(&data.beta.hostname, &edit_form).await?; + dbg!(3); assert_eq!(edit_res.article.text, edit_form.new_text); assert_eq!(edit_res.edits.len(), 2); assert!(edit_res.edits[0] @@ -179,6 +185,7 @@ async fn test_edit_local_article() -> MyResult<()> { // edit should be federated to alpha let get_res = get_article(&data.alpha.hostname, edit_res.article.id).await?; + dbg!(4); assert_eq!(edit_res.article.title, get_res.article.title); assert_eq!(edit_res.edits.len(), 2); assert_eq!(edit_res.article.text, get_res.article.text); @@ -446,8 +453,8 @@ async fn test_fork_article() -> MyResult<()> { assert_ne!(resolved_article.ap_id, forked_article.ap_id); assert!(forked_article.local); - let beta_instance: DbInstance = get(&data.beta.hostname, "instance").await?; - assert_eq!(forked_article.instance_id, beta_instance.ap_id); + let beta_instance: InstanceView = get(&data.beta.hostname, "instance").await?; + assert_eq!(forked_article.instance_id, beta_instance.instance.id); // now search returns two articles for this title (original and forked) let search_form = SearchArticleData {