From a5da3de8c2d58b05bfe634cdfea6bfd82e7f76b5 Mon Sep 17 00:00:00 2001 From: Felix Ableitner Date: Thu, 16 Nov 2023 12:48:57 +0100 Subject: [PATCH] finish implementing instance follow --- Cargo.lock | 69 +++++++++++++++++++++++++ Cargo.toml | 4 +- src/api.rs | 61 +++++++++++++++++++--- src/database.rs | 18 ++----- src/federation/activities/accept.rs | 6 ++- src/federation/mod.rs | 19 ++++++- src/federation/objects/instance.rs | 52 +++++-------------- src/federation/objects/person.rs | 16 +----- src/federation/routes.rs | 39 +++++++++----- src/lib.rs | 9 +--- src/main.rs | 6 +++ tests/test.rs | 80 ++++++++++++++++++++++++++++- 12 files changed, 283 insertions(+), 96 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3179aaa..e0d95f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -260,6 +260,7 @@ dependencies = [ "iana-time-zone", "js-sys", "num-traits", + "serde", "wasm-bindgen", "windows-targets", ] @@ -334,6 +335,19 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "dashmap" +version = "5.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +dependencies = [ + "cfg-if", + "hashbrown 0.14.2", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "derive_builder" version = "0.12.0" @@ -463,10 +477,12 @@ dependencies = [ "chrono", "enum_delegate", "env_logger", + "once_cell", "rand", "reqwest", "serde", "serde_json", + "serial_test", "tokio", "tracing", "url", @@ -502,6 +518,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da0290714b38af9b4a7b094b8a37086d1b4e61f2df9122c3cad2577669145335" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.29" @@ -509,6 +540,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff4dd66668b557604244583e3e1e1eada8c5c2e96a6d0d6653ede395b78bbacb" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -517,6 +549,17 @@ version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c" +[[package]] +name = "futures-executor" +version = "0.3.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f4fb8693db0cf099eadcca0efe2a5a22e4550f98ed16aba6c48700da29597bc" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.29" @@ -552,6 +595,7 @@ version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104" dependencies = [ + "futures-channel", "futures-core", "futures-io", "futures-macro", @@ -1358,6 +1402,31 @@ dependencies = [ "serde", ] +[[package]] +name = "serial_test" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e56dd856803e253c8f298af3f4d7eb0ae5e23a737252cd90bb4f3b435033b2d" +dependencies = [ + "dashmap", + "futures", + "lazy_static", + "log", + "parking_lot", + "serial_test_derive", +] + +[[package]] +name = "serial_test_derive" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91d129178576168c589c9ec973feedf7d3126c01ac2bf08795109aa35b69fb8f" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.39", +] + [[package]] name = "sha1" version = "0.10.6" diff --git a/Cargo.toml b/Cargo.toml index ab40239..6338503 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ anyhow = "1.0.75" async-trait = "0.1.74" axum = "0.6.20" axum-macros = "0.3.8" -chrono = "0.4.31" +chrono = { version = "0.4.31", features = ["serde"] } enum_delegate = "0.2.0" env_logger = { version = "0.10.1", default-features = false } rand = "0.8.5" @@ -20,4 +20,6 @@ tracing = "0.1.40" url = "2.4.1" [dev-dependencies] +once_cell = "1.18.0" reqwest = "0.11.22" +serial_test = "2.0.0" diff --git a/src/api.rs b/src/api.rs index e7e58df..de8fed8 100644 --- a/src/api.rs +++ b/src/api.rs @@ -1,18 +1,67 @@ +use crate::database::DatabaseHandle; use crate::error::MyResult; use crate::federation::objects::article::DbArticle; use crate::federation::objects::instance::DbInstance; -use axum::extract::Path; -use axum::routing::get; -use axum::{Json, Router}; +use activitypub_federation::config::Data; +use activitypub_federation::fetch::object_id::ObjectId; +use axum::extract::{Path, Query}; +use axum::routing::{get, post}; +use axum::{Form, Json, Router}; use axum_macros::debug_handler; +use serde::{Deserialize, Serialize}; + +use url::Url; pub fn api_routes() -> Router { - Router::new().route("/article/:title", get(api_get_article)) + Router::new() + .route("/article/:title", get(get_article)) + .route("/resolve_object", get(resolve_object)) + .route("/instance", get(get_local_instance)) + .route("/instance/follow", post(follow_instance)) } #[debug_handler] -async fn api_get_article(Path(title): Path) -> MyResult> { - let instance = DbInstance::new("localhost")?; +async fn get_article( + Path(title): Path, + data: Data, +) -> MyResult> { + let instance = data.local_instance(); let article = DbArticle::new(title, "dummy".to_string(), instance.ap_id)?; Ok(Json(article)) } + +#[derive(Deserialize, Serialize)] +pub struct ResolveObject { + pub id: Url, +} + +#[debug_handler] +async fn resolve_object( + Query(query): Query, + data: Data, +) -> MyResult> { + let instance: DbInstance = ObjectId::from(query.id).dereference(&data).await?; + let mut instances = data.instances.lock().unwrap(); + instances.push(instance.clone()); + Ok(Json(instance)) +} + +#[debug_handler] +async fn get_local_instance(data: Data) -> MyResult> { + Ok(Json(data.local_instance())) +} + +#[derive(Deserialize, Serialize, Debug)] +pub struct FollowInstance { + pub instance_id: ObjectId, +} + +#[debug_handler] +async fn follow_instance( + data: Data, + Form(query): Form, +) -> MyResult<()> { + let instance = query.instance_id.dereference(&data).await?; + data.local_instance().follow(&instance, &data).await?; + Ok(()) +} diff --git a/src/database.rs b/src/database.rs index 9bbc807..8a522db 100644 --- a/src/database.rs +++ b/src/database.rs @@ -1,7 +1,6 @@ -use crate::error::Error; use crate::federation::objects::instance::DbInstance; use crate::federation::objects::{article::DbArticle, person::DbUser}; -use anyhow::anyhow; + use std::sync::{Arc, Mutex}; pub type DatabaseHandle = Arc; @@ -14,17 +13,8 @@ pub struct Database { } impl Database { - pub fn local_user(&self) -> DbUser { - let lock = self.users.lock().unwrap(); - lock.first().unwrap().clone() - } - - pub fn read_user(&self, name: &str) -> Result { - let db_user = self.local_user(); - if name == db_user.name { - Ok(db_user) - } else { - Err(anyhow!("Invalid user {name}").into()) - } + pub fn local_instance(&self) -> DbInstance { + let lock = self.instances.lock().unwrap(); + lock.iter().find(|i| i.local).unwrap().clone() } } diff --git a/src/federation/activities/accept.rs b/src/federation/activities/accept.rs index 9dc6b11..894364b 100644 --- a/src/federation/activities/accept.rs +++ b/src/federation/activities/accept.rs @@ -47,7 +47,11 @@ impl ActivityHandler for Accept { Ok(()) } - async fn receive(self, _data: &Data) -> Result<(), Self::Error> { + async fn receive(self, data: &Data) -> Result<(), Self::Error> { + // add to follows + let mut instances = data.instances.lock().unwrap(); + let local_instance = instances.first_mut().unwrap(); + local_instance.follows.push(self.actor.inner().clone()); Ok(()) } } diff --git a/src/federation/mod.rs b/src/federation/mod.rs index 0033d5e..2dc6805 100644 --- a/src/federation/mod.rs +++ b/src/federation/mod.rs @@ -1,7 +1,11 @@ use crate::database::{Database, DatabaseHandle}; use crate::error::Error; +use crate::federation::objects::instance::DbInstance; + use activitypub_federation::config::{FederationConfig, UrlVerifier}; +use activitypub_federation::http_signatures::generate_actor_keypair; use async_trait::async_trait; +use chrono::Local; use std::sync::{Arc, Mutex}; use url::Url; @@ -10,8 +14,21 @@ pub mod objects; pub mod routes; pub async fn federation_config(hostname: &str) -> Result, Error> { + let ap_id = Url::parse(&format!("http://{}", hostname))?.into(); + let inbox = Url::parse(&format!("http://{}/inbox", hostname))?; + let keypair = generate_actor_keypair()?; + let local_instance = DbInstance { + ap_id, + inbox, + public_key: keypair.public_key, + private_key: Some(keypair.private_key), + last_refreshed_at: Local::now().naive_local(), + followers: vec![], + follows: vec![], + local: true, + }; let database = Arc::new(Database { - instances: Mutex::new(vec![]), + instances: Mutex::new(vec![local_instance]), users: Mutex::new(vec![]), posts: Mutex::new(vec![]), }); diff --git a/src/federation/objects/instance.rs b/src/federation/objects/instance.rs index 1cf90e6..cf30a38 100644 --- a/src/federation/objects/instance.rs +++ b/src/federation/objects/instance.rs @@ -1,14 +1,10 @@ use crate::error::Error; -use crate::{ - database::DatabaseHandle, - federation::activities::{accept::Accept, follow::Follow}, -}; +use crate::{database::DatabaseHandle, federation::activities::follow::Follow}; use activitypub_federation::kinds::actor::ServiceType; use activitypub_federation::{ activity_queue::send_activity, config::Data, - fetch::{object_id::ObjectId, webfinger::webfinger_resolve_actor}, - http_signatures::generate_actor_keypair, + fetch::object_id::ObjectId, protocol::{context::WithContext, public_key::PublicKey, verification::verify_domains_match}, traits::{ActivityHandler, Actor, Object}, }; @@ -17,43 +13,18 @@ use serde::{Deserialize, Serialize}; use std::fmt::Debug; use url::Url; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct DbInstance { pub ap_id: ObjectId, pub inbox: Url, - public_key: String, - private_key: Option, - last_refreshed_at: NaiveDateTime, + pub(crate) public_key: String, + pub(crate) private_key: Option, + pub(crate) last_refreshed_at: NaiveDateTime, pub followers: Vec, + pub follows: Vec, pub local: bool, } -/// List of all activities which this actor can receive. -#[derive(Deserialize, Serialize, Debug)] -#[serde(untagged)] -#[enum_delegate::implement(ActivityHandler)] -pub enum PersonAcceptedActivities { - Follow(Follow), - Accept(Accept), -} - -impl DbInstance { - pub fn new(hostname: &str) -> Result { - let ap_id = Url::parse(&format!("http://{}", hostname))?.into(); - let inbox = Url::parse(&format!("http://{}/inbox", hostname))?; - let keypair = generate_actor_keypair()?; - Ok(DbInstance { - ap_id, - inbox, - public_key: keypair.public_key, - private_key: Some(keypair.private_key), - last_refreshed_at: Local::now().naive_local(), - followers: vec![], - local: true, - }) - } -} - #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] pub struct Instance { @@ -73,9 +44,13 @@ impl DbInstance { Ok(Url::parse(&format!("{}/followers", self.ap_id.inner()))?) } - pub async fn follow(&self, other: &str, data: &Data) -> Result<(), Error> { - let other: DbInstance = webfinger_resolve_actor(other, data).await?; + pub async fn follow( + &self, + other: &DbInstance, + data: &Data, + ) -> Result<(), Error> { let follow = Follow::new(self.ap_id.clone(), other.ap_id.clone())?; + dbg!(&follow); self.send(follow, vec![other.shared_inbox_or_inbox()], data) .await?; Ok(()) @@ -145,6 +120,7 @@ impl Object for DbInstance { private_key: None, last_refreshed_at: Local::now().naive_local(), followers: vec![], + follows: vec![], local: false, }; let mut mutex = data.instances.lock().unwrap(); diff --git a/src/federation/objects/person.rs b/src/federation/objects/person.rs index aa22997..7a912d7 100644 --- a/src/federation/objects/person.rs +++ b/src/federation/objects/person.rs @@ -1,15 +1,12 @@ +use crate::database::DatabaseHandle; use crate::error::Error; -use crate::{ - database::DatabaseHandle, - federation::activities::{accept::Accept, follow::Follow}, -}; use activitypub_federation::{ config::Data, fetch::object_id::ObjectId, http_signatures::generate_actor_keypair, kinds::actor::PersonType, protocol::{public_key::PublicKey, verification::verify_domains_match}, - traits::{ActivityHandler, Actor, Object}, + traits::{Actor, Object}, }; use chrono::{Local, NaiveDateTime}; use serde::{Deserialize, Serialize}; @@ -28,15 +25,6 @@ pub struct DbUser { pub local: bool, } -/// List of all activities which this actor can receive. -#[derive(Deserialize, Serialize, Debug)] -#[serde(untagged)] -#[enum_delegate::implement(ActivityHandler)] -pub enum PersonAcceptedActivities { - Follow(Follow), - Accept(Accept), -} - impl DbUser { pub fn new(hostname: &str, name: String) -> Result { let ap_id = Url::parse(&format!("http://{}/{}", hostname, &name))?.into(); diff --git a/src/federation/routes.rs b/src/federation/routes.rs index 400fd49..77b120d 100644 --- a/src/federation/routes.rs +++ b/src/federation/routes.rs @@ -1,39 +1,54 @@ use crate::database::DatabaseHandle; use crate::error::MyResult; -use crate::federation::objects::person::{DbUser, Person, PersonAcceptedActivities}; +use crate::federation::activities::accept::Accept; +use crate::federation::activities::follow::Follow; +use crate::federation::objects::instance::{DbInstance, Instance}; + use activitypub_federation::axum::inbox::{receive_activity, ActivityData}; use activitypub_federation::axum::json::FederationJson; use activitypub_federation::config::Data; use activitypub_federation::protocol::context::WithContext; +use activitypub_federation::traits::ActivityHandler; use activitypub_federation::traits::Object; -use axum::extract::path::Path; + use axum::response::IntoResponse; use axum::routing::{get, post}; use axum::Router; use axum_macros::debug_handler; +use serde::{Deserialize, Serialize}; +use url::Url; pub fn federation_routes() -> Router { Router::new() - .route("/:user/inbox", post(http_post_user_inbox)) - .route("/:user", get(http_get_user)) + .route("/", get(http_get_instance)) + .route("/inbox", post(http_post_inbox)) } #[debug_handler] -async fn http_get_user( - Path(name): Path, +async fn http_get_instance( data: Data, -) -> MyResult>> { - let db_user = data.read_user(&name)?; - let json_user = db_user.into_json(&data).await?; - Ok(FederationJson(WithContext::new_default(json_user))) +) -> MyResult>> { + let db_instance = data.local_instance(); + let json_instance = db_instance.into_json(&data).await?; + Ok(FederationJson(WithContext::new_default(json_instance))) +} + +/// List of all activities which this actor can receive. +#[derive(Deserialize, Serialize, Debug)] +#[serde(untagged)] +#[enum_delegate::implement(ActivityHandler)] +pub enum InboxActivities { + Follow(Follow), + Accept(Accept), } #[debug_handler] -pub async fn http_post_user_inbox( +pub async fn http_post_inbox( data: Data, activity_data: ActivityData, ) -> impl IntoResponse { - receive_activity::, DbUser, DatabaseHandle>( + dbg!("receive activity"); + receive_activity::, DbInstance, DatabaseHandle>( activity_data, &data, ) diff --git a/src/lib.rs b/src/lib.rs index b8fae9b..c7f29f2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,4 @@ use crate::utils::generate_object_id; -use tracing::log::LevelFilter; use activitypub_federation::config::FederationMiddleware; use axum::{Router, Server}; @@ -11,19 +10,13 @@ use federation::federation_config; use std::net::ToSocketAddrs; use tracing::info; -mod api; +pub mod api; mod database; pub mod error; pub mod federation; mod utils; pub async fn start(hostname: &str) -> MyResult<()> { - env_logger::builder() - .filter_level(LevelFilter::Warn) - .filter_module("activitypub_federation", LevelFilter::Info) - .filter_module("fediwiki", LevelFilter::Info) - .init(); - let config = federation_config(hostname).await?; info!("Listening with axum on {hostname}"); diff --git a/src/main.rs b/src/main.rs index 782e3b4..3f81a0c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,8 +1,14 @@ use fediwiki::error::MyResult; use fediwiki::start; +use tracing::log::LevelFilter; #[tokio::main] pub async fn main() -> MyResult<()> { + env_logger::builder() + .filter_level(LevelFilter::Warn) + .filter_module("activitypub_federation", LevelFilter::Info) + .filter_module("fediwiki", LevelFilter::Info) + .init(); start("localhost:8131").await?; Ok(()) } diff --git a/tests/test.rs b/tests/test.rs index 6f07315..d3c5c3e 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -1,11 +1,34 @@ extern crate fediwiki; +use fediwiki::api::{FollowInstance, ResolveObject}; use fediwiki::error::MyResult; use fediwiki::federation::objects::article::DbArticle; +use fediwiki::federation::objects::instance::DbInstance; use fediwiki::start; +use once_cell::sync::Lazy; +use reqwest::Client; +use serial_test::serial; +use std::sync::Once; +use tracing::log::LevelFilter; +use url::Url; + +fn setup() { + static INIT: Once = Once::new(); + INIT.call_once(|| { + env_logger::builder() + .filter_level(LevelFilter::Warn) + .filter_module("activitypub_federation", LevelFilter::Info) + .filter_module("fediwiki", LevelFilter::Info) + .init(); + }); +} + +static CLIENT: Lazy = Lazy::new(|| Client::new()); #[tokio::test] +#[serial] async fn test_get_article() -> MyResult<()> { + setup(); let hostname = "localhost:8131"; let handle = tokio::task::spawn(async { start(hostname).await.unwrap(); @@ -23,7 +46,9 @@ async fn test_get_article() -> MyResult<()> { } #[tokio::test] +#[serial] async fn test_follow_instance() -> MyResult<()> { + setup(); let hostname_alpha = "localhost:8131"; let hostname_beta = "localhost:8132"; let handle_alpha = tokio::task::spawn(async { @@ -33,7 +58,60 @@ async fn test_follow_instance() -> MyResult<()> { start(hostname_beta).await.unwrap(); }); - // TODO + // check initial state + let alpha_instance: DbInstance = CLIENT + .get(format!("http://{hostname_alpha}/api/v1/instance")) + .send() + .await? + .json() + .await?; + assert_eq!(0, alpha_instance.follows.len()); + let beta_instance: DbInstance = CLIENT + .get(format!("http://{hostname_beta}/api/v1/instance")) + .send() + .await? + .json() + .await?; + assert_eq!(0, beta_instance.followers.len()); + + // fetch beta instance on alpha + let resolve_object = ResolveObject { + id: Url::parse(&format!("http://{hostname_beta}"))?, + }; + let beta_instance_resolved: DbInstance = CLIENT + .get(format!("http://{hostname_alpha}/api/v1/resolve_object")) + .query(&resolve_object) + .send() + .await? + .json() + .await?; + + // send follow + let follow_instance = FollowInstance { + instance_id: beta_instance_resolved.ap_id, + }; + CLIENT + .post(format!("http://{hostname_alpha}/api/v1/instance/follow")) + .form(&follow_instance) + .send() + .await?; + + // check that follow was federated + let beta_instance: DbInstance = CLIENT + .get(format!("http://{hostname_beta}/api/v1/instance")) + .send() + .await? + .json() + .await?; + assert_eq!(1, beta_instance.followers.len()); + + let alpha_instance: DbInstance = CLIENT + .get(format!("http://{hostname_alpha}/api/v1/instance")) + .send() + .await? + .json() + .await?; + assert_eq!(1, alpha_instance.follows.len()); handle_alpha.abort(); handle_beta.abort();