From 703f10c746ee23aa0cd3e31e35d6b1377c0b4e68 Mon Sep 17 00:00:00 2001 From: Felix Ableitner Date: Thu, 16 Nov 2023 16:40:43 +0100 Subject: [PATCH] synchronize articles --- Cargo.lock | 1 + Cargo.toml | 1 + src/federation/mod.rs | 2 + src/federation/objects/article.rs | 2 +- src/federation/objects/articles_collection.rs | 93 +++++++++++++++++++ src/federation/objects/instance.rs | 8 ++ src/federation/objects/mod.rs | 1 + src/federation/routes.rs | 12 ++- tests/test.rs | 57 ++++++++++++ 9 files changed, 175 insertions(+), 2 deletions(-) create mode 100644 src/federation/objects/articles_collection.rs diff --git a/Cargo.lock b/Cargo.lock index e0d95f7..00177fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -477,6 +477,7 @@ dependencies = [ "chrono", "enum_delegate", "env_logger", + "futures", "once_cell", "rand", "reqwest", diff --git a/Cargo.toml b/Cargo.toml index 6338503..34cad1c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ axum-macros = "0.3.8" chrono = { version = "0.4.31", features = ["serde"] } enum_delegate = "0.2.0" env_logger = { version = "0.10.1", default-features = false } +futures = "0.3.29" rand = "0.8.5" serde = "1.0.192" serde_json = "1.0.108" diff --git a/src/federation/mod.rs b/src/federation/mod.rs index d114b24..6b3c829 100644 --- a/src/federation/mod.rs +++ b/src/federation/mod.rs @@ -13,10 +13,12 @@ pub mod routes; pub async fn federation_config(hostname: &str) -> Result, Error> { let ap_id = Url::parse(&format!("http://{}", hostname))?.into(); + let articles_id = Url::parse(&format!("http://{}/articles", hostname))?.into(); let inbox = Url::parse(&format!("http://{}/inbox", hostname))?; let keypair = generate_actor_keypair()?; let local_instance = DbInstance { ap_id, + articles_id, inbox, public_key: keypair.public_key, private_key: Some(keypair.private_key), diff --git a/src/federation/objects/article.rs b/src/federation/objects/article.rs index 716a743..0f877dd 100644 --- a/src/federation/objects/article.rs +++ b/src/federation/objects/article.rs @@ -20,7 +20,7 @@ pub struct DbArticle { pub local: bool, } -#[derive(Deserialize, Serialize, Debug)] +#[derive(Deserialize, Serialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct Article { #[serde(rename = "type")] diff --git a/src/federation/objects/articles_collection.rs b/src/federation/objects/articles_collection.rs new file mode 100644 index 0000000..ae9e023 --- /dev/null +++ b/src/federation/objects/articles_collection.rs @@ -0,0 +1,93 @@ +use crate::database::DatabaseHandle; +use crate::error::Error; +use crate::federation::objects::article::{Article, DbArticle}; +use crate::federation::objects::instance::DbInstance; +use crate::utils::generate_object_id; +use activitypub_federation::kinds::collection::CollectionType; +use activitypub_federation::{ + config::Data, + traits::{Collection, Object}, +}; +use futures::future; +use futures::future::try_join_all; +use serde::{Deserialize, Serialize}; +use url::Url; + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ArticleCollection { + pub(crate) r#type: CollectionType, + pub(crate) id: Url, + pub(crate) total_items: i32, + pub(crate) items: Vec
, +} + +#[derive(Clone, Debug)] +pub struct DbArticleCollection(Vec); + +#[async_trait::async_trait] +impl Collection for DbArticleCollection { + type Owner = DbInstance; + type DataType = DatabaseHandle; + type Kind = ArticleCollection; + type Error = Error; + + async fn read_local( + _owner: &Self::Owner, + data: &Data, + ) -> Result { + let local_articles = { + let articles = data.articles.lock().unwrap(); + articles + .iter() + .filter(|a| a.local) + .clone() + .cloned() + .collect::>() + }; + let articles = future::try_join_all( + local_articles + .into_iter() + .map(|a| a.into_json(data)) + .collect::>(), + ) + .await?; + let ap_id = generate_object_id(data.local_instance().ap_id.inner())?.into(); + let collection = ArticleCollection { + r#type: Default::default(), + id: ap_id, + total_items: articles.len() as i32, + items: articles, + }; + Ok(collection) + } + + async fn verify( + _apub: &Self::Kind, + _expected_domain: &Url, + _data: &Data, + ) -> Result<(), Self::Error> { + Ok(()) + } + + async fn from_json( + apub: Self::Kind, + _owner: &Self::Owner, + data: &Data, + ) -> Result + where + Self: Sized, + { + let mut articles = try_join_all( + apub.items + .into_iter() + .map(|i| DbArticle::from_json(i, data)), + ) + .await?; + let mut lock = data.articles.lock().unwrap(); + // TODO: need to overwrite existing items + lock.append(&mut articles); + // TODO: return value propably not needed + Ok(DbArticleCollection(articles)) + } +} diff --git a/src/federation/objects/instance.rs b/src/federation/objects/instance.rs index 98b2b6e..2745cbf 100644 --- a/src/federation/objects/instance.rs +++ b/src/federation/objects/instance.rs @@ -1,5 +1,7 @@ use crate::error::Error; +use crate::federation::objects::articles_collection::{ArticleCollection, DbArticleCollection}; use crate::{database::DatabaseHandle, federation::activities::follow::Follow}; +use activitypub_federation::fetch::collection_id::CollectionId; use activitypub_federation::kinds::actor::ServiceType; use activitypub_federation::{ activity_queue::send_activity, @@ -16,6 +18,7 @@ use url::Url; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct DbInstance { pub ap_id: ObjectId, + pub articles_id: CollectionId, pub inbox: Url, pub(crate) public_key: String, pub(crate) private_key: Option, @@ -31,6 +34,7 @@ pub struct Instance { #[serde(rename = "type")] kind: ServiceType, id: ObjectId, + articles: CollectionId, inbox: Url, public_key: PublicKey, } @@ -97,6 +101,7 @@ impl Object for DbInstance { Ok(Instance { kind: Default::default(), id: self.ap_id.clone(), + articles: self.articles_id.clone(), inbox: self.inbox.clone(), public_key: self.public_key(), }) @@ -114,6 +119,7 @@ impl Object for DbInstance { async fn from_json(json: Self::Kind, data: &Data) -> Result { let instance = DbInstance { ap_id: json.id, + articles_id: json.articles, inbox: json.inbox, public_key: json.public_key.public_key_pem, private_key: None, @@ -122,6 +128,8 @@ impl Object for DbInstance { follows: vec![], local: false, }; + // TODO: very inefficient to sync all articles every time + instance.articles_id.dereference(&instance, &data).await?; let mut mutex = data.instances.lock().unwrap(); mutex.push(instance.clone()); Ok(instance) diff --git a/src/federation/objects/mod.rs b/src/federation/objects/mod.rs index 6aa0b78..f854ed7 100644 --- a/src/federation/objects/mod.rs +++ b/src/federation/objects/mod.rs @@ -1,2 +1,3 @@ pub mod article; +pub mod articles_collection; pub mod instance; diff --git a/src/federation/routes.rs b/src/federation/routes.rs index ca5a9e3..e004dc6 100644 --- a/src/federation/routes.rs +++ b/src/federation/routes.rs @@ -8,9 +8,10 @@ use activitypub_federation::axum::inbox::{receive_activity, ActivityData}; use activitypub_federation::axum::json::FederationJson; use activitypub_federation::config::Data; use activitypub_federation::protocol::context::WithContext; -use activitypub_federation::traits::ActivityHandler; use activitypub_federation::traits::Object; +use activitypub_federation::traits::{ActivityHandler, Collection}; +use crate::federation::objects::articles_collection::{ArticleCollection, DbArticleCollection}; use axum::response::IntoResponse; use axum::routing::{get, post}; use axum::Router; @@ -21,6 +22,7 @@ use url::Url; pub fn federation_routes() -> Router { Router::new() .route("/", get(http_get_instance)) + .route("/articles", get(http_get_articles)) .route("/inbox", post(http_post_inbox)) } @@ -33,6 +35,14 @@ async fn http_get_instance( Ok(FederationJson(WithContext::new_default(json_instance))) } +#[debug_handler] +async fn http_get_articles( + data: Data, +) -> MyResult>> { + let collection = DbArticleCollection::read_local(&data.local_instance(), &data).await?; + Ok(FederationJson(WithContext::new_default(collection))) +} + /// List of all activities which this actor can receive. #[derive(Deserialize, Serialize, Debug)] #[serde(untagged)] diff --git a/tests/test.rs b/tests/test.rs index 6772ecf..b88d4b1 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -97,3 +97,60 @@ async fn test_follow_instance() -> MyResult<()> { handle_beta.abort(); Ok(()) } + +#[tokio::test] +#[serial] +async fn test_synchronize_articles() -> MyResult<()> { + setup(); + let hostname_alpha = "localhost:8131"; + let hostname_beta = "localhost:8132"; + let handle_alpha = tokio::task::spawn(async { + start(hostname_alpha).await.unwrap(); + }); + let handle_beta = tokio::task::spawn(async { + start(hostname_beta).await.unwrap(); + }); + + // create article on alpha + let create_article = CreateArticle { + title: "Manu_Chao".to_string(), + text: "Lorem ipsum".to_string(), + }; + let create_res: DbArticle = post(hostname_alpha, "article", &create_article).await?; + assert_eq!(create_article.title, create_res.title); + assert!(create_res.local); + + // article is not yet on beta + let get_article = GetArticle { + title: "Manu_Chao".to_string(), + }; + let get_res = get_query::( + hostname_beta, + &format!("article"), + Some(get_article.clone()), + ) + .await; + assert!(get_res.is_err()); + + // fetch alpha instance on beta, articles are also fetched automatically + let resolve_object = ResolveObject { + id: Url::parse(&format!("http://{hostname_alpha}"))?, + }; + get_query::(hostname_beta, "resolve_object", Some(resolve_object)).await?; + + // get the article and compare + let get_res: DbArticle = get_query( + hostname_beta, + &format!("article"), + Some(get_article.clone()), + ) + .await?; + assert_eq!(create_res.ap_id, get_res.ap_id); + assert_eq!(create_article.title, get_res.title); + assert_eq!(create_article.text, get_res.text); + assert!(!get_res.local); + + handle_alpha.abort(); + handle_beta.abort(); + Ok(()) +}