synchronize articles

This commit is contained in:
Felix Ableitner 2023-11-16 16:40:43 +01:00
parent 5b8d393918
commit 703f10c746
9 changed files with 175 additions and 2 deletions

1
Cargo.lock generated
View File

@ -477,6 +477,7 @@ dependencies = [
"chrono", "chrono",
"enum_delegate", "enum_delegate",
"env_logger", "env_logger",
"futures",
"once_cell", "once_cell",
"rand", "rand",
"reqwest", "reqwest",

View File

@ -12,6 +12,7 @@ axum-macros = "0.3.8"
chrono = { version = "0.4.31", features = ["serde"] } chrono = { version = "0.4.31", features = ["serde"] }
enum_delegate = "0.2.0" enum_delegate = "0.2.0"
env_logger = { version = "0.10.1", default-features = false } env_logger = { version = "0.10.1", default-features = false }
futures = "0.3.29"
rand = "0.8.5" rand = "0.8.5"
serde = "1.0.192" serde = "1.0.192"
serde_json = "1.0.108" serde_json = "1.0.108"

View File

@ -13,10 +13,12 @@ pub mod routes;
pub async fn federation_config(hostname: &str) -> Result<FederationConfig<DatabaseHandle>, Error> { pub async fn federation_config(hostname: &str) -> Result<FederationConfig<DatabaseHandle>, Error> {
let ap_id = Url::parse(&format!("http://{}", hostname))?.into(); let ap_id = Url::parse(&format!("http://{}", hostname))?.into();
let articles_id = Url::parse(&format!("http://{}/articles", hostname))?.into();
let inbox = Url::parse(&format!("http://{}/inbox", hostname))?; let inbox = Url::parse(&format!("http://{}/inbox", hostname))?;
let keypair = generate_actor_keypair()?; let keypair = generate_actor_keypair()?;
let local_instance = DbInstance { let local_instance = DbInstance {
ap_id, ap_id,
articles_id,
inbox, inbox,
public_key: keypair.public_key, public_key: keypair.public_key,
private_key: Some(keypair.private_key), private_key: Some(keypair.private_key),

View File

@ -20,7 +20,7 @@ pub struct DbArticle {
pub local: bool, pub local: bool,
} }
#[derive(Deserialize, Serialize, Debug)] #[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct Article { pub struct Article {
#[serde(rename = "type")] #[serde(rename = "type")]

View File

@ -0,0 +1,93 @@
use crate::database::DatabaseHandle;
use crate::error::Error;
use crate::federation::objects::article::{Article, DbArticle};
use crate::federation::objects::instance::DbInstance;
use crate::utils::generate_object_id;
use activitypub_federation::kinds::collection::CollectionType;
use activitypub_federation::{
config::Data,
traits::{Collection, Object},
};
use futures::future;
use futures::future::try_join_all;
use serde::{Deserialize, Serialize};
use url::Url;
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ArticleCollection {
pub(crate) r#type: CollectionType,
pub(crate) id: Url,
pub(crate) total_items: i32,
pub(crate) items: Vec<Article>,
}
#[derive(Clone, Debug)]
pub struct DbArticleCollection(Vec<DbArticle>);
#[async_trait::async_trait]
impl Collection for DbArticleCollection {
type Owner = DbInstance;
type DataType = DatabaseHandle;
type Kind = ArticleCollection;
type Error = Error;
async fn read_local(
_owner: &Self::Owner,
data: &Data<Self::DataType>,
) -> Result<Self::Kind, Self::Error> {
let local_articles = {
let articles = data.articles.lock().unwrap();
articles
.iter()
.filter(|a| a.local)
.clone()
.cloned()
.collect::<Vec<_>>()
};
let articles = future::try_join_all(
local_articles
.into_iter()
.map(|a| a.into_json(data))
.collect::<Vec<_>>(),
)
.await?;
let ap_id = generate_object_id(data.local_instance().ap_id.inner())?.into();
let collection = ArticleCollection {
r#type: Default::default(),
id: ap_id,
total_items: articles.len() as i32,
items: articles,
};
Ok(collection)
}
async fn verify(
_apub: &Self::Kind,
_expected_domain: &Url,
_data: &Data<Self::DataType>,
) -> Result<(), Self::Error> {
Ok(())
}
async fn from_json(
apub: Self::Kind,
_owner: &Self::Owner,
data: &Data<Self::DataType>,
) -> Result<Self, Self::Error>
where
Self: Sized,
{
let mut articles = try_join_all(
apub.items
.into_iter()
.map(|i| DbArticle::from_json(i, data)),
)
.await?;
let mut lock = data.articles.lock().unwrap();
// TODO: need to overwrite existing items
lock.append(&mut articles);
// TODO: return value propably not needed
Ok(DbArticleCollection(articles))
}
}

View File

@ -1,5 +1,7 @@
use crate::error::Error; use crate::error::Error;
use crate::federation::objects::articles_collection::{ArticleCollection, DbArticleCollection};
use crate::{database::DatabaseHandle, federation::activities::follow::Follow}; use crate::{database::DatabaseHandle, federation::activities::follow::Follow};
use activitypub_federation::fetch::collection_id::CollectionId;
use activitypub_federation::kinds::actor::ServiceType; use activitypub_federation::kinds::actor::ServiceType;
use activitypub_federation::{ use activitypub_federation::{
activity_queue::send_activity, activity_queue::send_activity,
@ -16,6 +18,7 @@ use url::Url;
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DbInstance { pub struct DbInstance {
pub ap_id: ObjectId<DbInstance>, pub ap_id: ObjectId<DbInstance>,
pub articles_id: CollectionId<DbArticleCollection>,
pub inbox: Url, pub inbox: Url,
pub(crate) public_key: String, pub(crate) public_key: String,
pub(crate) private_key: Option<String>, pub(crate) private_key: Option<String>,
@ -31,6 +34,7 @@ pub struct Instance {
#[serde(rename = "type")] #[serde(rename = "type")]
kind: ServiceType, kind: ServiceType,
id: ObjectId<DbInstance>, id: ObjectId<DbInstance>,
articles: CollectionId<DbArticleCollection>,
inbox: Url, inbox: Url,
public_key: PublicKey, public_key: PublicKey,
} }
@ -97,6 +101,7 @@ impl Object for DbInstance {
Ok(Instance { Ok(Instance {
kind: Default::default(), kind: Default::default(),
id: self.ap_id.clone(), id: self.ap_id.clone(),
articles: self.articles_id.clone(),
inbox: self.inbox.clone(), inbox: self.inbox.clone(),
public_key: self.public_key(), public_key: self.public_key(),
}) })
@ -114,6 +119,7 @@ impl Object for DbInstance {
async fn from_json(json: Self::Kind, data: &Data<Self::DataType>) -> Result<Self, Self::Error> { async fn from_json(json: Self::Kind, data: &Data<Self::DataType>) -> Result<Self, Self::Error> {
let instance = DbInstance { let instance = DbInstance {
ap_id: json.id, ap_id: json.id,
articles_id: json.articles,
inbox: json.inbox, inbox: json.inbox,
public_key: json.public_key.public_key_pem, public_key: json.public_key.public_key_pem,
private_key: None, private_key: None,
@ -122,6 +128,8 @@ impl Object for DbInstance {
follows: vec![], follows: vec![],
local: false, local: false,
}; };
// TODO: very inefficient to sync all articles every time
instance.articles_id.dereference(&instance, &data).await?;
let mut mutex = data.instances.lock().unwrap(); let mut mutex = data.instances.lock().unwrap();
mutex.push(instance.clone()); mutex.push(instance.clone());
Ok(instance) Ok(instance)

View File

@ -1,2 +1,3 @@
pub mod article; pub mod article;
pub mod articles_collection;
pub mod instance; pub mod instance;

View File

@ -8,9 +8,10 @@ use activitypub_federation::axum::inbox::{receive_activity, ActivityData};
use activitypub_federation::axum::json::FederationJson; use activitypub_federation::axum::json::FederationJson;
use activitypub_federation::config::Data; use activitypub_federation::config::Data;
use activitypub_federation::protocol::context::WithContext; use activitypub_federation::protocol::context::WithContext;
use activitypub_federation::traits::ActivityHandler;
use activitypub_federation::traits::Object; use activitypub_federation::traits::Object;
use activitypub_federation::traits::{ActivityHandler, Collection};
use crate::federation::objects::articles_collection::{ArticleCollection, DbArticleCollection};
use axum::response::IntoResponse; use axum::response::IntoResponse;
use axum::routing::{get, post}; use axum::routing::{get, post};
use axum::Router; use axum::Router;
@ -21,6 +22,7 @@ use url::Url;
pub fn federation_routes() -> Router { pub fn federation_routes() -> Router {
Router::new() Router::new()
.route("/", get(http_get_instance)) .route("/", get(http_get_instance))
.route("/articles", get(http_get_articles))
.route("/inbox", post(http_post_inbox)) .route("/inbox", post(http_post_inbox))
} }
@ -33,6 +35,14 @@ async fn http_get_instance(
Ok(FederationJson(WithContext::new_default(json_instance))) Ok(FederationJson(WithContext::new_default(json_instance)))
} }
#[debug_handler]
async fn http_get_articles(
data: Data<DatabaseHandle>,
) -> MyResult<FederationJson<WithContext<ArticleCollection>>> {
let collection = DbArticleCollection::read_local(&data.local_instance(), &data).await?;
Ok(FederationJson(WithContext::new_default(collection)))
}
/// List of all activities which this actor can receive. /// List of all activities which this actor can receive.
#[derive(Deserialize, Serialize, Debug)] #[derive(Deserialize, Serialize, Debug)]
#[serde(untagged)] #[serde(untagged)]

View File

@ -97,3 +97,60 @@ async fn test_follow_instance() -> MyResult<()> {
handle_beta.abort(); handle_beta.abort();
Ok(()) Ok(())
} }
#[tokio::test]
#[serial]
async fn test_synchronize_articles() -> MyResult<()> {
setup();
let hostname_alpha = "localhost:8131";
let hostname_beta = "localhost:8132";
let handle_alpha = tokio::task::spawn(async {
start(hostname_alpha).await.unwrap();
});
let handle_beta = tokio::task::spawn(async {
start(hostname_beta).await.unwrap();
});
// create article on alpha
let create_article = CreateArticle {
title: "Manu_Chao".to_string(),
text: "Lorem ipsum".to_string(),
};
let create_res: DbArticle = post(hostname_alpha, "article", &create_article).await?;
assert_eq!(create_article.title, create_res.title);
assert!(create_res.local);
// article is not yet on beta
let get_article = GetArticle {
title: "Manu_Chao".to_string(),
};
let get_res = get_query::<DbArticle, _>(
hostname_beta,
&format!("article"),
Some(get_article.clone()),
)
.await;
assert!(get_res.is_err());
// fetch alpha instance on beta, articles are also fetched automatically
let resolve_object = ResolveObject {
id: Url::parse(&format!("http://{hostname_alpha}"))?,
};
get_query::<DbInstance, _>(hostname_beta, "resolve_object", Some(resolve_object)).await?;
// get the article and compare
let get_res: DbArticle = get_query(
hostname_beta,
&format!("article"),
Some(get_article.clone()),
)
.await?;
assert_eq!(create_res.ap_id, get_res.ap_id);
assert_eq!(create_article.title, get_res.title);
assert_eq!(create_article.text, get_res.text);
assert!(!get_res.local);
handle_alpha.abort();
handle_beta.abort();
Ok(())
}