1
0
Fork 0
mirror of https://github.com/Nutomic/ibis.git synced 2024-12-23 05:01:23 +00:00

Synchronize linked instances

This commit is contained in:
Felix Ableitner 2024-11-11 13:14:00 +01:00
parent 1678cbd279
commit 5556d0fef2
15 changed files with 345 additions and 23 deletions

13
Cargo.lock generated
View file

@ -1810,6 +1810,7 @@ dependencies = [
"pretty_assertions",
"rand",
"reqwest",
"retry_future",
"serde",
"serde_json",
"sha2",
@ -3330,6 +3331,18 @@ dependencies = [
"tower-service",
]
[[package]]
name = "retry_future"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a831851ed690d7d2ca106ddba947ee9f59e130c00311e815bd6d4ccbefbbc11f"
dependencies = [
"anyhow",
"futures",
"pin-project",
"tokio",
]
[[package]]
name = "ring"
version = "0.17.8"

View file

@ -113,6 +113,7 @@ codee = "0.2.0"
[dev-dependencies]
pretty_assertions = "1.4.1"
retry_future = "0.4.0"
[package.metadata.leptos]
output-name = "ibis"

View file

@ -0,0 +1,3 @@
alter table instance drop column instances_url;
alter table instance alter column articles_url set not null;

View file

@ -0,0 +1,3 @@
alter table instance add column instances_url varchar(255) unique;
alter table instance alter column articles_url drop not null;

View file

@ -5,7 +5,10 @@ use crate::{
IbisData,
},
error::MyResult,
federation::objects::articles_collection::DbArticleCollection,
federation::objects::{
articles_collection::DbArticleCollection,
instance_collection::DbInstanceCollection,
},
},
common::{DbInstance, DbPerson, InstanceView},
};
@ -31,12 +34,13 @@ pub struct DbInstanceForm {
pub domain: String,
pub ap_id: ObjectId<DbInstance>,
pub description: Option<String>,
pub articles_url: CollectionId<DbArticleCollection>,
pub articles_url: Option<CollectionId<DbArticleCollection>>,
pub inbox_url: String,
pub public_key: String,
pub private_key: Option<String>,
pub last_refreshed_at: DateTime<Utc>,
pub local: bool,
pub instances_url: Option<CollectionId<DbInstanceCollection>>,
}
impl DbInstance {

View file

@ -48,13 +48,15 @@ diesel::table! {
ap_id -> Varchar,
description -> Nullable<Text>,
#[max_length = 255]
articles_url -> Varchar,
articles_url -> Nullable<Varchar>,
#[max_length = 255]
inbox_url -> Varchar,
public_key -> Text,
private_key -> Nullable<Text>,
last_refreshed_at -> Timestamptz,
local -> Bool,
#[max_length = 255]
instances_url -> Nullable<Varchar>,
}
}

View file

