1
0
Fork 0
mirror of https://github.com/Nutomic/ibis.git synced 2025-01-11 12:25:49 +00:00

finish implementing instance follow

This commit is contained in:
Felix Ableitner 2023-11-16 12:48:57 +01:00
parent b72bab1c6e
commit a5da3de8c2
12 changed files with 283 additions and 96 deletions

69
Cargo.lock generated
View file

@ -260,6 +260,7 @@ dependencies = [
"iana-time-zone",
"js-sys",
"num-traits",
"serde",
"wasm-bindgen",
"windows-targets",
]
@ -334,6 +335,19 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "dashmap"
version = "5.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856"
dependencies = [
"cfg-if",
"hashbrown 0.14.2",
"lock_api",
"once_cell",
"parking_lot_core",
]
[[package]]
name = "derive_builder"
version = "0.12.0"
@ -463,10 +477,12 @@ dependencies = [
"chrono",
"enum_delegate",
"env_logger",
"once_cell",
"rand",
"reqwest",
"serde",
"serde_json",
"serial_test",
"tokio",
"tracing",
"url",
@ -502,6 +518,21 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "futures"
version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da0290714b38af9b4a7b094b8a37086d1b4e61f2df9122c3cad2577669145335"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.29"
@ -509,6 +540,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff4dd66668b557604244583e3e1e1eada8c5c2e96a6d0d6653ede395b78bbacb"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
@ -517,6 +549,17 @@ version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c"
[[package]]
name = "futures-executor"
version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f4fb8693db0cf099eadcca0efe2a5a22e4550f98ed16aba6c48700da29597bc"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.29"
@ -552,6 +595,7 @@ version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
@ -1358,6 +1402,31 @@ dependencies = [
"serde",
]
[[package]]
name = "serial_test"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e56dd856803e253c8f298af3f4d7eb0ae5e23a737252cd90bb4f3b435033b2d"
dependencies = [
"dashmap",
"futures",
"lazy_static",
"log",
"parking_lot",
"serial_test_derive",
]
[[package]]
name = "serial_test_derive"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91d129178576168c589c9ec973feedf7d3126c01ac2bf08795109aa35b69fb8f"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.39",
]
[[package]]
name = "sha1"
version = "0.10.6"

View file

@ -9,7 +9,7 @@ anyhow = "1.0.75"
async-trait = "0.1.74"
axum = "0.6.20"
axum-macros = "0.3.8"
chrono = "0.4.31"
chrono = { version = "0.4.31", features = ["serde"] }
enum_delegate = "0.2.0"
env_logger = { version = "0.10.1", default-features = false }
rand = "0.8.5"
@ -20,4 +20,6 @@ tracing = "0.1.40"
url = "2.4.1"
[dev-dependencies]
once_cell = "1.18.0"
reqwest = "0.11.22"
serial_test = "2.0.0"

View file

