From 5556d0fef2312f336c21d49004a2cb3d8d773a7a Mon Sep 17 00:00:00 2001 From: Felix Ableitner Date: Mon, 11 Nov 2024 13:14:00 +0100 Subject: [PATCH] Synchronize linked instances --- Cargo.lock | 13 ++ Cargo.toml | 1 + .../2024-11-11-111150_instances_url/down.sql | 3 + .../2024-11-11-111150_instances_url/up.sql | 3 + src/backend/database/instance.rs | 8 +- src/backend/database/schema.rs | 4 +- .../federation/objects/articles_collection.rs | 26 +++- src/backend/federation/objects/instance.rs | 22 +++- .../federation/objects/instance_collection.rs | 94 ++++++++++++++ src/backend/federation/objects/mod.rs | 1 + src/backend/federation/routes.rs | 13 +- src/backend/mod.rs | 14 +- src/common/mod.rs | 5 +- src/database/schema.rs | 121 ++++++++++++++++++ tests/test.rs | 40 ++++++ 15 files changed, 345 insertions(+), 23 deletions(-) create mode 100644 migrations/2024-11-11-111150_instances_url/down.sql create mode 100644 migrations/2024-11-11-111150_instances_url/up.sql create mode 100644 src/backend/federation/objects/instance_collection.rs create mode 100644 src/database/schema.rs diff --git a/Cargo.lock b/Cargo.lock index 3c7c254..079ea82 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1810,6 +1810,7 @@ dependencies = [ "pretty_assertions", "rand", "reqwest", + "retry_future", "serde", "serde_json", "sha2", @@ -3330,6 +3331,18 @@ dependencies = [ "tower-service", ] +[[package]] +name = "retry_future" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a831851ed690d7d2ca106ddba947ee9f59e130c00311e815bd6d4ccbefbbc11f" +dependencies = [ + "anyhow", + "futures", + "pin-project", + "tokio", +] + [[package]] name = "ring" version = "0.17.8" diff --git a/Cargo.toml b/Cargo.toml index d94c9d9..546be84 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -113,6 +113,7 @@ codee = "0.2.0" [dev-dependencies] pretty_assertions = "1.4.1" +retry_future = "0.4.0" [package.metadata.leptos] output-name = "ibis" diff --git a/migrations/2024-11-11-111150_instances_url/down.sql b/migrations/2024-11-11-111150_instances_url/down.sql new file mode 100644 index 0000000..fdd4cc0 --- /dev/null +++ b/migrations/2024-11-11-111150_instances_url/down.sql @@ -0,0 +1,3 @@ +alter table instance drop column instances_url; + +alter table instance alter column articles_url set not null; diff --git a/migrations/2024-11-11-111150_instances_url/up.sql b/migrations/2024-11-11-111150_instances_url/up.sql new file mode 100644 index 0000000..7addfa6 --- /dev/null +++ b/migrations/2024-11-11-111150_instances_url/up.sql @@ -0,0 +1,3 @@ +alter table instance add column instances_url varchar(255) unique; + +alter table instance alter column articles_url drop not null; diff --git a/src/backend/database/instance.rs b/src/backend/database/instance.rs index c740e08..1242888 100644 --- a/src/backend/database/instance.rs +++ b/src/backend/database/instance.rs @@ -5,7 +5,10 @@ use crate::{ IbisData, }, error::MyResult, - federation::objects::articles_collection::DbArticleCollection, + federation::objects::{ + articles_collection::DbArticleCollection, + instance_collection::DbInstanceCollection, + }, }, common::{DbInstance, DbPerson, InstanceView}, }; @@ -31,12 +34,13 @@ pub struct DbInstanceForm { pub domain: String, pub ap_id: ObjectId, pub description: Option, - pub articles_url: CollectionId, + pub articles_url: Option>, pub inbox_url: String, pub public_key: String, pub private_key: Option, pub last_refreshed_at: DateTime, pub local: bool, + pub instances_url: Option>, } impl DbInstance { diff --git a/src/backend/database/schema.rs b/src/backend/database/schema.rs index 7a8e12d..81b43ed 100644 --- a/src/backend/database/schema.rs +++ b/src/backend/database/schema.rs @@ -48,13 +48,15 @@ diesel::table! { ap_id -> Varchar, description -> Nullable, #[max_length = 255] - articles_url -> Varchar, + articles_url -> Nullable, #[max_length = 255] inbox_url -> Varchar, public_key -> Text, private_key -> Nullable, last_refreshed_at -> Timestamptz, local -> Bool, + #[max_length = 255] + instances_url -> Nullable, } } diff --git a/src/backend/federation/objects/articles_collection.rs b/src/backend/federation/objects/articles_collection.rs index bc65ec9..94bc273 100644 --- a/src/backend/federation/objects/articles_collection.rs +++ b/src/backend/federation/objects/articles_collection.rs @@ -1,14 +1,19 @@ use crate::{ - backend::{database::IbisData, error::Error, federation::objects::article::ApubArticle}, - common::{DbArticle, DbInstance}, + backend::{ + database::IbisData, + error::{Error, MyResult}, + federation::objects::article::ApubArticle, + }, + common::{utils::http_protocol_str, DbArticle}, }; use activitypub_federation::{ config::Data, + fetch::collection_id::CollectionId, kinds::collection::CollectionType, protocol::verification::verify_domains_match, traits::{Collection, Object}, }; -use futures::future::{self, join_all}; +use futures::future::{join_all, try_join_all}; use log::warn; use serde::{Deserialize, Serialize}; use url::Url; @@ -25,19 +30,26 @@ pub struct ArticleCollection { #[derive(Clone, Debug)] pub struct DbArticleCollection(()); +pub fn local_articles_url(domain: &str) -> MyResult> { + Ok(CollectionId::parse(&format!( + "{}://{domain}/all_articles", + http_protocol_str() + ))?) +} + #[async_trait::async_trait] impl Collection for DbArticleCollection { - type Owner = DbInstance; + type Owner = (); type DataType = IbisData; type Kind = ArticleCollection; type Error = Error; async fn read_local( - owner: &Self::Owner, + _owner: &Self::Owner, data: &Data, ) -> Result { let local_articles = DbArticle::read_all(Some(true), None, data)?; - let articles = future::try_join_all( + let articles = try_join_all( local_articles .into_iter() .map(|a| a.into_json(data)) @@ -46,7 +58,7 @@ impl Collection for DbArticleCollection { .await?; let collection = ArticleCollection { r#type: Default::default(), - id: owner.articles_url.clone().into(), + id: local_articles_url(&data.config.federation.domain)?.into(), total_items: articles.len() as i32, items: articles, }; diff --git a/src/backend/federation/objects/instance.rs b/src/backend/federation/objects/instance.rs index b574d2b..52744c8 100644 --- a/src/backend/federation/objects/instance.rs +++ b/src/backend/federation/objects/instance.rs @@ -1,3 +1,4 @@ +use super::instance_collection::DbInstanceCollection; use crate::{ backend::{ database::{instance::DbInstanceForm, IbisData}, @@ -23,9 +24,10 @@ use url::Url; pub struct ApubInstance { #[serde(rename = "type")] kind: ServiceType, - id: ObjectId, + pub id: ObjectId, content: Option, - articles: CollectionId, + articles: Option>, + instances: Option>, inbox: Url, public_key: PublicKey, } @@ -86,6 +88,7 @@ impl Object for DbInstance { id: self.ap_id.clone(), content: self.description.clone(), articles: self.articles_url.clone(), + instances: self.instances_url.clone(), inbox: Url::parse(&self.inbox_url)?, public_key: self.public_key(), }) @@ -107,6 +110,7 @@ impl Object for DbInstance { ap_id: json.id, description: json.content, articles_url: json.articles, + instances_url: json.instances, inbox_url: json.inbox.to_string(), public_key: json.public_key.public_key_pem, private_key: None, @@ -119,9 +123,17 @@ impl Object for DbInstance { let instance_ = instance.clone(); let data_ = data.reset_request_count(); tokio::spawn(async move { - let res = instance_.articles_url.dereference(&instance_, &data_).await; - if let Err(e) = res { - tracing::warn!("error in spawn: {e}"); + if let Some(articles_url) = &instance_.articles_url { + let res = articles_url.dereference(&(), &data_).await; + if let Err(e) = res { + tracing::warn!("error in spawn: {e}"); + } + } + if let Some(instances_url) = &instance_.instances_url { + let res = instances_url.dereference(&(), &data_).await; + if let Err(e) = res { + tracing::warn!("error in spawn: {e}"); + } } }); diff --git a/src/backend/federation/objects/instance_collection.rs b/src/backend/federation/objects/instance_collection.rs new file mode 100644 index 0000000..9670098 --- /dev/null +++ b/src/backend/federation/objects/instance_collection.rs @@ -0,0 +1,94 @@ +use super::instance::ApubInstance; +use crate::{ + backend::{ + database::IbisData, + error::{Error, MyResult}, + }, + common::{utils::http_protocol_str, DbInstance}, +}; +use activitypub_federation::{ + config::Data, + fetch::collection_id::CollectionId, + kinds::collection::CollectionType, + protocol::verification::verify_domains_match, + traits::{Collection, Object}, +}; +use futures::future::{self, join_all}; +use log::warn; +use serde::{Deserialize, Serialize}; +use url::Url; + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct InstanceCollection { + pub r#type: CollectionType, + pub id: Url, + pub total_items: i32, + pub items: Vec, +} + +#[derive(Clone, Debug)] +pub struct DbInstanceCollection(()); + +pub fn linked_instances_url(domain: &str) -> MyResult> { + Ok(CollectionId::parse(&format!( + "{}://{domain}/linked_instances", + http_protocol_str() + ))?) +} + +#[async_trait::async_trait] +impl Collection for DbInstanceCollection { + type Owner = (); + type DataType = IbisData; + type Kind = InstanceCollection; + type Error = Error; + + async fn read_local( + _owner: &Self::Owner, + data: &Data, + ) -> Result { + let instances = DbInstance::read_remote(data)?; + let instances = future::try_join_all( + instances + .into_iter() + .map(|i| i.into_json(data)) + .collect::>(), + ) + .await?; + let collection = InstanceCollection { + r#type: Default::default(), + id: linked_instances_url(&data.config.federation.domain)?.into(), + total_items: instances.len() as i32, + items: instances, + }; + Ok(collection) + } + + async fn verify( + json: &Self::Kind, + expected_domain: &Url, + _data: &Data, + ) -> Result<(), Self::Error> { + verify_domains_match(&json.id, expected_domain)?; + Ok(()) + } + + async fn from_json( + apub: Self::Kind, + _owner: &Self::Owner, + data: &Data, + ) -> Result { + join_all(apub.items.into_iter().map(|instance| async { + let id = instance.id.clone(); + let res = DbInstance::from_json(instance, data).await; + if let Err(e) = &res { + warn!("Failed to synchronize article {id}: {e}"); + } + res + })) + .await; + + Ok(DbInstanceCollection(())) + } +} diff --git a/src/backend/federation/objects/mod.rs b/src/backend/federation/objects/mod.rs index 0b14474..4f83dbf 100644 --- a/src/backend/federation/objects/mod.rs +++ b/src/backend/federation/objects/mod.rs @@ -3,4 +3,5 @@ pub mod articles_collection; pub mod edit; pub mod edits_collection; pub mod instance; +pub mod instance_collection; pub mod user; diff --git a/src/backend/federation/routes.rs b/src/backend/federation/routes.rs index 5ca4f60..049534c 100644 --- a/src/backend/federation/routes.rs +++ b/src/backend/federation/routes.rs @@ -1,3 +1,4 @@ +use super::objects::instance_collection::{DbInstanceCollection, InstanceCollection}; use crate::{ backend::{ database::IbisData, @@ -47,6 +48,7 @@ pub fn federation_routes() -> Router<()> { .route("/", get(http_get_instance)) .route("/user/:name", get(http_get_person)) .route("/all_articles", get(http_get_all_articles)) + .route("/linked_instances", get(http_get_linked_instances)) .route("/article/:title", get(http_get_article)) .route("/article/:title/edits", get(http_get_article_edits)) .route("/inbox", post(http_post_inbox)) @@ -75,8 +77,15 @@ async fn http_get_person( async fn http_get_all_articles( data: Data, ) -> MyResult>> { - let local_instance = DbInstance::read_local_instance(&data)?; - let collection = DbArticleCollection::read_local(&local_instance, &data).await?; + let collection = DbArticleCollection::read_local(&(), &data).await?; + Ok(FederationJson(WithContext::new_default(collection))) +} + +#[debug_handler] +async fn http_get_linked_instances( + data: Data, +) -> MyResult>> { + let collection = DbInstanceCollection::read_local(&(), &data).await?; Ok(FederationJson(WithContext::new_default(collection))) } diff --git a/src/backend/mod.rs b/src/backend/mod.rs index 3670053..377af61 100644 --- a/src/backend/mod.rs +++ b/src/backend/mod.rs @@ -18,7 +18,7 @@ use crate::{ }; use activitypub_federation::{ config::{Data, FederationConfig, FederationMiddleware}, - fetch::{collection_id::CollectionId, object_id::ObjectId}, + fetch::object_id::ObjectId, http_signatures::generate_actor_keypair, }; use api::api_routes; @@ -38,6 +38,10 @@ use diesel::{ PgConnection, }; use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness}; +use federation::objects::{ + articles_collection::local_articles_url, + instance_collection::linked_instances_url, +}; use leptos::get_configuration; use leptos_axum::{generate_route_list, LeptosRoutes}; use log::info; @@ -74,7 +78,8 @@ pub async fn start(config: IbisConfig, override_hostname: Option) -> .domain(data.config.federation.domain.clone()) .url_verifier(Box::new(VerifyUrlData(data.config.clone()))) .app_data(data) - .debug(true) + .http_fetch_limit(1000) + .debug(cfg!(debug_assertions)) .build() .await?; @@ -121,15 +126,14 @@ and to list interesting articles."; async fn setup(data: &Data) -> Result<(), Error> { let domain = &data.config.federation.domain; let ap_id = ObjectId::parse(&format!("{}://{domain}", http_protocol_str()))?; - let articles_url = - CollectionId::parse(&format!("{}://{domain}/all_articles", http_protocol_str()))?; let inbox_url = format!("{}://{domain}/inbox", http_protocol_str()); let keypair = generate_actor_keypair()?; let form = DbInstanceForm { domain: domain.to_string(), ap_id, description: Some("New Ibis instance".to_string()), - articles_url, + articles_url: Some(local_articles_url(domain)?), + instances_url: Some(linked_instances_url(domain)?), inbox_url, public_key: keypair.public_key, private_key: Some(keypair.private_key), diff --git a/src/common/mod.rs b/src/common/mod.rs index da8b58e..510baaa 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -11,6 +11,7 @@ use { crate::backend::{ database::schema::{article, edit, instance, local_user, person}, federation::objects::articles_collection::DbArticleCollection, + federation::objects::instance_collection::DbInstanceCollection, }, activitypub_federation::fetch::{collection_id::CollectionId, object_id::ObjectId}, diesel::{Identifiable, Queryable, Selectable}, @@ -253,7 +254,7 @@ pub struct DbInstance { pub ap_id: String, pub description: Option, #[cfg(feature = "ssr")] - pub articles_url: CollectionId, + pub articles_url: Option>, #[cfg(not(feature = "ssr"))] pub articles_url: String, pub inbox_url: String, @@ -264,6 +265,8 @@ pub struct DbInstance { #[serde(skip)] pub last_refreshed_at: DateTime, pub local: bool, + #[cfg(feature = "ssr")] + pub instances_url: Option>, } impl DbInstance { diff --git a/src/database/schema.rs b/src/database/schema.rs new file mode 100644 index 0000000..81b43ed --- /dev/null +++ b/src/database/schema.rs @@ -0,0 +1,121 @@ +// @generated automatically by Diesel CLI. + +diesel::table! { + article (id) { + id -> Int4, + title -> Text, + text -> Text, + #[max_length = 255] + ap_id -> Varchar, + instance_id -> Int4, + local -> Bool, + protected -> Bool, + } +} + +diesel::table! { + conflict (id) { + id -> Int4, + hash -> Uuid, + diff -> Text, + summary -> Text, + creator_id -> Int4, + article_id -> Int4, + previous_version_id -> Uuid, + } +} + +diesel::table! { + edit (id) { + id -> Int4, + creator_id -> Int4, + hash -> Uuid, + #[max_length = 255] + ap_id -> Varchar, + diff -> Text, + summary -> Text, + article_id -> Int4, + previous_version_id -> Uuid, + created -> Timestamptz, + } +} + +diesel::table! { + instance (id) { + id -> Int4, + domain -> Text, + #[max_length = 255] + ap_id -> Varchar, + description -> Nullable, + #[max_length = 255] + articles_url -> Nullable, + #[max_length = 255] + inbox_url -> Varchar, + public_key -> Text, + private_key -> Nullable, + last_refreshed_at -> Timestamptz, + local -> Bool, + #[max_length = 255] + instances_url -> Nullable, + } +} + +diesel::table! { + instance_follow (id) { + id -> Int4, + instance_id -> Int4, + follower_id -> Int4, + pending -> Bool, + } +} + +diesel::table! { + jwt_secret (id) { + id -> Int4, + secret -> Varchar, + } +} + +diesel::table! { + local_user (id) { + id -> Int4, + password_encrypted -> Text, + person_id -> Int4, + admin -> Bool, + } +} + +diesel::table! { + person (id) { + id -> Int4, + username -> Text, + #[max_length = 255] + ap_id -> Varchar, + #[max_length = 255] + inbox_url -> Varchar, + public_key -> Text, + private_key -> Nullable, + last_refreshed_at -> Timestamptz, + local -> Bool, + } +} + +diesel::joinable!(article -> instance (instance_id)); +diesel::joinable!(conflict -> article (article_id)); +diesel::joinable!(conflict -> local_user (creator_id)); +diesel::joinable!(edit -> article (article_id)); +diesel::joinable!(edit -> person (creator_id)); +diesel::joinable!(instance_follow -> instance (instance_id)); +diesel::joinable!(instance_follow -> person (follower_id)); +diesel::joinable!(local_user -> person (person_id)); + +diesel::allow_tables_to_appear_in_same_query!( + article, + conflict, + edit, + instance, + instance_follow, + jwt_secret, + local_user, + person, +); diff --git a/tests/test.rs b/tests/test.rs index 0a0e08d..fa6606e 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -21,6 +21,7 @@ use ibis::{ frontend::error::MyResult, }; use pretty_assertions::{assert_eq, assert_ne}; +use retry_future::{LinearRetryStrategy, RetryFuture, RetryPolicy}; use url::Url; #[tokio::test] @@ -673,3 +674,42 @@ async fn test_lock_article() -> MyResult<()> { data.stop() } + +#[tokio::test] +async fn test_synchronize_instances() -> MyResult<()> { + let data = TestData::start().await; + + // fetch alpha instance on beta + data.beta + .resolve_instance(Url::parse(&format!("http://{}", &data.alpha.hostname))?) + .await?; + let beta_instances = data.beta.list_instances().await?; + assert_eq!(1, beta_instances.len()); + + // fetch beta instance on gamma + data.gamma + .resolve_instance(Url::parse(&format!("http://{}", &data.beta.hostname))?) + .await?; + + // wait until instance collection is fetched + let gamma_instances = RetryFuture::new( + || async { + let res = data.gamma.list_instances().await; + match res { + Err(_) => Err(RetryPolicy::::Retry(None)), + Ok(i) if i.len() < 2 => Err(RetryPolicy::Retry(None)), + Ok(i) => Ok(i), + } + }, + LinearRetryStrategy::new(), + ) + .await?; + + // now gamma also knows about alpha + assert_eq!(2, gamma_instances.len()); + assert!(gamma_instances + .iter() + .any(|i| i.domain == data.alpha.hostname)); + + data.stop() +}