@ -1,14 +1,19 @@
use crate::{
backend::{database::IbisData, error::Error, federation::objects::article::ApubArticle},
common::{DbArticle, DbInstance},
backend::{
database::IbisData,
error::{Error, MyResult},
federation::objects::article::ApubArticle,
},
common::{utils::http_protocol_str, DbArticle},
};
use activitypub_federation::{
config::Data,
fetch::collection_id::CollectionId,
kinds::collection::CollectionType,
protocol::verification::verify_domains_match,
traits::{Collection, Object},
};
use futures::future::{self, join_all};
use futures::future::{join_all, try_join_all};
use log::warn;
use serde::{Deserialize, Serialize};
use url::Url;
@ -25,19 +30,26 @@ pub struct ArticleCollection {
#[derive(Clone, Debug)]
pub struct DbArticleCollection(());
pub fn local_articles_url(domain: &str) -> MyResult<CollectionId<DbArticleCollection>> {
Ok(CollectionId::parse(&format!(
"{}://{domain}/all_articles",
http_protocol_str()
))?)
}
#[async_trait::async_trait]
impl Collection for DbArticleCollection {
type Owner = DbInstance;
type Owner = ();
type DataType = IbisData;
type Kind = ArticleCollection;
type Error = Error;
async fn read_local(
owner: &Self::Owner,
_owner: &Self::Owner,
data: &Data<Self::DataType>,
) -> Result<Self::Kind, Self::Error> {
let local_articles = DbArticle::read_all(Some(true), None, data)?;
let articles = future::try_join_all(
let articles = try_join_all(
local_articles
.into_iter()
.map(|a| a.into_json(data))
@ -46,7 +58,7 @@ impl Collection for DbArticleCollection {
.await?;
let collection = ArticleCollection {
r#type: Default::default(),
id: owner.articles_url.clone().into(),
id: local_articles_url(&data.config.federation.domain)?.into(),
total_items: articles.len() as i32,
items: articles,
};

View file

@ -1,3 +1,4 @@
use super::instance_collection::DbInstanceCollection;
use crate::{
backend::{
database::{instance::DbInstanceForm, IbisData},
@ -23,9 +24,10 @@ use url::Url;
pub struct ApubInstance {
#[serde(rename = "type")]
kind: ServiceType,
id: ObjectId<DbInstance>,
pub id: ObjectId<DbInstance>,
content: Option<String>,
articles: CollectionId<DbArticleCollection>,
articles: Option<CollectionId<DbArticleCollection>>,
instances: Option<CollectionId<DbInstanceCollection>>,
inbox: Url,
public_key: PublicKey,
}
@ -86,6 +88,7 @@ impl Object for DbInstance {
id: self.ap_id.clone(),
content: self.description.clone(),
articles: self.articles_url.clone(),
instances: self.instances_url.clone(),
inbox: Url::parse(&self.inbox_url)?,
public_key: self.public_key(),
})
@ -107,6 +110,7 @@ impl Object for DbInstance {
ap_id: json.id,
description: json.content,
articles_url: json.articles,
instances_url: json.instances,
inbox_url: json.inbox.to_string(),
public_key: json.public_key.public_key_pem,
private_key: None,
@ -119,9 +123,17 @@ impl Object for DbInstance {
let instance_ = instance.clone();
let data_ = data.reset_request_count();
tokio::spawn(async move {
let res = instance_.articles_url.dereference(&instance_, &data_).await;
if let Err(e) = res {
tracing::warn!("error in spawn: {e}");
if let Some(articles_url) = &instance_.articles_url {
let res = articles_url.dereference(&(), &data_).await;
if let Err(e) = res {
tracing::warn!("error in spawn: {e}");
}
}
if let Some(instances_url) = &instance_.instances_url {
let res = instances_url.dereference(&(), &data_).await;
if let Err(e) = res {
tracing::warn!("error in spawn: {e}");
}
}
});

View file

@ -0,0 +1,94 @@
use super::instance::ApubInstance;
use crate::{
backend::{
database::IbisData,
error::{Error, MyResult},
},
common::{utils::http_protocol_str, DbInstance},
};
use activitypub_federation::{
config::Data,
fetch::collection_id::CollectionId,
kinds::collection::CollectionType,
protocol::verification::verify_domains_match,
traits::{Collection, Object},
};
use futures::future::{self, join_all};
use log::warn;
use serde::{Deserialize, Serialize};
use url::Url;
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct InstanceCollection {
pub r#type: CollectionType,
pub id: Url,
pub total_items: i32,
pub items: Vec<ApubInstance>,
}
#[derive(Clone, Debug)]
pub struct DbInstanceCollection(());
pub fn linked_instances_url(domain: &str) -> MyResult<CollectionId<DbInstanceCollection>> {
Ok(CollectionId::parse(&format!(
"{}://{domain}/linked_instances",
http_protocol_str()
))?)
}
#[async_trait::async_trait]
impl Collection for DbInstanceCollection {
type Owner = ();
type DataType = IbisData;
type Kind = InstanceCollection;
type Error = Error;
async fn read_local(
_owner: &Self::Owner,
data: &Data<Self::DataType>,
) -> Result<Self::Kind, Self::Error> {
let instances = DbInstance::read_remote(data)?;
let instances = future::try_join_all(
instances
.into_iter()
.map(|i| i.into_json(data))
.collect::<Vec<_>>(),
)
.await?;
let collection = InstanceCollection {
r#type: Default::default(),
id: linked_instances_url(&data.config.federation.domain)?.into(),
total_items: instances.len() as i32,
items: instances,
};
Ok(collection)
}
async fn verify(
json: &Self::Kind,
expected_domain: &Url,
_data: &Data<Self::DataType>,
) -> Result<(), Self::Error> {
verify_domains_match(&json.id, expected_domain)?;
Ok(())
}
async fn from_json(
apub: Self::Kind,
_owner: &Self::Owner,
data: &Data<Self::DataType>,
) -> Result<Self, Self::Error> {
join_all(apub.items.into_iter().map(|instance| async {
let id = instance.id.clone();
let res = DbInstance::from_json(instance, data).await;
if let Err(e) = &res {
warn!("Failed to synchronize article {id}: {e}");
}
res
}))
.await;
Ok(DbInstanceCollection(()))
}
}

View file

@ -3,4 +3,5 @@ pub mod articles_collection;
pub mod edit;
pub mod edits_collection;
pub mod instance;
pub mod instance_collection;
pub mod user;

View file

@ -1,3 +1,4 @@
use super::objects::instance_collection::{DbInstanceCollection, InstanceCollection};
use crate::{
backend::{
database::IbisData,
@ -47,6 +48,7 @@ pub fn federation_routes() -> Router<()> {
.route("/", get(http_get_instance))
.route("/user/:name", get(http_get_person))
.route("/all_articles", get(http_get_all_articles))
.route("/linked_instances", get(http_get_linked_instances))
.route("/article/:title", get(http_get_article))
.route("/article/:title/edits", get(http_get_article_edits))
.route("/inbox", post(http_post_inbox))
@ -75,8 +77,15 @@ async fn http_get_person(
async fn http_get_all_articles(
data: Data<IbisData>,
) -> MyResult<FederationJson<WithContext<ArticleCollection>>> {
let local_instance = DbInstance::read_local_instance(&data)?;
let collection = DbArticleCollection::read_local(&local_instance, &data).await?;
let collection = DbArticleCollection::read_local(&(), &data).await?;
Ok(FederationJson(WithContext::new_default(collection)))
}
#[debug_handler]
async fn http_get_linked_instances(
data: Data<IbisData>,
) -> MyResult<FederationJson<WithContext<InstanceCollection>>> {
let collection = DbInstanceCollection::read_local(&(), &data).await?;
Ok(FederationJson(WithContext::new_default(collection)))
}

View file

@ -18,7 +18,7 @@ use crate::{
};
use activitypub_federation::{
config::{Data, FederationConfig, FederationMiddleware},
fetch::{collection_id::CollectionId, object_id::ObjectId},
fetch::object_id::ObjectId,
http_signatures::generate_actor_keypair,
};
use api::api_routes;
@ -38,6 +38,10 @@ use diesel::{
PgConnection,
};
use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};
use federation::objects::{
articles_collection::local_articles_url,
instance_collection::linked_instances_url,
};
use leptos::get_configuration;
use leptos_axum::{generate_route_list, LeptosRoutes};
use log::info;
@ -74,7 +78,8 @@ pub async fn start(config: IbisConfig, override_hostname: Option<SocketAddr>) ->
.domain(data.config.federation.domain.clone())
.url_verifier(Box::new(VerifyUrlData(data.config.clone())))
.app_data(data)
.debug(true)
.http_fetch_limit(1000)
.debug(cfg!(debug_assertions))
.build()
.await?;
@ -121,15 +126,14 @@ and to list interesting articles.";
async fn setup(data: &Data<IbisData>) -> Result<(), Error> {
let domain = &data.config.federation.domain;
let ap_id = ObjectId::parse(&format!("{}://{domain}", http_protocol_str()))?;
let articles_url =
CollectionId::parse(&format!("{}://{domain}/all_articles", http_protocol_str()))?;
let inbox_url = format!("{}://{domain}/inbox", http_protocol_str());
let keypair = generate_actor_keypair()?;
let form = DbInstanceForm {
domain: domain.to_string(),
ap_id,
description: Some("New Ibis instance".to_string()),
articles_url,
articles_url: Some(local_articles_url(domain)?),
instances_url: Some(linked_instances_url(domain)?),
inbox_url,
public_key: keypair.public_key,
private_key: Some(keypair.private_key),

View file

@ -11,6 +11,7 @@ use {
crate::backend::{
database::schema::{article, edit, instance, local_user, person},
federation::objects::articles_collection::DbArticleCollection,
federation::objects::instance_collection::DbInstanceCollection,
},
activitypub_federation::fetch::{collection_id::CollectionId, object_id::ObjectId},
diesel::{Identifiable, Queryable, Selectable},
@ -253,7 +254,7 @@ pub struct DbInstance {
pub ap_id: String,
pub description: Option<String>,
#[cfg(feature = "ssr")]
pub articles_url: CollectionId<DbArticleCollection>,
pub articles_url: Option<CollectionId<DbArticleCollection>>,
#[cfg(not(feature = "ssr"))]
pub articles_url: String,
pub inbox_url: String,
@ -264,6 +265,8 @@ pub struct DbInstance {
#[serde(skip)]
pub last_refreshed_at: DateTime<Utc>,
pub local: bool,
#[cfg(feature = "ssr")]
pub instances_url: Option<CollectionId<DbInstanceCollection>>,
}
impl DbInstance {

121
src/database/schema.rs Normal file
View file

@ -0,0 +1,121 @@
// @generated automatically by Diesel CLI.
diesel::table! {
article (id) {
id -> Int4,
title -> Text,
text -> Text,
#[max_length = 255]
ap_id -> Varchar,
instance_id -> Int4,
local -> Bool,
protected -> Bool,
}
}
diesel::table! {
conflict (id) {
id -> Int4,
hash -> Uuid,
diff -> Text,
summary -> Text,
creator_id -> Int4,
article_id -> Int4,
previous_version_id -> Uuid,
}
}
diesel::table! {
edit (id) {
id -> Int4,
creator_id -> Int4,
hash -> Uuid,
#[max_length = 255]
ap_id -> Varchar,
diff -> Text,
summary -> Text,
article_id -> Int4,
previous_version_id -> Uuid,
created -> Timestamptz,
}
}
diesel::table! {
instance (id) {
id -> Int4,
domain -> Text,
#[max_length = 255]
ap_id -> Varchar,
description -> Nullable<Text>,
#[max_length = 255]
articles_url -> Nullable<Varchar>,
#[max_length = 255]
inbox_url -> Varchar,
public_key -> Text,
private_key -> Nullable<Text>,
last_refreshed_at -> Timestamptz,
local -> Bool,
#[max_length = 255]
instances_url -> Nullable<Varchar>,
}
}
diesel::table! {
instance_follow (id) {
id -> Int4,
instance_id -> Int4,
follower_id -> Int4,
pending -> Bool,
}
}
diesel::table! {
jwt_secret (id) {
id -> Int4,
secret -> Varchar,
}
}
diesel::table! {
local_user (id) {
id -> Int4,
password_encrypted -> Text,
person_id -> Int4,
admin -> Bool,
}
}
diesel::table! {
person (id) {
id -> Int4,
username -> Text,
#[max_length = 255]
ap_id -> Varchar,
#[max_length = 255]
inbox_url -> Varchar,
public_key -> Text,
private_key -> Nullable<Text>,
last_refreshed_at -> Timestamptz,
local -> Bool,
}
}
diesel::joinable!(article -> instance (instance_id));
diesel::joinable!(conflict -> article (article_id));
diesel::joinable!(conflict -> local_user (creator_id));
diesel::joinable!(edit -> article (article_id));
diesel::joinable!(edit -> person (creator_id));
diesel::joinable!(instance_follow -> instance (instance_id));
diesel::joinable!(instance_follow -> person (follower_id));
diesel::joinable!(local_user -> person (person_id));
diesel::allow_tables_to_appear_in_same_query!(
article,
conflict,
edit,
instance,
instance_follow,
jwt_secret,
local_user,
person,
);

View file

@ -21,6 +21,7 @@ use ibis::{
frontend::error::MyResult,
};
use pretty_assertions::{assert_eq, assert_ne};
use retry_future::{LinearRetryStrategy, RetryFuture, RetryPolicy};
use url::Url;
#[tokio::test]
@ -673,3 +674,42 @@ async fn test_lock_article() -> MyResult<()> {
data.stop()
}
#[tokio::test]
async fn test_synchronize_instances() -> MyResult<()> {
let data = TestData::start().await;
// fetch alpha instance on beta
data.beta
.resolve_instance(Url::parse(&format!("http://{}", &data.alpha.hostname))?)
.await?;
let beta_instances = data.beta.list_instances().await?;
assert_eq!(1, beta_instances.len());
// fetch beta instance on gamma
data.gamma
.resolve_instance(Url::parse(&format!("http://{}", &data.beta.hostname))?)
.await?;
// wait until instance collection is fetched
let gamma_instances = RetryFuture::new(
|| async {
let res = data.gamma.list_instances().await;
match res {
Err(_) => Err(RetryPolicy::<String>::Retry(None)),
Ok(i) if i.len() < 2 => Err(RetryPolicy::Retry(None)),
Ok(i) => Ok(i),
}
},
LinearRetryStrategy::new(),
)
.await?;
// now gamma also knows about alpha
assert_eq!(2, gamma_instances.len());
assert!(gamma_instances
.iter()
.any(|i| i.domain == data.alpha.hostname));
data.stop()
}