@ -1,18 +1,67 @@
use crate::database::DatabaseHandle;
use crate::error::MyResult;
use crate::federation::objects::article::DbArticle;
use crate::federation::objects::instance::DbInstance;
use axum::extract::Path;
use axum::routing::get;
use axum::{Json, Router};
use activitypub_federation::config::Data;
use activitypub_federation::fetch::object_id::ObjectId;
use axum::extract::{Path, Query};
use axum::routing::{get, post};
use axum::{Form, Json, Router};
use axum_macros::debug_handler;
use serde::{Deserialize, Serialize};
use url::Url;
pub fn api_routes() -> Router {
Router::new().route("/article/:title", get(api_get_article))
Router::new()
.route("/article/:title", get(get_article))
.route("/resolve_object", get(resolve_object))
.route("/instance", get(get_local_instance))
.route("/instance/follow", post(follow_instance))
}
#[debug_handler]
async fn api_get_article(Path(title): Path<String>) -> MyResult<Json<DbArticle>> {
let instance = DbInstance::new("localhost")?;
async fn get_article(
Path(title): Path<String>,
data: Data<DatabaseHandle>,
) -> MyResult<Json<DbArticle>> {
let instance = data.local_instance();
let article = DbArticle::new(title, "dummy".to_string(), instance.ap_id)?;
Ok(Json(article))
}
#[derive(Deserialize, Serialize)]
pub struct ResolveObject {
pub id: Url,
}
#[debug_handler]
async fn resolve_object(
Query(query): Query<ResolveObject>,
data: Data<DatabaseHandle>,
) -> MyResult<Json<DbInstance>> {
let instance: DbInstance = ObjectId::from(query.id).dereference(&data).await?;
let mut instances = data.instances.lock().unwrap();
instances.push(instance.clone());
Ok(Json(instance))
}
#[debug_handler]
async fn get_local_instance(data: Data<DatabaseHandle>) -> MyResult<Json<DbInstance>> {
Ok(Json(data.local_instance()))
}
#[derive(Deserialize, Serialize, Debug)]
pub struct FollowInstance {
pub instance_id: ObjectId<DbInstance>,
}
#[debug_handler]
async fn follow_instance(
data: Data<DatabaseHandle>,
Form(query): Form<FollowInstance>,
) -> MyResult<()> {
let instance = query.instance_id.dereference(&data).await?;
data.local_instance().follow(&instance, &data).await?;
Ok(())
}

View file

@ -1,7 +1,6 @@
use crate::error::Error;
use crate::federation::objects::instance::DbInstance;
use crate::federation::objects::{article::DbArticle, person::DbUser};
use anyhow::anyhow;
use std::sync::{Arc, Mutex};
pub type DatabaseHandle = Arc<Database>;
@ -14,17 +13,8 @@ pub struct Database {
}
impl Database {
pub fn local_user(&self) -> DbUser {
let lock = self.users.lock().unwrap();
lock.first().unwrap().clone()
}
pub fn read_user(&self, name: &str) -> Result<DbUser, Error> {
let db_user = self.local_user();
if name == db_user.name {
Ok(db_user)
} else {
Err(anyhow!("Invalid user {name}").into())
}
pub fn local_instance(&self) -> DbInstance {
let lock = self.instances.lock().unwrap();
lock.iter().find(|i| i.local).unwrap().clone()
}
}

View file

@ -47,7 +47,11 @@ impl ActivityHandler for Accept {
Ok(())
}
async fn receive(self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> {
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
// add to follows
let mut instances = data.instances.lock().unwrap();
let local_instance = instances.first_mut().unwrap();
local_instance.follows.push(self.actor.inner().clone());
Ok(())
}
}

View file

@ -1,7 +1,11 @@
use crate::database::{Database, DatabaseHandle};
use crate::error::Error;
use crate::federation::objects::instance::DbInstance;
use activitypub_federation::config::{FederationConfig, UrlVerifier};
use activitypub_federation::http_signatures::generate_actor_keypair;
use async_trait::async_trait;
use chrono::Local;
use std::sync::{Arc, Mutex};
use url::Url;
@ -10,8 +14,21 @@ pub mod objects;
pub mod routes;
pub async fn federation_config(hostname: &str) -> Result<FederationConfig<DatabaseHandle>, Error> {
let ap_id = Url::parse(&format!("http://{}", hostname))?.into();
let inbox = Url::parse(&format!("http://{}/inbox", hostname))?;
let keypair = generate_actor_keypair()?;
let local_instance = DbInstance {
ap_id,
inbox,
public_key: keypair.public_key,
private_key: Some(keypair.private_key),
last_refreshed_at: Local::now().naive_local(),
followers: vec![],
follows: vec![],
local: true,
};
let database = Arc::new(Database {
instances: Mutex::new(vec![]),
instances: Mutex::new(vec![local_instance]),
users: Mutex::new(vec![]),
posts: Mutex::new(vec![]),
});

View file

@ -1,14 +1,10 @@
use crate::error::Error;
use crate::{
database::DatabaseHandle,
federation::activities::{accept::Accept, follow::Follow},
};
use crate::{database::DatabaseHandle, federation::activities::follow::Follow};
use activitypub_federation::kinds::actor::ServiceType;
use activitypub_federation::{
activity_queue::send_activity,
config::Data,
fetch::{object_id::ObjectId, webfinger::webfinger_resolve_actor},
http_signatures::generate_actor_keypair,
fetch::object_id::ObjectId,
protocol::{context::WithContext, public_key::PublicKey, verification::verify_domains_match},
traits::{ActivityHandler, Actor, Object},
};
@ -17,43 +13,18 @@ use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use url::Url;
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DbInstance {
pub ap_id: ObjectId<DbInstance>,
pub inbox: Url,
public_key: String,
private_key: Option<String>,
last_refreshed_at: NaiveDateTime,
pub(crate) public_key: String,
pub(crate) private_key: Option<String>,
pub(crate) last_refreshed_at: NaiveDateTime,
pub followers: Vec<Url>,
pub follows: Vec<Url>,
pub local: bool,
}
/// List of all activities which this actor can receive.
#[derive(Deserialize, Serialize, Debug)]
#[serde(untagged)]
#[enum_delegate::implement(ActivityHandler)]
pub enum PersonAcceptedActivities {
Follow(Follow),
Accept(Accept),
}
impl DbInstance {
pub fn new(hostname: &str) -> Result<DbInstance, Error> {
let ap_id = Url::parse(&format!("http://{}", hostname))?.into();
let inbox = Url::parse(&format!("http://{}/inbox", hostname))?;
let keypair = generate_actor_keypair()?;
Ok(DbInstance {
ap_id,
inbox,
public_key: keypair.public_key,
private_key: Some(keypair.private_key),
last_refreshed_at: Local::now().naive_local(),
followers: vec![],
local: true,
})
}
}
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Instance {
@ -73,9 +44,13 @@ impl DbInstance {
Ok(Url::parse(&format!("{}/followers", self.ap_id.inner()))?)
}
pub async fn follow(&self, other: &str, data: &Data<DatabaseHandle>) -> Result<(), Error> {
let other: DbInstance = webfinger_resolve_actor(other, data).await?;
pub async fn follow(
&self,
other: &DbInstance,
data: &Data<DatabaseHandle>,
) -> Result<(), Error> {
let follow = Follow::new(self.ap_id.clone(), other.ap_id.clone())?;
dbg!(&follow);
self.send(follow, vec![other.shared_inbox_or_inbox()], data)
.await?;
Ok(())
@ -145,6 +120,7 @@ impl Object for DbInstance {
private_key: None,
last_refreshed_at: Local::now().naive_local(),
followers: vec![],
follows: vec![],
local: false,
};
let mut mutex = data.instances.lock().unwrap();

View file

@ -1,15 +1,12 @@
use crate::database::DatabaseHandle;
use crate::error::Error;
use crate::{
database::DatabaseHandle,
federation::activities::{accept::Accept, follow::Follow},
};
use activitypub_federation::{
config::Data,
fetch::object_id::ObjectId,
http_signatures::generate_actor_keypair,
kinds::actor::PersonType,
protocol::{public_key::PublicKey, verification::verify_domains_match},
traits::{ActivityHandler, Actor, Object},
traits::{Actor, Object},
};
use chrono::{Local, NaiveDateTime};
use serde::{Deserialize, Serialize};
@ -28,15 +25,6 @@ pub struct DbUser {
pub local: bool,
}
/// List of all activities which this actor can receive.
#[derive(Deserialize, Serialize, Debug)]
#[serde(untagged)]
#[enum_delegate::implement(ActivityHandler)]
pub enum PersonAcceptedActivities {
Follow(Follow),
Accept(Accept),
}
impl DbUser {
pub fn new(hostname: &str, name: String) -> Result<DbUser, Error> {
let ap_id = Url::parse(&format!("http://{}/{}", hostname, &name))?.into();

View file

@ -1,39 +1,54 @@
use crate::database::DatabaseHandle;
use crate::error::MyResult;
use crate::federation::objects::person::{DbUser, Person, PersonAcceptedActivities};
use crate::federation::activities::accept::Accept;
use crate::federation::activities::follow::Follow;
use crate::federation::objects::instance::{DbInstance, Instance};
use activitypub_federation::axum::inbox::{receive_activity, ActivityData};
use activitypub_federation::axum::json::FederationJson;
use activitypub_federation::config::Data;
use activitypub_federation::protocol::context::WithContext;
use activitypub_federation::traits::ActivityHandler;
use activitypub_federation::traits::Object;
use axum::extract::path::Path;
use axum::response::IntoResponse;
use axum::routing::{get, post};
use axum::Router;
use axum_macros::debug_handler;
use serde::{Deserialize, Serialize};
use url::Url;
pub fn federation_routes() -> Router {
Router::new()
.route("/:user/inbox", post(http_post_user_inbox))
.route("/:user", get(http_get_user))
.route("/", get(http_get_instance))
.route("/inbox", post(http_post_inbox))
}
#[debug_handler]
async fn http_get_user(
Path(name): Path<String>,
async fn http_get_instance(
data: Data<DatabaseHandle>,
) -> MyResult<FederationJson<WithContext<Person>>> {
let db_user = data.read_user(&name)?;
let json_user = db_user.into_json(&data).await?;
Ok(FederationJson(WithContext::new_default(json_user)))
) -> MyResult<FederationJson<WithContext<Instance>>> {
let db_instance = data.local_instance();
let json_instance = db_instance.into_json(&data).await?;
Ok(FederationJson(WithContext::new_default(json_instance)))
}
/// List of all activities which this actor can receive.
#[derive(Deserialize, Serialize, Debug)]
#[serde(untagged)]
#[enum_delegate::implement(ActivityHandler)]
pub enum InboxActivities {
Follow(Follow),
Accept(Accept),
}
#[debug_handler]
pub async fn http_post_user_inbox(
pub async fn http_post_inbox(
data: Data<DatabaseHandle>,
activity_data: ActivityData,
) -> impl IntoResponse {
receive_activity::<WithContext<PersonAcceptedActivities>, DbUser, DatabaseHandle>(
dbg!("receive activity");
receive_activity::<WithContext<InboxActivities>, DbInstance, DatabaseHandle>(
activity_data,
&data,
)

View file

@ -1,5 +1,4 @@
use crate::utils::generate_object_id;
use tracing::log::LevelFilter;
use activitypub_federation::config::FederationMiddleware;
use axum::{Router, Server};
@ -11,19 +10,13 @@ use federation::federation_config;
use std::net::ToSocketAddrs;
use tracing::info;
mod api;
pub mod api;
mod database;
pub mod error;
pub mod federation;
mod utils;
pub async fn start(hostname: &str) -> MyResult<()> {
env_logger::builder()
.filter_level(LevelFilter::Warn)
.filter_module("activitypub_federation", LevelFilter::Info)
.filter_module("fediwiki", LevelFilter::Info)
.init();
let config = federation_config(hostname).await?;
info!("Listening with axum on {hostname}");

View file

@ -1,8 +1,14 @@
use fediwiki::error::MyResult;
use fediwiki::start;
use tracing::log::LevelFilter;
#[tokio::main]
pub async fn main() -> MyResult<()> {
env_logger::builder()
.filter_level(LevelFilter::Warn)
.filter_module("activitypub_federation", LevelFilter::Info)
.filter_module("fediwiki", LevelFilter::Info)
.init();
start("localhost:8131").await?;
Ok(())
}

View file

@ -1,11 +1,34 @@
extern crate fediwiki;
use fediwiki::api::{FollowInstance, ResolveObject};
use fediwiki::error::MyResult;
use fediwiki::federation::objects::article::DbArticle;
use fediwiki::federation::objects::instance::DbInstance;
use fediwiki::start;
use once_cell::sync::Lazy;
use reqwest::Client;
use serial_test::serial;
use std::sync::Once;
use tracing::log::LevelFilter;
use url::Url;
fn setup() {
static INIT: Once = Once::new();
INIT.call_once(|| {
env_logger::builder()
.filter_level(LevelFilter::Warn)
.filter_module("activitypub_federation", LevelFilter::Info)
.filter_module("fediwiki", LevelFilter::Info)
.init();
});
}
static CLIENT: Lazy<Client> = Lazy::new(|| Client::new());
#[tokio::test]
#[serial]
async fn test_get_article() -> MyResult<()> {
setup();
let hostname = "localhost:8131";
let handle = tokio::task::spawn(async {
start(hostname).await.unwrap();
@ -23,7 +46,9 @@ async fn test_get_article() -> MyResult<()> {
}
#[tokio::test]
#[serial]
async fn test_follow_instance() -> MyResult<()> {
setup();
let hostname_alpha = "localhost:8131";
let hostname_beta = "localhost:8132";
let handle_alpha = tokio::task::spawn(async {
@ -33,7 +58,60 @@ async fn test_follow_instance() -> MyResult<()> {
start(hostname_beta).await.unwrap();
});
// TODO
// check initial state
let alpha_instance: DbInstance = CLIENT
.get(format!("http://{hostname_alpha}/api/v1/instance"))
.send()
.await?
.json()
.await?;
assert_eq!(0, alpha_instance.follows.len());
let beta_instance: DbInstance = CLIENT
.get(format!("http://{hostname_beta}/api/v1/instance"))
.send()
.await?
.json()
.await?;
assert_eq!(0, beta_instance.followers.len());
// fetch beta instance on alpha
let resolve_object = ResolveObject {
id: Url::parse(&format!("http://{hostname_beta}"))?,
};
let beta_instance_resolved: DbInstance = CLIENT
.get(format!("http://{hostname_alpha}/api/v1/resolve_object"))
.query(&resolve_object)
.send()
.await?
.json()
.await?;
// send follow
let follow_instance = FollowInstance {
instance_id: beta_instance_resolved.ap_id,
};
CLIENT
.post(format!("http://{hostname_alpha}/api/v1/instance/follow"))
.form(&follow_instance)
.send()
.await?;
// check that follow was federated
let beta_instance: DbInstance = CLIENT
.get(format!("http://{hostname_beta}/api/v1/instance"))
.send()
.await?
.json()
.await?;
assert_eq!(1, beta_instance.followers.len());
let alpha_instance: DbInstance = CLIENT
.get(format!("http://{hostname_alpha}/api/v1/instance"))
.send()
.await?
.json()
.await?;
assert_eq!(1, alpha_instance.follows.len());
handle_alpha.abort();
handle_beta.abort();