Merge branch 'add-diesel'

This commit is contained in:
Felix Ableitner 2023-12-05 12:08:46 +01:00
commit 132d9c8389
37 changed files with 1698 additions and 942 deletions

Cargo.lock (generated; 228 lines changed)

@@ -5,7 +5,7 @@ version = 3
[[package]]
name = "activitypub_federation"
version = "0.5.0-beta.5"
-source = "git+https://github.com/LemmyNet/activitypub-federation-rust.git?branch=parse-impl#2aa64ad1de7943840677f4b96a20a11d38e2be56"
+source = "git+https://github.com/LemmyNet/activitypub-federation-rust.git?branch=diesel-feature#9ffdadfc8df6719542861466234a7dac2f9707c9"
dependencies = [
"activitystreams-kinds",
"async-trait",
@@ -14,6 +14,7 @@ dependencies = [
"bytes",
"chrono",
"derive_builder",
"diesel",
"dyn-clone", "dyn-clone",
"enum_delegate", "enum_delegate",
"futures", "futures",
@ -264,6 +265,12 @@ version = "0.6.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1e5f035d16fc623ae5f74981db80a439803888314e3a555fd6f04acd51a3205" checksum = "e1e5f035d16fc623ae5f74981db80a439803888314e3a555fd6f04acd51a3205"
[[package]]
name = "byteorder"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]]
name = "bytes"
version = "1.5.0"
@@ -442,19 +449,6 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "dashmap"
version = "5.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856"
dependencies = [
"cfg-if",
"hashbrown 0.14.2",
"lock_api",
"once_cell",
"parking_lot_core",
]
[[package]]
name = "derive_builder"
version = "0.12.0"
@@ -486,6 +480,70 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "diesel"
version = "2.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62c6fcf842f17f8c78ecf7c81d75c5ce84436b41ee07e03f490fbb5f5a8731d8"
dependencies = [
"bitflags 2.4.1",
"byteorder",
"chrono",
"diesel_derives",
"itoa",
"pq-sys",
"uuid",
]
[[package]]
name = "diesel-derive-newtype"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7267437d5b12df60ae29bd97f8d120f1c3a6272d6f213551afa56bbb2ecfbb7"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.39",
]
[[package]]
name = "diesel_derives"
version = "2.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef8337737574f55a468005a83499da720f20c65586241ffea339db9ecdfd2b44"
dependencies = [
"diesel_table_macro_syntax",
"proc-macro2",
"quote",
"syn 2.0.39",
]
[[package]]
name = "diesel_migrations"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6036b3f0120c5961381b570ee20a02432d7e2d27ea60de9578799cf9156914ac"
dependencies = [
"diesel",
"migrations_internals",
"migrations_macros",
]
[[package]]
name = "diesel_table_macro_syntax"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc5557efc453706fed5e4fa85006fe9817c224c3f480a34c7e5959fd700921c5"
dependencies = [
"syn 2.0.39",
]
[[package]]
name = "diff"
version = "0.1.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56254986775e3233ffa9c4d7d3faaf6d36a2c09d30b20687e9f88bc8bafc16c8"
[[package]]
name = "diffy"
version = "0.3.0"
@@ -615,20 +673,24 @@ dependencies = [
"axum",
"axum-macros",
"chrono",
"diesel",
"diesel-derive-newtype",
"diesel_migrations",
"diffy", "diffy",
"enum_delegate", "enum_delegate",
"env_logger", "env_logger",
"futures", "futures",
"hex",
"once_cell", "once_cell",
"pretty_assertions",
"rand", "rand",
"reqwest", "reqwest",
"serde", "serde",
"serde_json",
"serial_test",
"sha2", "sha2",
"tokio", "tokio",
"tracing", "tracing",
"url", "url",
"uuid",
]
[[package]]
@@ -865,6 +927,12 @@ version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7"
[[package]]
name = "hex"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]]
name = "http"
version = "0.2.11"
@@ -1140,6 +1208,27 @@ dependencies = [
"autocfg",
]
[[package]]
name = "migrations_internals"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f23f71580015254b020e856feac3df5878c2c7a8812297edd6c0a485ac9dada"
dependencies = [
"serde",
"toml",
]
[[package]]
name = "migrations_macros"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cce3325ac70e67bbab5bd837a31cae01f1a6db64e0e744a33cb03a543469ef08"
dependencies = [
"migrations_internals",
"proc-macro2",
"quote",
]
[[package]]
name = "mime"
version = "0.3.17"
@@ -1408,6 +1497,25 @@ version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "pq-sys"
version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31c0052426df997c0cbd30789eb44ca097e3541717a7b8fa36b1c464ee7edebd"
dependencies = [
"vcpkg",
]
[[package]]
name = "pretty_assertions"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af7cee1a6c8a5b9208b3cb1061f10c0cb689087b3d8ce85fb9d2dd7a29b6ba66"
dependencies = [
"diff",
"yansi",
]
[[package]]
name = "proc-macro2"
version = "1.0.69"
@@ -1747,6 +1855,15 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_spanned"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "12022b835073e5b11e90a14f86838ceb1c8fb0325b72416845c487ac0fa95e80"
dependencies = [
"serde",
]
[[package]]
name = "serde_urlencoded"
version = "0.7.1"
@@ -1759,31 +1876,6 @@ dependencies = [
"serde",
]
[[package]]
name = "serial_test"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e56dd856803e253c8f298af3f4d7eb0ae5e23a737252cd90bb4f3b435033b2d"
dependencies = [
"dashmap",
"futures",
"lazy_static",
"log",
"parking_lot",
"serial_test_derive",
]
[[package]]
name = "serial_test_derive"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91d129178576168c589c9ec973feedf7d3126c01ac2bf08795109aa35b69fb8f"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.39",
]
[[package]]
name = "sha1"
version = "0.10.6"
@@ -2037,6 +2129,40 @@ dependencies = [
"tracing",
]
[[package]]
name = "toml"
version = "0.7.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd79e69d3b627db300ff956027cc6c3798cef26d22526befdfcd12feeb6d2257"
dependencies = [
"serde",
"serde_spanned",
"toml_datetime",
"toml_edit",
]
[[package]]
name = "toml_datetime"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3550f4e9685620ac18a50ed434eb3aec30db8ba93b0287467bca5826ea25baf1"
dependencies = [
"serde",
]
[[package]]
name = "toml_edit"
version = "0.19.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421"
dependencies = [
"indexmap 2.1.0",
"serde",
"serde_spanned",
"toml_datetime",
"winnow",
]
[[package]]
name = "tower"
version = "0.4.13"
@@ -2159,11 +2285,12 @@ dependencies = [
[[package]]
name = "uuid"
-version = "1.5.0"
+version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "88ad59a7560b41a70d191093a945f0b87bc1deeda46fb237479708a1d6b6cdfc"
+checksum = "5e395fcf16a7a3d8127ec99782007af141946b4795001f876d54fb0d55978560"
dependencies = [
"getrandom",
+"serde",
]
[[package]]
@@ -2404,6 +2531,15 @@ version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538"
[[package]]
name = "winnow"
version = "0.5.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "829846f3e3db426d4cee4510841b71a8e58aa2a76b1132579487ae430ccd9c7b"
dependencies = [
"memchr",
]
[[package]]
name = "winreg"
version = "0.50.0"
@@ -2413,3 +2549,9 @@ dependencies = [
"cfg-if",
"windows-sys",
]
[[package]]
name = "yansi"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09041cd90cf85f7f8b2df60c646f853b7f535ce68f85244eb6731cf89fa498ec"

Cargo.toml

@@ -4,25 +4,29 @@ version = "0.1.0"
edition = "2021"
[dependencies]
-activitypub_federation = { git = "https://github.com/LemmyNet/activitypub-federation-rust.git", branch = "parse-impl", features = ["axum"], default-features = false }
+activitypub_federation = { git = "https://github.com/LemmyNet/activitypub-federation-rust.git", branch = "diesel-feature", features = ["axum", "diesel"], default-features = false }
anyhow = "1.0.75"
async-trait = "0.1.74"
axum = "0.6.20"
axum-macros = "0.3.8"
chrono = { version = "0.4.31", features = ["serde"] }
+diesel = {version = "2.1.4", features = ["postgres", "chrono", "uuid"] }
+diesel-derive-newtype = "2.1.0"
+diesel_migrations = "2.1.0"
diffy = "0.3.0"
enum_delegate = "0.2.0"
env_logger = { version = "0.10.1", default-features = false }
futures = "0.3.29"
+hex = "0.4.3"
rand = "0.8.5"
serde = "1.0.192"
+serde_json = "1.0.108"
sha2 = "0.10.8"
tokio = { version = "1.34.0", features = ["full"] }
tracing = "0.1.40"
url = "2.4.1"
+uuid = { version = "1.6.1", features = ["serde"] }
[dev-dependencies]
once_cell = "1.18.0"
+pretty_assertions = "1.4.0"
reqwest = "0.11.22"
-serial_test = "2.0.0"

diesel.toml (new file)

@@ -0,0 +1,9 @@
# For documentation on how to configure this file,
# see https://diesel.rs/guides/configuring-diesel-cli
[print_schema]
file = "src/database/schema.rs"
custom_type_derives = ["diesel::query_builder::QueryId"]
[migrations_directory]
dir = "migrations"

Diesel initial setup migration: down.sql (new file)

@@ -0,0 +1,6 @@
-- This file was automatically created by Diesel to setup helper functions
-- and other internal bookkeeping. This file is safe to edit, any future
-- changes will be added to existing projects as new migrations.
DROP FUNCTION IF EXISTS diesel_manage_updated_at(_tbl regclass);
DROP FUNCTION IF EXISTS diesel_set_updated_at();

Diesel initial setup migration: up.sql (new file)

@@ -0,0 +1,36 @@
-- This file was automatically created by Diesel to setup helper functions
-- and other internal bookkeeping. This file is safe to edit, any future
-- changes will be added to existing projects as new migrations.
-- Sets up a trigger for the given table to automatically set a column called
-- `updated_at` whenever the row is modified (unless `updated_at` was included
-- in the modified columns)
--
-- # Example
--
-- ```sql
-- CREATE TABLE users (id SERIAL PRIMARY KEY, updated_at TIMESTAMP NOT NULL DEFAULT NOW());
--
-- SELECT diesel_manage_updated_at('users');
-- ```
CREATE OR REPLACE FUNCTION diesel_manage_updated_at(_tbl regclass) RETURNS VOID AS $$
BEGIN
EXECUTE format('CREATE TRIGGER set_updated_at BEFORE UPDATE ON %s
FOR EACH ROW EXECUTE PROCEDURE diesel_set_updated_at()', _tbl);
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION diesel_set_updated_at() RETURNS trigger AS $$
BEGIN
IF (
NEW IS DISTINCT FROM OLD AND
NEW.updated_at IS NOT DISTINCT FROM OLD.updated_at
) THEN
NEW.updated_at := current_timestamp;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

Initial schema migration: down.sql (new file)

@@ -0,0 +1,5 @@
drop table conflict;
drop table edit;
drop table article;
drop table instance_follow;
drop table instance;

Initial schema migration: up.sql (new file)

@@ -0,0 +1,43 @@
create table instance (
id serial primary key,
ap_id varchar(255) not null unique,
inbox_url text not null,
articles_url varchar(255) not null unique,
public_key text not null,
private_key text,
last_refreshed_at timestamptz not null default now(),
local bool not null
);
create table instance_follow (
id serial primary key,
instance_id int REFERENCES instance ON UPDATE CASCADE ON DELETE CASCADE NOT NULL,
follower_id int REFERENCES instance ON UPDATE CASCADE ON DELETE CASCADE NOT NULL,
pending boolean not null,
unique(instance_id, follower_id)
);
create table article (
id serial primary key,
title text not null,
text text not null,
ap_id varchar(255) not null unique,
instance_id int REFERENCES instance ON UPDATE CASCADE ON DELETE CASCADE NOT NULL,
local bool not null
);
create table edit (
id serial primary key,
hash uuid not null,
ap_id varchar(255) not null unique,
diff text not null,
article_id int REFERENCES article ON UPDATE CASCADE ON DELETE CASCADE NOT NULL,
previous_version_id uuid not null
);
create table conflict (
id uuid primary key,
diff text not null,
article_id int REFERENCES article ON UPDATE CASCADE ON DELETE CASCADE NOT NULL,
previous_version_id uuid not null
);

src/api.rs

@@ -1,21 +1,22 @@
-use crate::database::{DatabaseHandle, DbConflict};
+use crate::database::article::{ArticleView, DbArticle, DbArticleForm};
+use crate::database::conflict::{ApiConflict, DbConflict, DbConflictForm};
+use crate::database::edit::{DbEdit, DbEditForm};
+use crate::database::instance::{DbInstance, InstanceView};
+use crate::database::version::EditVersion;
+use crate::database::MyDataHandle;
use crate::error::MyResult;
use crate::federation::activities::create_article::CreateArticle;
+use crate::federation::activities::follow::Follow;
use crate::federation::activities::submit_article_update;
-use crate::federation::objects::article::DbArticle;
-use crate::federation::objects::edit::EditVersion;
-use crate::federation::objects::instance::DbInstance;
use crate::utils::generate_article_version;
use activitypub_federation::config::Data;
use activitypub_federation::fetch::object_id::ObjectId;
-use anyhow::anyhow;
use axum::extract::Query;
use axum::routing::{get, post};
use axum::{Form, Json, Router};
use axum_macros::debug_handler;
use diffy::create_patch;
use futures::future::try_join_all;
-use rand::random;
use serde::{Deserialize, Serialize};
use url::Url;
@@ -42,65 +43,42 @@ pub struct CreateArticleData {
/// Create a new article with empty text, and federate it to followers.
#[debug_handler]
async fn create_article(
-    data: Data<DatabaseHandle>,
+    data: Data<MyDataHandle>,
    Form(create_article): Form<CreateArticleData>,
-) -> MyResult<Json<DbArticle>> {
-    {
-        let articles = data.articles.lock().unwrap();
-        let title_exists = articles
-            .iter()
-            .any(|a| a.1.local && a.1.title == create_article.title);
-        if title_exists {
-            return Err(anyhow!("A local article with this title already exists").into());
-        }
-    }
-    let local_instance_id = data.local_instance().ap_id;
+) -> MyResult<Json<ArticleView>> {
+    let local_instance = DbInstance::read_local_instance(&data.db_connection)?;
    let ap_id = ObjectId::parse(&format!(
        "http://{}:{}/article/{}",
-        local_instance_id.inner().domain().unwrap(),
-        local_instance_id.inner().port().unwrap(),
+        local_instance.ap_id.inner().domain().unwrap(),
+        local_instance.ap_id.inner().port().unwrap(),
        create_article.title
    ))?;
-    let article = DbArticle {
+    let form = DbArticleForm {
        title: create_article.title,
        text: String::new(),
        ap_id,
-        latest_version: EditVersion::default(),
-        edits: vec![],
-        instance: local_instance_id,
+        instance_id: local_instance.id,
        local: true,
    };
-    {
-        let mut articles = data.articles.lock().unwrap();
-        articles.insert(article.ap_id.inner().clone(), article.clone());
-    }
+    let article = DbArticle::create(&form, &data.db_connection)?;
    CreateArticle::send_to_followers(article.clone(), &data).await?;
-    Ok(Json(article))
+    Ok(Json(DbArticle::read_view(article.id, &data.db_connection)?))
}
#[derive(Deserialize, Serialize, Debug)]
pub struct EditArticleData {
    /// Id of the article to edit
-    pub ap_id: ObjectId<DbArticle>,
+    pub article_id: i32,
    /// Full, new text of the article. A diff against `previous_version` is generated on the server
    /// side to handle conflicts.
    pub new_text: String,
    /// The version that this edit is based on, ie [DbArticle.latest_version] or
    /// [ApiConflict.previous_version]
-    pub previous_version: EditVersion,
+    pub previous_version_id: EditVersion,
    /// If you are resolving a conflict, pass the id to delete conflict from the database
-    pub resolve_conflict_id: Option<i32>,
+    pub resolve_conflict_id: Option<EditVersion>,
-}
-#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
-pub struct ApiConflict {
-    pub id: i32,
-    pub three_way_merge: String,
-    pub article_id: ObjectId<DbArticle>,
-    pub previous_version: EditVersion,
}
/// Edit an existing article (local or remote).
@@ -114,67 +92,59 @@ pub struct ApiConflict {
/// Conflicts are stored in the database so they can be retrieved later from `/api/v3/edit_conflicts`.
#[debug_handler]
async fn edit_article(
-    data: Data<DatabaseHandle>,
+    data: Data<MyDataHandle>,
    Form(edit_form): Form<EditArticleData>,
) -> MyResult<Json<Option<ApiConflict>>> {
    // resolve conflict if any
-    if let Some(resolve_conflict_id) = &edit_form.resolve_conflict_id {
-        let mut lock = data.conflicts.lock().unwrap();
-        if !lock.iter().any(|c| &c.id == resolve_conflict_id) {
-            return Err(anyhow!("invalid resolve conflict"))?;
-        }
-        lock.retain(|c| &c.id != resolve_conflict_id);
+    if let Some(resolve_conflict_id) = edit_form.resolve_conflict_id {
+        DbConflict::delete(resolve_conflict_id, &data.db_connection)?;
    }
-    let original_article = {
-        let lock = data.articles.lock().unwrap();
-        let article = lock.get(edit_form.ap_id.inner()).unwrap();
-        article.clone()
-    };
-    if edit_form.previous_version == original_article.latest_version {
+    let original_article = DbArticle::read_view(edit_form.article_id, &data.db_connection)?;
+    if edit_form.previous_version_id == original_article.latest_version {
        // No intermediate changes, simply submit new version
-        submit_article_update(&data, edit_form.new_text.clone(), &original_article).await?;
+        submit_article_update(
+            &data,
+            edit_form.new_text.clone(),
+            edit_form.previous_version_id,
+            &original_article.article,
+        )
+        .await?;
        Ok(Json(None))
    } else {
        // There have been other changes since this edit was initiated. Get the common ancestor
        // version and generate a diff to find out what exactly has changed.
        let ancestor =
-            generate_article_version(&original_article.edits, &edit_form.previous_version)?;
+            generate_article_version(&original_article.edits, &edit_form.previous_version_id)?;
        let patch = create_patch(&ancestor, &edit_form.new_text);
-        let db_conflict = DbConflict {
-            id: random(),
+        let previous_version = DbEdit::read(&edit_form.previous_version_id, &data.db_connection)?;
+        let form = DbConflictForm {
+            id: EditVersion::new(&patch.to_string())?,
            diff: patch.to_string(),
-            article_id: original_article.ap_id.clone(),
-            previous_version: edit_form.previous_version,
+            article_id: original_article.article.id,
+            previous_version_id: previous_version.hash,
        };
-        {
-            let mut lock = data.conflicts.lock().unwrap();
-            lock.push(db_conflict.clone());
-        }
-        Ok(Json(db_conflict.to_api_conflict(&data).await?))
+        let conflict = DbConflict::create(&form, &data.db_connection)?;
+        Ok(Json(conflict.to_api_conflict(&data).await?))
    }
}
#[derive(Deserialize, Serialize, Clone)]
pub struct GetArticleData {
-    pub ap_id: ObjectId<DbArticle>,
+    pub article_id: i32,
}
/// Retrieve an article by ID. It must already be stored in the local database.
#[debug_handler]
async fn get_article(
    Query(query): Query<GetArticleData>,
-    data: Data<DatabaseHandle>,
-) -> MyResult<Json<DbArticle>> {
-    let articles = data.articles.lock().unwrap();
-    let article = articles
-        .iter()
-        .find(|a| a.1.ap_id == query.ap_id)
-        .ok_or(anyhow!("not found"))?
-        .1
-        .clone();
-    Ok(Json(article))
+    data: Data<MyDataHandle>,
+) -> MyResult<Json<ArticleView>> {
+    Ok(Json(DbArticle::read_view(
+        query.article_id,
+        &data.db_connection,
+    )?))
}
#[derive(Deserialize, Serialize)]
@@ -187,7 +157,7 @@ pub struct ResolveObject {
#[debug_handler]
async fn resolve_instance(
    Query(query): Query<ResolveObject>,
-    data: Data<DatabaseHandle>,
+    data: Data<MyDataHandle>,
) -> MyResult<Json<DbInstance>> {
    let instance: DbInstance = ObjectId::from(query.id).dereference(&data).await?;
    Ok(Json(instance))
@@ -198,39 +168,50 @@ async fn resolve_instance(
#[debug_handler]
async fn resolve_article(
    Query(query): Query<ResolveObject>,
-    data: Data<DatabaseHandle>,
-) -> MyResult<Json<DbArticle>> {
+    data: Data<MyDataHandle>,
+) -> MyResult<Json<ArticleView>> {
    let article: DbArticle = ObjectId::from(query.id).dereference(&data).await?;
-    Ok(Json(article))
+    let edits = DbEdit::read_for_article(&article, &data.db_connection)?;
+    let latest_version = edits.last().unwrap().hash.clone();
+    Ok(Json(ArticleView {
+        article,
+        edits,
+        latest_version,
+    }))
}
/// Retrieve the local instance info.
#[debug_handler]
-async fn get_local_instance(data: Data<DatabaseHandle>) -> MyResult<Json<DbInstance>> {
-    Ok(Json(data.local_instance()))
+async fn get_local_instance(data: Data<MyDataHandle>) -> MyResult<Json<InstanceView>> {
+    let local_instance = DbInstance::read_local_view(&data.db_connection)?;
+    Ok(Json(local_instance))
}
#[derive(Deserialize, Serialize, Debug)]
pub struct FollowInstance {
-    pub instance_id: ObjectId<DbInstance>,
+    pub id: i32,
}
/// Make the local instance follow a given remote instance, to receive activities about new and
/// updated articles.
#[debug_handler]
async fn follow_instance(
-    data: Data<DatabaseHandle>,
+    data: Data<MyDataHandle>,
    Form(query): Form<FollowInstance>,
) -> MyResult<()> {
-    let instance = query.instance_id.dereference(&data).await?;
-    data.local_instance().follow(&instance, &data).await?;
+    let local_instance = DbInstance::read_local_instance(&data.db_connection)?;
+    let target = DbInstance::read(query.id, &data.db_connection)?;
+    let pending = !target.local;
+    DbInstance::follow(local_instance.id, target.id, pending, &data)?;
+    let instance = DbInstance::read(query.id, &data.db_connection)?;
+    Follow::send(local_instance, instance, &data).await?;
    Ok(())
}
/// Get a list of all unresolved edit conflicts.
#[debug_handler]
-async fn edit_conflicts(data: Data<DatabaseHandle>) -> MyResult<Json<Vec<ApiConflict>>> {
-    let conflicts = { data.conflicts.lock().unwrap().to_vec() };
+async fn edit_conflicts(data: Data<MyDataHandle>) -> MyResult<Json<Vec<ApiConflict>>> {
+    let conflicts = DbConflict::list(&data.db_connection)?;
    let conflicts: Vec<ApiConflict> = try_join_all(conflicts.into_iter().map(|c| {
        let data = data.reset_request_count();
        async move { c.to_api_conflict(&data).await }
@@ -244,24 +225,16 @@ async fn edit_conflicts(data: Data<DatabaseHandle>) -> MyResult<Json<Vec<ApiConf
#[derive(Deserialize, Serialize, Clone)]
pub struct SearchArticleData {
-    pub title: String,
+    pub query: String,
}
-/// Search articles by title. For now only checks exact match.
-///
-/// Later include partial title match and body search.
+/// Search articles for matching title or body text.
#[debug_handler]
async fn search_article(
    Query(query): Query<SearchArticleData>,
-    data: Data<DatabaseHandle>,
+    data: Data<MyDataHandle>,
) -> MyResult<Json<Vec<DbArticle>>> {
-    let articles = data.articles.lock().unwrap();
-    let article = articles
-        .iter()
-        .filter(|a| a.1.title == query.title)
-        .map(|a| a.1)
-        .cloned()
-        .collect();
+    let article = DbArticle::search(&query.query, &data.db_connection)?;
    Ok(Json(article))
}
@@ -270,54 +243,52 @@ pub struct ForkArticleData {
    // TODO: could add optional param new_title so there is no problem with title collision
    // in case local article with same title exists. however that makes it harder to discover
    // variants of same article.
-    pub ap_id: ObjectId<DbArticle>,
+    pub article_id: i32,
}
/// Fork a remote article to local instance. This is useful if there are disagreements about
/// how an article should be edited.
#[debug_handler]
async fn fork_article(
-    data: Data<DatabaseHandle>,
+    data: Data<MyDataHandle>,
    Form(fork_form): Form<ForkArticleData>,
-) -> MyResult<Json<DbArticle>> {
-    let article = {
-        let lock = data.articles.lock().unwrap();
-        let article = lock.get(fork_form.ap_id.inner()).unwrap();
-        article.clone()
-    };
-    if article.local {
-        return Err(anyhow!("Cannot fork local article because there cant be multiple local articles with same title").into());
-    }
-    let original_article = {
-        let lock = data.articles.lock().unwrap();
-        lock.get(fork_form.ap_id.inner())
-            .expect("article exists")
-            .clone()
-    };
-    let local_instance_id = data.local_instance().ap_id;
+) -> MyResult<Json<ArticleView>> {
+    // TODO: lots of code duplicated from create_article(), can move it into helper
+    let original_article = DbArticle::read(fork_form.article_id, &data.db_connection)?;
+    let local_instance = DbInstance::read_local_instance(&data.db_connection)?;
    let ap_id = ObjectId::parse(&format!(
        "http://{}:{}/article/{}",
-        local_instance_id.inner().domain().unwrap(),
-        local_instance_id.inner().port().unwrap(),
+        local_instance.ap_id.inner().domain().unwrap(),
+        local_instance.ap_id.inner().port().unwrap(),
        original_article.title
    ))?;
-    let forked_article = DbArticle {
+    let form = DbArticleForm {
        title: original_article.title.clone(),
        text: original_article.text.clone(),
        ap_id,
-        latest_version: original_article.latest_version.clone(),
-        edits: original_article.edits.clone(),
-        instance: local_instance_id,
+        instance_id: local_instance.id,
        local: true,
    };
-    {
-        let mut articles = data.articles.lock().unwrap();
-        articles.insert(forked_article.ap_id.inner().clone(), forked_article.clone());
+    let article = DbArticle::create(&form, &data.db_connection)?;
+    // copy edits to new article
+    // TODO: convert to sql
+    let edits = DbEdit::read_for_article(&original_article, &data.db_connection)?;
+    for e in edits {
+        let ap_id = DbEditForm::generate_ap_id(&article, &e.hash)?;
+        // TODO: id gives db unique violation
+        let form = DbEditForm {
+            ap_id,
+            diff: e.diff,
+            article_id: article.id,
+            hash: e.hash,
+            previous_version_id: e.previous_version_id,
+        };
+        dbg!(DbEdit::create(&form, &data.db_connection))?;
    }
-    CreateArticle::send_to_followers(forked_article.clone(), &data).await?;
-    Ok(Json(forked_article))
+    CreateArticle::send_to_followers(article.clone(), &data).await?;
+    Ok(Json(DbArticle::read_view(article.id, &data.db_connection)?))
}

src/database.rs (deleted)

@@ -1,76 +0,0 @@
use crate::api::ApiConflict;
use crate::error::MyResult;
use crate::federation::activities::submit_article_update;
use crate::federation::objects::article::DbArticle;
use crate::federation::objects::edit::EditVersion;
use crate::federation::objects::instance::DbInstance;
use crate::utils::generate_article_version;
use activitypub_federation::config::Data;
use activitypub_federation::fetch::object_id::ObjectId;
use diffy::{apply, merge, Patch};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use url::Url;
pub type DatabaseHandle = Arc<Database>;
pub struct Database {
pub instances: Mutex<HashMap<Url, DbInstance>>,
pub articles: Mutex<HashMap<Url, DbArticle>>,
pub conflicts: Mutex<Vec<DbConflict>>,
}
impl Database {
pub fn local_instance(&self) -> DbInstance {
let lock = self.instances.lock().unwrap();
lock.iter().find(|i| i.1.local).unwrap().1.clone()
}
}
#[derive(Clone, Debug)]
pub struct DbConflict {
pub id: i32,
pub diff: String,
pub article_id: ObjectId<DbArticle>,
pub previous_version: EditVersion,
}
impl DbConflict {
pub async fn to_api_conflict(
&self,
data: &Data<DatabaseHandle>,
) -> MyResult<Option<ApiConflict>> {
let original_article = {
let mut lock = data.articles.lock().unwrap();
let article = lock.get_mut(self.article_id.inner()).unwrap();
article.clone()
};
// create common ancestor version
let ancestor = generate_article_version(&original_article.edits, &self.previous_version)?;
let patch = Patch::from_str(&self.diff)?;
// apply self.diff to ancestor to get `ours`
let ours = apply(&ancestor, &patch)?;
match merge(&ancestor, &ours, &original_article.text) {
Ok(new_text) => {
// patch applies cleanly so we are done
// federate the change
submit_article_update(data, new_text, &original_article).await?;
// remove conflict from db
let mut lock = data.conflicts.lock().unwrap();
lock.retain(|c| c.id != self.id);
Ok(None)
}
Err(three_way_merge) => {
// there is a merge conflict, user needs to do three-way-merge
Ok(Some(ApiConflict {
id: self.id,
three_way_merge,
article_id: original_article.ap_id.clone(),
previous_version: original_article.latest_version,
}))
}
}
}
}

src/database/article.rs (new file)

@@ -0,0 +1,146 @@
use crate::database::edit::DbEdit;
use crate::database::schema::article;
use crate::error::MyResult;
use crate::federation::objects::edits_collection::DbEditCollection;
use activitypub_federation::fetch::collection_id::CollectionId;
use activitypub_federation::fetch::object_id::ObjectId;
use diesel::pg::PgConnection;
use diesel::ExpressionMethods;
use diesel::{
insert_into, AsChangeset, BoolExpressionMethods, Identifiable, Insertable,
PgTextExpressionMethods, QueryDsl, Queryable, RunQueryDsl, Selectable,
};
use serde::{Deserialize, Serialize};
use crate::database::version::EditVersion;
use std::ops::DerefMut;
use std::sync::Mutex;
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Queryable, Selectable, Identifiable)]
#[diesel(table_name = article, check_for_backend(diesel::pg::Pg), belongs_to(DbInstance, foreign_key = instance_id))]
pub struct DbArticle {
pub id: i32,
pub title: String,
pub text: String,
pub ap_id: ObjectId<DbArticle>,
pub instance_id: i32,
pub local: bool,
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Queryable)]
#[diesel(table_name = article, check_for_backend(diesel::pg::Pg))]
pub struct ArticleView {
pub article: DbArticle,
pub latest_version: EditVersion,
pub edits: Vec<DbEdit>,
}
#[derive(Debug, Clone, Insertable, AsChangeset)]
#[diesel(table_name = article, check_for_backend(diesel::pg::Pg))]
pub struct DbArticleForm {
pub title: String,
pub text: String,
pub ap_id: ObjectId<DbArticle>,
pub instance_id: i32,
pub local: bool,
}
impl DbArticle {
pub fn edits_id(&self) -> MyResult<CollectionId<DbEditCollection>> {
Ok(CollectionId::parse(&format!("{}/edits", self.ap_id))?)
}
pub fn create(form: &DbArticleForm, conn: &Mutex<PgConnection>) -> MyResult<Self> {
let mut conn = conn.lock().unwrap();
Ok(insert_into(article::table)
.values(form)
.get_result(conn.deref_mut())?)
}
pub fn create_or_update(form: &DbArticleForm, conn: &Mutex<PgConnection>) -> MyResult<Self> {
let mut conn = conn.lock().unwrap();
Ok(insert_into(article::table)
.values(form)
.on_conflict(article::dsl::ap_id)
.do_update()
.set(form)
.get_result(conn.deref_mut())?)
}
pub fn update_text(id: i32, text: &str, conn: &Mutex<PgConnection>) -> MyResult<Self> {
let mut conn = conn.lock().unwrap();
Ok(diesel::update(article::dsl::article.find(id))
.set(article::dsl::text.eq(text))
.get_result::<Self>(conn.deref_mut())?)
}
pub fn read(id: i32, conn: &Mutex<PgConnection>) -> MyResult<DbArticle> {
let mut conn = conn.lock().unwrap();
Ok(article::table.find(id).get_result(conn.deref_mut())?)
}
pub fn read_view(id: i32, conn: &Mutex<PgConnection>) -> MyResult<ArticleView> {
let article: DbArticle = {
let mut conn = conn.lock().unwrap();
article::table.find(id).get_result(conn.deref_mut())?
};
let latest_version = article.latest_edit_version(conn)?;
let edits: Vec<DbEdit> = DbEdit::read_for_article(&article, conn)?;
Ok(ArticleView {
article,
edits,
latest_version,
})
}
pub fn read_from_ap_id(
ap_id: &ObjectId<DbArticle>,
conn: &Mutex<PgConnection>,
) -> MyResult<DbArticle> {
let mut conn = conn.lock().unwrap();
Ok(article::table
.filter(article::dsl::ap_id.eq(ap_id))
.get_result(conn.deref_mut())?)
}
pub fn read_local_title(title: &str, conn: &Mutex<PgConnection>) -> MyResult<DbArticle> {
let mut conn = conn.lock().unwrap();
Ok(article::table
.filter(article::dsl::title.eq(title))
.filter(article::dsl::local.eq(true))
.get_result(conn.deref_mut())?)
}
pub fn read_all_local(conn: &Mutex<PgConnection>) -> MyResult<Vec<DbArticle>> {
let mut conn = conn.lock().unwrap();
Ok(article::table
.filter(article::dsl::local.eq(true))
.get_results(conn.deref_mut())?)
}
pub fn search(query: &str, conn: &Mutex<PgConnection>) -> MyResult<Vec<DbArticle>> {
let mut conn = conn.lock().unwrap();
let replaced = query
.replace('%', "\\%")
.replace('_', "\\_")
.replace(' ', "%");
Ok(article::table
.filter(
article::dsl::title
.ilike(&replaced)
.or(article::dsl::text.ilike(&replaced)),
)
.get_results(conn.deref_mut())?)
}
// TODO: shouldnt have to read all edits from db
pub fn latest_edit_version(&self, conn: &Mutex<PgConnection>) -> MyResult<EditVersion> {
let edits: Vec<DbEdit> = DbEdit::read_for_article(self, conn)?;
match edits.last().map(|e| e.hash.clone()) {
Some(latest_version) => Ok(latest_version),
None => Ok(EditVersion::default()),
}
}
}
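A short usage sketch for the methods above (not part of this commit). It assumes the items from this file are in scope; the ap_id URL and instance_id are placeholder values:

```rust
fn create_and_read_example(conn: &Mutex<PgConnection>) -> MyResult<()> {
    // Placeholder values; the real handlers derive these from the local DbInstance.
    let form = DbArticleForm {
        title: "Example".to_string(),
        text: String::new(),
        ap_id: ObjectId::parse("http://localhost:8131/article/Example")?,
        instance_id: 1,
        local: true,
    };
    // create_or_update() upserts on ap_id, so repeating this call is harmless.
    let article = DbArticle::create_or_update(&form, conn)?;
    // read_view() returns the article together with its edits and latest version hash.
    let view = DbArticle::read_view(article.id, conn)?;
    // With no edits yet, the latest version is the default (hash of the empty string).
    assert_eq!(view.latest_version, EditVersion::default());
    Ok(())
}
```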

src/database/conflict.rs (new file)

@@ -0,0 +1,108 @@
use crate::database::article::DbArticle;
use crate::database::edit::DbEdit;
use crate::database::schema::conflict;
use crate::database::version::EditVersion;
use crate::database::MyDataHandle;
use crate::error::MyResult;
use crate::federation::activities::submit_article_update;
use crate::utils::generate_article_version;
use activitypub_federation::config::Data;
use diesel::{
delete, insert_into, Identifiable, Insertable, PgConnection, QueryDsl, Queryable, RunQueryDsl,
Selectable,
};
use diffy::{apply, merge, Patch};
use serde::{Deserialize, Serialize};
use std::ops::DerefMut;
use std::sync::Mutex;
/// A local only object which represents a merge conflict. It is created
/// when a local user edit conflicts with another concurrent edit.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Queryable, Selectable, Identifiable)]
#[diesel(table_name = conflict, check_for_backend(diesel::pg::Pg), belongs_to(DbArticle, foreign_key = article_id))]
pub struct DbConflict {
pub id: EditVersion,
pub diff: String,
pub article_id: i32,
pub previous_version_id: EditVersion,
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct ApiConflict {
pub id: EditVersion,
pub three_way_merge: String,
pub article_id: i32,
pub previous_version_id: EditVersion,
}
#[derive(Debug, Clone, Insertable)]
#[diesel(table_name = conflict, check_for_backend(diesel::pg::Pg))]
pub struct DbConflictForm {
pub id: EditVersion,
pub diff: String,
pub article_id: i32,
pub previous_version_id: EditVersion,
}
impl DbConflict {
pub fn create(form: &DbConflictForm, conn: &Mutex<PgConnection>) -> MyResult<Self> {
let mut conn = conn.lock().unwrap();
Ok(insert_into(conflict::table)
.values(form)
.get_result(conn.deref_mut())?)
}
pub fn list(conn: &Mutex<PgConnection>) -> MyResult<Vec<Self>> {
let mut conn = conn.lock().unwrap();
Ok(conflict::table.get_results(conn.deref_mut())?)
}
/// Delete a merge conflict after it is resolved.
pub fn delete(id: EditVersion, conn: &Mutex<PgConnection>) -> MyResult<Self> {
let mut conn = conn.lock().unwrap();
// TODO: should throw error on invalid id param
Ok(delete(conflict::table.find(id)).get_result(conn.deref_mut())?)
}
pub async fn to_api_conflict(
&self,
data: &Data<MyDataHandle>,
) -> MyResult<Option<ApiConflict>> {
let article = DbArticle::read(self.article_id, &data.db_connection)?;
// Make sure to get latest version from origin so that all conflicts can be resolved
let original_article = article.ap_id.dereference_forced(data).await?;
// create common ancestor version
let edits = DbEdit::read_for_article(&original_article, &data.db_connection)?;
let ancestor = generate_article_version(&edits, &self.previous_version_id)?;
let patch = Patch::from_str(&self.diff)?;
// apply self.diff to ancestor to get `ours`
let ours = apply(&ancestor, &patch)?;
match merge(&ancestor, &ours, &original_article.text) {
Ok(new_text) => {
// patch applies cleanly so we are done
// federate the change
submit_article_update(
data,
new_text,
self.previous_version_id.clone(),
&original_article,
)
.await?;
DbConflict::delete(self.id.clone(), &data.db_connection)?;
Ok(None)
}
Err(three_way_merge) => {
// there is a merge conflict, user needs to do three-way-merge
Ok(Some(ApiConflict {
id: self.id.clone(),
three_way_merge,
article_id: original_article.id,
previous_version_id: original_article
.latest_edit_version(&data.db_connection)?,
}))
}
}
}
}
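To make the merge logic in to_api_conflict() concrete, here is a small illustrative test of diffy::merge (assumed behaviour of the diffy crate, not code from this commit): non-conflicting edits merge cleanly, while overlapping edits return the conflict-marker text that ends up in ApiConflict::three_way_merge.

```rust
#[test]
fn diffy_three_way_merge_sketch() {
    // `ancestor` is the common base, `ours` the stored conflicting edit,
    // `theirs` the current article text, mirroring to_api_conflict() above.
    let ancestor = "first line\nsecond line\n";
    let ours = "first line changed locally\nsecond line\n";

    // If the article text is unchanged relative to the ancestor, our edit applies
    // cleanly and the stored conflict can simply be deleted.
    assert_eq!(diffy::merge(ancestor, ours, ancestor).unwrap(), ours);

    // Overlapping edits fail; the Err value carries conflict markers, which is what
    // gets returned to the user as ApiConflict::three_way_merge.
    let theirs = "first line changed remotely\nsecond line\n";
    assert!(diffy::merge(ancestor, ours, theirs).is_err());
}
```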

src/database/edit.rs (new file)

@@ -0,0 +1,98 @@
use crate::database::schema::edit;
use crate::database::version::EditVersion;
use crate::database::DbArticle;
use crate::error::MyResult;
use activitypub_federation::fetch::object_id::ObjectId;
use diesel::ExpressionMethods;
use diesel::{
insert_into, AsChangeset, Insertable, PgConnection, QueryDsl, Queryable, RunQueryDsl,
Selectable,
};
use diffy::create_patch;
use serde::{Deserialize, Serialize};
use std::ops::DerefMut;
use std::sync::Mutex;
/// Represents a single change to the article.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Queryable, Selectable)]
#[diesel(table_name = edit, check_for_backend(diesel::pg::Pg))]
pub struct DbEdit {
// TODO: we could use hash as primary key, but that gives errors on forking because
// the same edit is used for multiple articles
pub id: i32,
/// UUID built from sha224 hash of diff
pub hash: EditVersion,
pub ap_id: ObjectId<DbEdit>,
pub diff: String,
pub article_id: i32,
/// First edit of an article always has `EditVersion::default()` here
pub previous_version_id: EditVersion,
}
#[derive(Debug, Clone, Insertable, AsChangeset)]
#[diesel(table_name = edit, check_for_backend(diesel::pg::Pg))]
pub struct DbEditForm {
pub hash: EditVersion,
pub ap_id: ObjectId<DbEdit>,
pub diff: String,
pub article_id: i32,
pub previous_version_id: EditVersion,
}
impl DbEditForm {
pub fn new(
original_article: &DbArticle,
updated_text: &str,
previous_version_id: EditVersion,
) -> MyResult<Self> {
let diff = create_patch(&original_article.text, updated_text);
let version = EditVersion::new(&diff.to_string())?;
let ap_id = Self::generate_ap_id(original_article, &version)?;
Ok(DbEditForm {
hash: version,
ap_id,
diff: diff.to_string(),
article_id: original_article.id,
previous_version_id,
})
}
pub(crate) fn generate_ap_id(
article: &DbArticle,
version: &EditVersion,
) -> MyResult<ObjectId<DbEdit>> {
Ok(ObjectId::parse(&format!(
"{}/{}",
article.ap_id,
version.hash()
))?)
}
}
impl DbEdit {
pub fn create(form: &DbEditForm, conn: &Mutex<PgConnection>) -> MyResult<Self> {
let mut conn = conn.lock().unwrap();
Ok(insert_into(edit::table)
.values(form)
.on_conflict(edit::dsl::ap_id)
.do_update()
.set(form)
.get_result(conn.deref_mut())?)
}
pub fn read(version: &EditVersion, conn: &Mutex<PgConnection>) -> MyResult<Self> {
let mut conn = conn.lock().unwrap();
Ok(edit::table
.filter(edit::dsl::hash.eq(version))
.get_result(conn.deref_mut())?)
}
pub fn read_for_article(
article: &DbArticle,
conn: &Mutex<PgConnection>,
) -> MyResult<Vec<Self>> {
let mut conn = conn.lock().unwrap();
Ok(edit::table
.filter(edit::dsl::article_id.eq(article.id))
.get_results(conn.deref_mut())?)
}
}
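For orientation, a hypothetical helper (not in this commit) showing how DbEditForm and DbEdit fit together when a new edit is recorded; the federation code builds on essentially this pattern:

```rust
// Hypothetical helper: derive an edit from new article text and store it.
fn record_edit(
    article: &DbArticle,
    updated_text: &str,
    previous_version: EditVersion,
    conn: &Mutex<PgConnection>,
) -> MyResult<DbEdit> {
    // DbEditForm::new() diffs the stored text against `updated_text` and hashes the
    // resulting patch into the EditVersion that becomes this edit's `hash`.
    let form = DbEditForm::new(article, updated_text, previous_version)?;
    // create() upserts on ap_id, so re-receiving the same edit over federation is harmless.
    DbEdit::create(&form, conn)
}
```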

src/database/instance.rs (new file)

@@ -0,0 +1,205 @@
use crate::database::schema::{instance, instance_follow};
use crate::database::MyDataHandle;
use crate::error::{Error, MyResult};
use crate::federation::objects::articles_collection::DbArticleCollection;
use activitypub_federation::activity_sending::SendActivityTask;
use activitypub_federation::config::Data;
use activitypub_federation::fetch::collection_id::CollectionId;
use activitypub_federation::fetch::object_id::ObjectId;
use activitypub_federation::protocol::context::WithContext;
use activitypub_federation::traits::ActivityHandler;
use chrono::{DateTime, Utc};
use diesel::ExpressionMethods;
use diesel::{
insert_into, AsChangeset, Identifiable, Insertable, JoinOnDsl, PgConnection, QueryDsl,
Queryable, RunQueryDsl, Selectable,
};
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use std::ops::DerefMut;
use std::sync::Mutex;
use tracing::warn;
use url::Url;
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Queryable, Selectable, Identifiable)]
#[diesel(table_name = instance, check_for_backend(diesel::pg::Pg))]
pub struct DbInstance {
pub id: i32,
pub ap_id: ObjectId<DbInstance>,
pub articles_url: CollectionId<DbArticleCollection>,
pub inbox_url: String,
#[serde(skip)]
pub(crate) public_key: String,
#[serde(skip)]
pub(crate) private_key: Option<String>,
#[serde(skip)]
pub(crate) last_refreshed_at: DateTime<Utc>,
pub local: bool,
}
#[derive(Debug, Clone, Insertable, AsChangeset)]
#[diesel(table_name = instance, check_for_backend(diesel::pg::Pg))]
pub struct DbInstanceForm {
pub ap_id: ObjectId<DbInstance>,
pub articles_url: CollectionId<DbArticleCollection>,
pub inbox_url: String,
pub(crate) public_key: String,
pub(crate) private_key: Option<String>,
pub(crate) last_refreshed_at: DateTime<Utc>,
pub local: bool,
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Queryable)]
#[diesel(table_name = article, check_for_backend(diesel::pg::Pg))]
pub struct InstanceView {
pub instance: DbInstance,
pub followers: Vec<DbInstance>,
pub following: Vec<DbInstance>,
}
impl DbInstance {
pub fn followers_url(&self) -> MyResult<Url> {
Ok(Url::parse(&format!("{}/followers", self.ap_id.inner()))?)
}
pub fn follower_ids(&self, data: &Data<MyDataHandle>) -> MyResult<Vec<Url>> {
Ok(DbInstance::read_followers(self.id, &data.db_connection)?
.into_iter()
.map(|f| f.ap_id.into())
.collect())
}
pub async fn send_to_followers<Activity>(
&self,
activity: Activity,
extra_recipients: Vec<DbInstance>,
data: &Data<MyDataHandle>,
) -> Result<(), <Activity as ActivityHandler>::Error>
where
Activity: ActivityHandler + Serialize + Debug + Send + Sync,
<Activity as ActivityHandler>::Error: From<activitypub_federation::error::Error>,
<Activity as ActivityHandler>::Error: From<Error>,
{
let mut inboxes: Vec<_> = DbInstance::read_followers(self.id, &data.db_connection)?
.iter()
.map(|f| Url::parse(&f.inbox_url).unwrap())
.collect();
inboxes.extend(
extra_recipients
.into_iter()
.map(|i| Url::parse(&i.inbox_url).unwrap()),
);
self.send(activity, inboxes, data).await?;
Ok(())
}
pub async fn send<Activity>(
&self,
activity: Activity,
recipients: Vec<Url>,
data: &Data<MyDataHandle>,
) -> Result<(), <Activity as ActivityHandler>::Error>
where
Activity: ActivityHandler + Serialize + Debug + Send + Sync,
<Activity as ActivityHandler>::Error: From<activitypub_federation::error::Error>,
{
let activity = WithContext::new_default(activity);
let sends = SendActivityTask::prepare(&activity, self, recipients, data).await?;
for send in sends {
let send = send.sign_and_send(data).await;
if let Err(e) = send {
warn!("Failed to send activity {:?}: {e}", activity);
}
}
Ok(())
}
pub fn create(form: &DbInstanceForm, conn: &Mutex<PgConnection>) -> MyResult<Self> {
let mut conn = conn.lock().unwrap();
Ok(insert_into(instance::table)
.values(form)
.on_conflict(instance::dsl::ap_id)
.do_update()
.set(form)
.get_result(conn.deref_mut())?)
}
pub fn read(id: i32, conn: &Mutex<PgConnection>) -> MyResult<Self> {
let mut conn = conn.lock().unwrap();
Ok(instance::table.find(id).get_result(conn.deref_mut())?)
}
pub fn read_from_ap_id(
ap_id: &ObjectId<DbInstance>,
data: &Data<MyDataHandle>,
) -> MyResult<DbInstance> {
let mut conn = data.db_connection.lock().unwrap();
Ok(instance::table
.filter(instance::dsl::ap_id.eq(ap_id))
.get_result(conn.deref_mut())?)
}
pub fn read_local_instance(conn: &Mutex<PgConnection>) -> MyResult<Self> {
let mut conn = conn.lock().unwrap();
Ok(instance::table
.filter(instance::dsl::local.eq(true))
.get_result(conn.deref_mut())?)
}
pub fn read_local_view(conn: &Mutex<PgConnection>) -> MyResult<InstanceView> {
let instance = DbInstance::read_local_instance(conn)?;
let followers = DbInstance::read_followers(instance.id, conn)?;
let following = DbInstance::read_following(instance.id, conn)?;
Ok(InstanceView {
instance,
followers,
following,
})
}
pub fn follow(
follower_id_: i32,
instance_id_: i32,
pending_: bool,
data: &Data<MyDataHandle>,
) -> MyResult<()> {
debug_assert_ne!(follower_id_, instance_id_);
use instance_follow::dsl::{follower_id, instance_id, pending};
let mut conn = data.db_connection.lock().unwrap();
let form = (
instance_id.eq(instance_id_),
follower_id.eq(follower_id_),
pending.eq(pending_),
);
dbg!(follower_id_, instance_id_, pending_);
insert_into(instance_follow::table)
.values(form)
.on_conflict((instance_id, follower_id))
.do_update()
.set(form)
.execute(conn.deref_mut())?;
Ok(())
}
pub fn read_followers(id_: i32, conn: &Mutex<PgConnection>) -> MyResult<Vec<Self>> {
use instance_follow::dsl::{follower_id, instance_id};
let mut conn = conn.lock().unwrap();
Ok(instance_follow::table
.inner_join(instance::table.on(follower_id.eq(instance::dsl::id)))
.filter(instance_id.eq(id_))
.select(instance::all_columns)
.get_results(conn.deref_mut())?)
}
pub fn read_following(id_: i32, conn: &Mutex<PgConnection>) -> MyResult<Vec<Self>> {
use instance_follow::dsl::{follower_id, instance_id};
let mut conn = conn.lock().unwrap();
Ok(instance_follow::table
.inner_join(instance::table.on(instance_id.eq(instance::dsl::id)))
.filter(follower_id.eq(id_))
.select(instance::all_columns)
.get_results(conn.deref_mut())?)
}
}
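An illustrative sketch (not from this commit) of how the follow helpers above compose; the remote instance id is a placeholder:

```rust
// Hypothetical example: record that the local instance follows a remote one,
// then list both sides of the relation.
fn follow_example(data: &Data<MyDataHandle>) -> MyResult<()> {
    let local = DbInstance::read_local_instance(&data.db_connection)?;
    let remote = DbInstance::read(2, &data.db_connection)?; // placeholder id
    // `pending` stays true until the remote side answers with an Accept, at which
    // point the Accept handler upserts the same row with pending = false.
    DbInstance::follow(local.id, remote.id, true, data)?;

    let following = DbInstance::read_following(local.id, &data.db_connection)?;
    let followers_of_remote = DbInstance::read_followers(remote.id, &data.db_connection)?;
    assert!(following.iter().any(|i| i.id == remote.id));
    assert!(followers_of_remote.iter().any(|i| i.id == local.id));
    Ok(())
}
```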

src/database/mod.rs (new file)

@@ -0,0 +1,27 @@
use crate::database::article::DbArticle;
use diesel::PgConnection;
use std::ops::Deref;
use std::sync::{Arc, Mutex};
pub mod article;
pub mod conflict;
pub mod edit;
pub mod instance;
mod schema;
pub mod version;
#[derive(Clone)]
pub struct MyData {
pub db_connection: Arc<Mutex<PgConnection>>,
}
impl Deref for MyData {
type Target = Arc<Mutex<PgConnection>>;
fn deref(&self) -> &Self::Target {
&self.db_connection
}
}
pub type MyDataHandle = MyData;
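A minimal construction sketch for MyData (not part of this commit); the connection-string handling and expect() call are placeholders:

```rust
use diesel::Connection;

// Hypothetical constructor: open a Postgres connection and wrap it for the handlers.
pub fn connect(database_url: &str) -> MyData {
    let conn = PgConnection::establish(database_url).expect("failed to connect to postgres");
    MyData {
        db_connection: Arc::new(Mutex::new(conn)),
    }
}
```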

src/database/schema.rs (new file)

@@ -0,0 +1,64 @@
// @generated automatically by Diesel CLI.
diesel::table! {
article (id) {
id -> Int4,
title -> Text,
text -> Text,
#[max_length = 255]
ap_id -> Varchar,
instance_id -> Int4,
local -> Bool,
}
}
diesel::table! {
conflict (id) {
id -> Uuid,
diff -> Text,
article_id -> Int4,
previous_version_id -> Uuid,
}
}
diesel::table! {
edit (id) {
id -> Int4,
hash -> Uuid,
#[max_length = 255]
ap_id -> Varchar,
diff -> Text,
article_id -> Int4,
previous_version_id -> Uuid,
}
}
diesel::table! {
instance (id) {
id -> Int4,
#[max_length = 255]
ap_id -> Varchar,
inbox_url -> Text,
#[max_length = 255]
articles_url -> Varchar,
public_key -> Text,
private_key -> Nullable<Text>,
last_refreshed_at -> Timestamptz,
local -> Bool,
}
}
diesel::table! {
instance_follow (id) {
id -> Int4,
instance_id -> Int4,
follower_id -> Int4,
pending -> Bool,
}
}
diesel::joinable!(article -> instance (instance_id));
diesel::joinable!(conflict -> article (article_id));
diesel::joinable!(edit -> article (article_id));
diesel::allow_tables_to_appear_in_same_query!(article, conflict, edit, instance, instance_follow,);
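The joinable! and allow_tables_to_appear_in_same_query! declarations above enable joined queries across these tables. An illustrative sketch (not code from this commit), assuming it lives inside the database module (the schema module is private to it) and can see the structs from article.rs and instance.rs:

```rust
use crate::database::article::DbArticle;
use crate::database::instance::DbInstance;
use crate::database::schema::{article, instance};
use crate::error::MyResult;
use diesel::{ExpressionMethods, PgConnection, QueryDsl, RunQueryDsl};

// Load every local article together with the instance row it belongs to.
fn local_articles_with_instance(conn: &mut PgConnection) -> MyResult<Vec<(DbArticle, DbInstance)>> {
    Ok(article::table
        .inner_join(instance::table)
        .filter(article::dsl::local.eq(true))
        .select((article::all_columns, instance::all_columns))
        .get_results(conn)?)
}
```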

src/database/version.rs (new file)

@@ -0,0 +1,43 @@
use crate::error::MyResult;
use std::hash::Hash;
use diesel_derive_newtype::DieselNewType;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use uuid::Uuid;
/// The version hash of a specific edit. Generated by taking an SHA256 hash of the diff
/// and using the first 16 bytes so that it fits into UUID.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash, DieselNewType)]
pub struct EditVersion(Uuid);
impl EditVersion {
pub fn new(diff: &str) -> MyResult<Self> {
let mut sha256 = Sha256::new();
sha256.update(diff);
let hash_bytes = sha256.finalize();
let uuid = Uuid::from_slice(&hash_bytes.as_slice()[..16])?;
Ok(EditVersion(uuid))
}
pub fn hash(&self) -> String {
hex::encode(self.0.into_bytes())
}
}
impl Default for EditVersion {
fn default() -> Self {
EditVersion::new("").unwrap()
}
}
#[test]
fn test_edit_versions() -> MyResult<()> {
let default = EditVersion::default();
assert_eq!("e3b0c44298fc1c149afbf4c8996fb924", default.hash());
let version = EditVersion::new("test")?;
assert_eq!("9f86d081884c7d659a2feaa0c55ad015", version.hash());
Ok(())
}

src/federation/activities/accept.rs

@@ -1,7 +1,7 @@
+use crate::database::instance::DbInstance;
use crate::error::MyResult;
-use crate::federation::objects::instance::DbInstance;
use crate::utils::generate_activity_id;
-use crate::{database::DatabaseHandle, federation::activities::follow::Follow};
+use crate::{database::MyDataHandle, federation::activities::follow::Follow};
use activitypub_federation::{
    config::Data, fetch::object_id::ObjectId, kinds::activity::AcceptType, traits::ActivityHandler,
};
@@ -32,7 +32,7 @@ impl Accept {
#[async_trait::async_trait]
impl ActivityHandler for Accept {
-    type DataType = DatabaseHandle;
+    type DataType = MyDataHandle;
    type Error = crate::error::Error;
    fn id(&self) -> &Url {
@@ -49,9 +49,9 @@ impl ActivityHandler for Accept {
    async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
        // add to follows
-        let mut lock = data.instances.lock().unwrap();
-        let local_instance = lock.iter_mut().find(|i| i.1.local).unwrap().1;
-        local_instance.follows.push(self.actor.inner().clone());
+        let local_instance = DbInstance::read_local_instance(&data.db_connection)?;
+        let actor = self.actor.dereference(data).await?;
+        DbInstance::follow(local_instance.id, actor.id, false, data)?;
        Ok(())
    }
}

src/federation/activities/create_article.rs

@@ -1,7 +1,7 @@
-use crate::database::DatabaseHandle;
+use crate::database::instance::DbInstance;
+use crate::database::{article::DbArticle, MyDataHandle};
use crate::error::MyResult;
-use crate::federation::objects::article::{ApubArticle, DbArticle};
-use crate::federation::objects::instance::DbInstance;
+use crate::federation::objects::article::ApubArticle;
use crate::utils::generate_activity_id;
use activitypub_federation::kinds::activity::CreateType;
use activitypub_federation::{
@@ -26,16 +26,14 @@ pub struct CreateArticle {
}
impl CreateArticle {
-    pub async fn send_to_followers(
-        article: DbArticle,
-        data: &Data<DatabaseHandle>,
-    ) -> MyResult<()> {
-        let local_instance = data.local_instance();
+    pub async fn send_to_followers(article: DbArticle, data: &Data<MyDataHandle>) -> MyResult<()> {
+        let local_instance = DbInstance::read_local_instance(&data.db_connection)?;
        let object = article.clone().into_json(data).await?;
        let id = generate_activity_id(local_instance.ap_id.inner())?;
+        let to = local_instance.follower_ids(data)?;
        let create = CreateArticle {
            actor: local_instance.ap_id.clone(),
-            to: local_instance.follower_ids(),
+            to,
            object,
            kind: Default::default(),
            id,
@@ -48,7 +46,7 @@ impl CreateArticle {
}
#[async_trait::async_trait]
impl ActivityHandler for CreateArticle {
-    type DataType = DatabaseHandle;
+    type DataType = MyDataHandle;
    type Error = crate::error::Error;
    fn id(&self) -> &Url {
@@ -66,9 +64,8 @@ impl ActivityHandler for CreateArticle {
    async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
        let article = DbArticle::from_json(self.object.clone(), data).await?;
        if article.local {
-            data.local_instance()
-                .send_to_followers(self, vec![], data)
-                .await?;
+            let local_instance = DbInstance::read_local_instance(&data.db_connection)?;
+            local_instance.send_to_followers(self, vec![], data).await?;
        }
        Ok(())
    }

View File

@ -1,8 +1,6 @@
use crate::database::instance::DbInstance;
use crate::error::MyResult; use crate::error::MyResult;
use crate::federation::objects::instance::DbInstance; use crate::{database::MyDataHandle, federation::activities::accept::Accept, generate_activity_id};
use crate::{
database::DatabaseHandle, federation::activities::accept::Accept, generate_activity_id,
};
use activitypub_federation::{ use activitypub_federation::{
config::Data, config::Data,
fetch::object_id::ObjectId, fetch::object_id::ObjectId,
@ -23,20 +21,28 @@ pub struct Follow {
} }
impl Follow { impl Follow {
pub fn new(actor: ObjectId<DbInstance>, object: ObjectId<DbInstance>) -> MyResult<Follow> { pub async fn send(
let id = generate_activity_id(actor.inner())?; local_instance: DbInstance,
Ok(Follow { to: DbInstance,
actor, data: &Data<MyDataHandle>,
object, ) -> MyResult<()> {
let id = generate_activity_id(local_instance.ap_id.inner())?;
let follow = Follow {
actor: local_instance.ap_id.clone(),
object: to.ap_id.clone(),
kind: Default::default(), kind: Default::default(),
id, id,
}) };
local_instance
.send(follow, vec![to.shared_inbox_or_inbox()], data)
.await?;
Ok(())
} }
} }
#[async_trait::async_trait] #[async_trait::async_trait]
impl ActivityHandler for Follow { impl ActivityHandler for Follow {
type DataType = DatabaseHandle; type DataType = MyDataHandle;
type Error = crate::error::Error; type Error = crate::error::Error;
fn id(&self) -> &Url { fn id(&self) -> &Url {
@ -53,13 +59,8 @@ impl ActivityHandler for Follow {
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> { async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
let actor = self.actor.dereference(data).await?; let actor = self.actor.dereference(data).await?;
// add to followers let local_instance = DbInstance::read_local_instance(&data.db_connection)?;
let local_instance = { DbInstance::follow(actor.id, local_instance.id, false, data)?;
let mut lock = data.instances.lock().unwrap();
let local_instance = lock.iter_mut().find(|i| i.1.local).unwrap().1;
local_instance.followers.push(actor);
local_instance.clone()
};
// send back an accept // send back an accept
let follower = self.actor.dereference(data).await?; let follower = self.actor.dereference(data).await?;

View File

@ -1,9 +1,11 @@
use crate::database::DatabaseHandle; use crate::database::article::DbArticle;
use crate::database::edit::{DbEdit, DbEditForm};
use crate::database::instance::DbInstance;
use crate::database::version::EditVersion;
use crate::database::MyDataHandle;
use crate::error::Error; use crate::error::Error;
use crate::federation::activities::update_local_article::UpdateLocalArticle; use crate::federation::activities::update_local_article::UpdateLocalArticle;
use crate::federation::activities::update_remote_article::UpdateRemoteArticle; use crate::federation::activities::update_remote_article::UpdateRemoteArticle;
use crate::federation::objects::article::DbArticle;
use crate::federation::objects::edit::DbEdit;
use activitypub_federation::config::Data; use activitypub_federation::config::Data;
pub mod accept; pub mod accept;
@ -14,29 +16,30 @@ pub mod update_local_article;
pub mod update_remote_article; pub mod update_remote_article;
pub async fn submit_article_update( pub async fn submit_article_update(
data: &Data<DatabaseHandle>, data: &Data<MyDataHandle>,
new_text: String, new_text: String,
previous_version: EditVersion,
original_article: &DbArticle, original_article: &DbArticle,
) -> Result<(), Error> { ) -> Result<(), Error> {
let edit = DbEdit::new(original_article, &new_text)?; let form = DbEditForm::new(original_article, &new_text, previous_version)?;
if original_article.local { if original_article.local {
let updated_article = { let edit = DbEdit::create(&form, &data.db_connection)?;
let mut lock = data.articles.lock().unwrap(); let updated_article =
let article = lock.get_mut(original_article.ap_id.inner()).unwrap(); DbArticle::update_text(edit.article_id, &new_text, &data.db_connection)?;
article.text = new_text;
article.latest_version = edit.version.clone();
article.edits.push(edit.clone());
article.clone()
};
UpdateLocalArticle::send(updated_article, vec![], data).await?; UpdateLocalArticle::send(updated_article, vec![], data).await?;
} else { } else {
UpdateRemoteArticle::send( // don't insert edit into db, might be invalid in case of conflict
edit, let edit = DbEdit {
original_article.instance.dereference(data).await?, id: -1,
data, hash: form.hash,
) ap_id: form.ap_id,
.await?; diff: form.diff,
article_id: form.article_id,
previous_version_id: form.previous_version_id,
};
let instance = DbInstance::read(original_article.instance_id, &data.db_connection)?;
UpdateRemoteArticle::send(edit, instance, data).await?;
} }
Ok(()) Ok(())
} }

View File

@ -1,16 +1,16 @@
use crate::database::DatabaseHandle; use crate::database::conflict::{DbConflict, DbConflictForm};
use crate::database::instance::DbInstance;
use crate::database::version::EditVersion;
use crate::database::MyDataHandle;
use crate::error::MyResult; use crate::error::MyResult;
use crate::federation::objects::edit::ApubEdit; use crate::federation::objects::edit::ApubEdit;
use crate::federation::objects::instance::DbInstance;
use crate::utils::generate_activity_id; use crate::utils::generate_activity_id;
use activitypub_federation::kinds::activity::RejectType; use activitypub_federation::kinds::activity::RejectType;
use activitypub_federation::{ use activitypub_federation::{
config::Data, fetch::object_id::ObjectId, protocol::helpers::deserialize_one_or_many, config::Data, fetch::object_id::ObjectId, protocol::helpers::deserialize_one_or_many,
traits::ActivityHandler, traits::ActivityHandler,
}; };
use rand::random;
use crate::database::DbConflict;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use url::Url; use url::Url;
@ -30,9 +30,9 @@ impl RejectEdit {
pub async fn send( pub async fn send(
edit: ApubEdit, edit: ApubEdit,
user_instance: DbInstance, user_instance: DbInstance,
data: &Data<DatabaseHandle>, data: &Data<MyDataHandle>,
) -> MyResult<()> { ) -> MyResult<()> {
let local_instance = data.local_instance(); let local_instance = DbInstance::read_local_instance(&data.db_connection)?;
let id = generate_activity_id(local_instance.ap_id.inner())?; let id = generate_activity_id(local_instance.ap_id.inner())?;
let reject = RejectEdit { let reject = RejectEdit {
actor: local_instance.ap_id.clone(), actor: local_instance.ap_id.clone(),
@ -42,7 +42,7 @@ impl RejectEdit {
id, id,
}; };
local_instance local_instance
.send(reject, vec![user_instance.inbox], data) .send(reject, vec![Url::parse(&user_instance.inbox_url)?], data)
.await?; .await?;
Ok(()) Ok(())
} }
@ -50,7 +50,7 @@ impl RejectEdit {
#[async_trait::async_trait] #[async_trait::async_trait]
impl ActivityHandler for RejectEdit { impl ActivityHandler for RejectEdit {
type DataType = DatabaseHandle; type DataType = MyDataHandle;
type Error = crate::error::Error; type Error = crate::error::Error;
fn id(&self) -> &Url { fn id(&self) -> &Url {
@ -67,14 +67,14 @@ impl ActivityHandler for RejectEdit {
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> { async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
// can't convert this to DbEdit as it tries to apply the patch and fails // can't convert this to DbEdit as it tries to apply the patch and fails
let mut lock = data.conflicts.lock().unwrap(); let article = self.object.object.dereference(data).await?;
let conflict = DbConflict { let form = DbConflictForm {
id: random(), id: EditVersion::new(&self.object.content)?,
diff: self.object.content, diff: self.object.content,
article_id: self.object.object, article_id: article.id,
previous_version: self.object.previous_version, previous_version_id: self.object.previous_version,
}; };
lock.push(conflict); DbConflict::create(&form, &data.db_connection)?;
Ok(()) Ok(())
} }
} }
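
RejectEdit::receive now derives the conflict id with EditVersion::new(&self.object.content) instead of a random number. The implementation is not shown in this diff; judging by the SHA-224 hashing that the removed DbEdit::new performed on the diff text, a plausible sketch (struct layout assumed; the real helper apparently returns a Result, hence the ? at the call site) is:

use sha2::{Digest, Sha224};

/// Sketch: a content-addressed version id derived from the diff text.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EditVersion(String);

impl EditVersion {
    pub fn new(diff: &str) -> EditVersion {
        let mut hasher = Sha224::new();
        hasher.update(diff.as_bytes());
        EditVersion(format!("{:X}", hasher.finalize()))
    }
}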

View File

@ -1,8 +1,8 @@
use crate::database::DatabaseHandle; use crate::database::{article::DbArticle, MyDataHandle};
use crate::error::MyResult; use crate::error::MyResult;
use crate::federation::objects::article::{ApubArticle, DbArticle}; use crate::federation::objects::article::ApubArticle;
use crate::federation::objects::instance::DbInstance; use crate::database::instance::DbInstance;
use crate::utils::generate_activity_id; use crate::utils::generate_activity_id;
use activitypub_federation::kinds::activity::UpdateType; use activitypub_federation::kinds::activity::UpdateType;
use activitypub_federation::{ use activitypub_federation::{
@ -32,12 +32,12 @@ impl UpdateLocalArticle {
pub async fn send( pub async fn send(
article: DbArticle, article: DbArticle,
extra_recipients: Vec<DbInstance>, extra_recipients: Vec<DbInstance>,
data: &Data<DatabaseHandle>, data: &Data<MyDataHandle>,
) -> MyResult<()> { ) -> MyResult<()> {
debug_assert!(article.local); debug_assert!(article.local);
let local_instance = data.local_instance(); let local_instance = DbInstance::read_local_instance(&data.db_connection)?;
let id = generate_activity_id(local_instance.ap_id.inner())?; let id = generate_activity_id(local_instance.ap_id.inner())?;
let mut to = local_instance.follower_ids(); let mut to = local_instance.follower_ids(data)?;
to.extend(extra_recipients.iter().map(|i| i.ap_id.inner().clone())); to.extend(extra_recipients.iter().map(|i| i.ap_id.inner().clone()));
let update = UpdateLocalArticle { let update = UpdateLocalArticle {
actor: local_instance.ap_id.clone(), actor: local_instance.ap_id.clone(),
@ -55,7 +55,7 @@ impl UpdateLocalArticle {
#[async_trait::async_trait] #[async_trait::async_trait]
impl ActivityHandler for UpdateLocalArticle { impl ActivityHandler for UpdateLocalArticle {
type DataType = DatabaseHandle; type DataType = MyDataHandle;
type Error = crate::error::Error; type Error = crate::error::Error;
fn id(&self) -> &Url { fn id(&self) -> &Url {

View File

@ -1,8 +1,12 @@
use crate::database::DatabaseHandle; use crate::database::MyDataHandle;
use crate::error::MyResult; use crate::error::MyResult;
use crate::federation::objects::edit::{ApubEdit, DbEdit}; use crate::database::article::DbArticle;
use crate::federation::objects::instance::DbInstance; use crate::database::edit::DbEdit;
use crate::database::instance::DbInstance;
use crate::federation::activities::reject::RejectEdit;
use crate::federation::activities::update_local_article::UpdateLocalArticle;
use crate::federation::objects::edit::ApubEdit;
use crate::utils::generate_activity_id; use crate::utils::generate_activity_id;
use activitypub_federation::kinds::activity::UpdateType; use activitypub_federation::kinds::activity::UpdateType;
use activitypub_federation::{ use activitypub_federation::{
@ -12,9 +16,6 @@ use activitypub_federation::{
traits::{ActivityHandler, Object}, traits::{ActivityHandler, Object},
}; };
use diffy::{apply, Patch}; use diffy::{apply, Patch};
use crate::federation::activities::reject::RejectEdit;
use crate::federation::activities::update_local_article::UpdateLocalArticle;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use url::Url; use url::Url;
@ -35,9 +36,9 @@ impl UpdateRemoteArticle {
pub async fn send( pub async fn send(
edit: DbEdit, edit: DbEdit,
article_instance: DbInstance, article_instance: DbInstance,
data: &Data<DatabaseHandle>, data: &Data<MyDataHandle>,
) -> MyResult<()> { ) -> MyResult<()> {
let local_instance = data.local_instance(); let local_instance = DbInstance::read_local_instance(&data.db_connection)?;
let id = generate_activity_id(local_instance.ap_id.inner())?; let id = generate_activity_id(local_instance.ap_id.inner())?;
let update = UpdateRemoteArticle { let update = UpdateRemoteArticle {
actor: local_instance.ap_id.clone(), actor: local_instance.ap_id.clone(),
@ -47,7 +48,7 @@ impl UpdateRemoteArticle {
id, id,
}; };
local_instance local_instance
.send(update, vec![article_instance.inbox], data) .send(update, vec![Url::parse(&article_instance.inbox_url)?], data)
.await?; .await?;
Ok(()) Ok(())
} }
@ -55,7 +56,7 @@ impl UpdateRemoteArticle {
#[async_trait::async_trait] #[async_trait::async_trait]
impl ActivityHandler for UpdateRemoteArticle { impl ActivityHandler for UpdateRemoteArticle {
type DataType = DatabaseHandle; type DataType = MyDataHandle;
type Error = crate::error::Error; type Error = crate::error::Error;
fn id(&self) -> &Url { fn id(&self) -> &Url {
@ -72,21 +73,14 @@ impl ActivityHandler for UpdateRemoteArticle {
/// Received on article origin instances /// Received on article origin instances
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> { async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
let article_text = { let local_article = DbArticle::read_from_ap_id(&self.object.object, &data.db_connection)?;
let lock = data.articles.lock().unwrap();
lock.get(self.object.object.inner()).unwrap().text.clone()
};
let patch = Patch::from_str(&self.object.content)?; let patch = Patch::from_str(&self.object.content)?;
match apply(&article_text, &patch) { match apply(&local_article.text, &patch) {
Ok(applied) => { Ok(applied) => {
let article = { let edit = DbEdit::from_json(self.object.clone(), data).await?;
let edit = DbEdit::from_json(self.object.clone(), data).await?; let article =
let mut lock = data.articles.lock().unwrap(); DbArticle::update_text(edit.article_id, &applied, &data.db_connection)?;
let article = lock.get_mut(edit.article_id.inner()).unwrap();
article.text = applied;
article.clone()
};
UpdateLocalArticle::send(article, vec![self.actor.dereference(data).await?], data) UpdateLocalArticle::send(article, vec![self.actor.dereference(data).await?], data)
.await?; .await?;
} }

View File

@ -1,47 +1,3 @@
use crate::database::{Database, DatabaseHandle};
use crate::error::Error;
use crate::federation::objects::instance::DbInstance;
use activitypub_federation::config::FederationConfig;
use activitypub_federation::fetch::collection_id::CollectionId;
use activitypub_federation::http_signatures::generate_actor_keypair;
use chrono::Local;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use url::Url;
pub mod activities; pub mod activities;
pub mod objects; pub mod objects;
pub mod routes; pub mod routes;
pub async fn federation_config(hostname: &str) -> Result<FederationConfig<DatabaseHandle>, Error> {
let ap_id = Url::parse(&format!("http://{}", hostname))?.into();
let articles_id = CollectionId::parse(&format!("http://{}/all_articles", hostname))?;
let inbox = Url::parse(&format!("http://{}/inbox", hostname))?;
let keypair = generate_actor_keypair()?;
let local_instance = DbInstance {
ap_id,
articles_id,
inbox,
public_key: keypair.public_key,
private_key: Some(keypair.private_key),
last_refreshed_at: Local::now().into(),
followers: vec![],
follows: vec![],
local: true,
};
let database = Arc::new(Database {
instances: Mutex::new(HashMap::from([(
local_instance.ap_id.inner().clone(),
local_instance,
)])),
articles: Mutex::new(HashMap::new()),
conflicts: Mutex::new(vec![]),
});
let config = FederationConfig::builder()
.domain(hostname)
.app_data(database)
.debug(true)
.build()
.await?;
Ok(config)
}

View File

@ -1,44 +1,26 @@
use crate::error::MyResult; use crate::database::article::DbArticleForm;
use crate::federation::objects::edit::{DbEdit, EditVersion}; use crate::database::instance::DbInstance;
use crate::database::version::EditVersion;
use crate::database::{article::DbArticle, MyDataHandle};
use crate::error::Error;
use crate::federation::objects::edits_collection::DbEditCollection; use crate::federation::objects::edits_collection::DbEditCollection;
use crate::federation::objects::instance::DbInstance; use activitypub_federation::config::Data;
use crate::{database::DatabaseHandle, error::Error};
use activitypub_federation::fetch::collection_id::CollectionId; use activitypub_federation::fetch::collection_id::CollectionId;
use activitypub_federation::kinds::object::ArticleType; use activitypub_federation::kinds::object::ArticleType;
use activitypub_federation::kinds::public;
use activitypub_federation::protocol::verification::verify_domains_match;
use activitypub_federation::{ use activitypub_federation::{
config::Data, fetch::object_id::ObjectId, protocol::helpers::deserialize_one_or_many, traits::Object,
fetch::object_id::ObjectId,
kinds::public,
protocol::{helpers::deserialize_one_or_many, verification::verify_domains_match},
traits::Object,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use url::Url; use url::Url;
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct DbArticle {
pub title: String,
pub text: String,
pub ap_id: ObjectId<DbArticle>,
pub instance: ObjectId<DbInstance>,
/// List of all edits which make up this article, oldest first.
pub edits: Vec<DbEdit>,
pub latest_version: EditVersion,
pub local: bool,
}
impl DbArticle {
fn edits_id(&self) -> MyResult<CollectionId<DbEditCollection>> {
Ok(CollectionId::parse(&format!("{}/edits", self.ap_id))?)
}
}
#[derive(Deserialize, Serialize, Debug, Clone)] #[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct ApubArticle { pub struct ApubArticle {
#[serde(rename = "type")] #[serde(rename = "type")]
kind: ArticleType, pub(crate) kind: ArticleType,
id: ObjectId<DbArticle>, pub(crate) id: ObjectId<DbArticle>,
pub(crate) attributed_to: ObjectId<DbInstance>, pub(crate) attributed_to: ObjectId<DbInstance>,
#[serde(deserialize_with = "deserialize_one_or_many")] #[serde(deserialize_with = "deserialize_one_or_many")]
pub(crate) to: Vec<Url>, pub(crate) to: Vec<Url>,
@ -50,7 +32,7 @@ pub struct ApubArticle {
#[async_trait::async_trait] #[async_trait::async_trait]
impl Object for DbArticle { impl Object for DbArticle {
type DataType = DatabaseHandle; type DataType = MyDataHandle;
type Kind = ApubArticle; type Kind = ApubArticle;
type Error = Error; type Error = Error;
@ -58,24 +40,19 @@ impl Object for DbArticle {
object_id: Url, object_id: Url,
data: &Data<Self::DataType>, data: &Data<Self::DataType>,
) -> Result<Option<Self>, Self::Error> { ) -> Result<Option<Self>, Self::Error> {
let posts = data.articles.lock().unwrap(); let article = DbArticle::read_from_ap_id(&object_id.into(), &data.db_connection).ok();
let res = posts Ok(article)
.clone()
.into_iter()
.find(|u| u.1.ap_id.inner() == &object_id)
.map(|u| u.1);
Ok(res)
} }
async fn into_json(self, data: &Data<Self::DataType>) -> Result<Self::Kind, Self::Error> { async fn into_json(self, data: &Data<Self::DataType>) -> Result<Self::Kind, Self::Error> {
let instance = self.instance.dereference_local(data).await?; let local_instance = DbInstance::read_local_instance(&data.db_connection)?;
Ok(ApubArticle { Ok(ApubArticle {
kind: Default::default(), kind: Default::default(),
id: self.ap_id.clone(), id: self.ap_id.clone(),
attributed_to: self.instance.clone(), attributed_to: local_instance.ap_id.clone(),
to: vec![public(), instance.followers_url()?], to: vec![public(), local_instance.followers_url()?],
edits: self.edits_id()?, edits: self.edits_id()?,
latest_version: self.latest_version, latest_version: self.latest_edit_version(&data.db_connection)?,
content: self.text, content: self.text,
name: self.title, name: self.title,
}) })
@ -91,26 +68,17 @@ impl Object for DbArticle {
} }
async fn from_json(json: Self::Kind, data: &Data<Self::DataType>) -> Result<Self, Self::Error> { async fn from_json(json: Self::Kind, data: &Data<Self::DataType>) -> Result<Self, Self::Error> {
let mut article = DbArticle { let instance = json.attributed_to.dereference(data).await?;
let form = DbArticleForm {
title: json.name, title: json.name,
text: json.content, text: json.content,
ap_id: json.id, ap_id: json.id,
instance: json.attributed_to,
// TODO: shouldn't overwrite existing edits
edits: vec![],
latest_version: json.latest_version,
local: false, local: false,
instance_id: instance.id,
}; };
let article = DbArticle::create_or_update(&form, &data.db_connection)?;
{ json.edits.dereference(&article, data).await?;
let mut lock = data.articles.lock().unwrap();
lock.insert(article.ap_id.inner().clone(), article.clone());
}
let edits = json.edits.dereference(&article, data).await?;
// include edits in return value (they are already written to db, no need to do that here)
article.edits = edits.0;
Ok(article) Ok(article)
} }
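
from_json now hands persistence to DbArticle::create_or_update, whose body lives outside this diff. With Diesel this kind of federation fetch is usually an upsert keyed on the ActivityPub id; a rough sketch, assuming the generated article table from the (unshown) schema.rs, a unique index on ap_id, and that DbArticleForm derives both Insertable and AsChangeset, might be:

use diesel::prelude::*;

// Sketch only: `article` stands for the table module generated by schema.rs.
pub fn create_or_update(form: &DbArticleForm, conn: &mut PgConnection) -> QueryResult<DbArticle> {
    diesel::insert_into(article::table)
        .values(form)
        .on_conflict(article::ap_id)
        .do_update()
        .set(form)
        .get_result(conn)
}

The json.edits.dereference(&article, data) call that follows then persists the edits itself, which is why the old step of copying them back onto the article struct is no longer needed.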

View File

@ -1,7 +1,7 @@
use crate::database::DatabaseHandle; use crate::database::instance::DbInstance;
use crate::database::{article::DbArticle, MyDataHandle};
use crate::error::Error; use crate::error::Error;
use crate::federation::objects::article::{ApubArticle, DbArticle}; use crate::federation::objects::article::ApubArticle;
use crate::federation::objects::instance::DbInstance;
use activitypub_federation::kinds::collection::CollectionType; use activitypub_federation::kinds::collection::CollectionType;
use activitypub_federation::{ use activitypub_federation::{
@ -28,24 +28,15 @@ pub struct DbArticleCollection(Vec<DbArticle>);
#[async_trait::async_trait] #[async_trait::async_trait]
impl Collection for DbArticleCollection { impl Collection for DbArticleCollection {
type Owner = DbInstance; type Owner = DbInstance;
type DataType = DatabaseHandle; type DataType = MyDataHandle;
type Kind = ArticleCollection; type Kind = ArticleCollection;
type Error = Error; type Error = Error;
async fn read_local( async fn read_local(
_owner: &Self::Owner, owner: &Self::Owner,
data: &Data<Self::DataType>, data: &Data<Self::DataType>,
) -> Result<Self::Kind, Self::Error> { ) -> Result<Self::Kind, Self::Error> {
let local_articles = { let local_articles = DbArticle::read_all_local(&data.db_connection)?;
let articles = data.articles.lock().unwrap();
articles
.iter()
.map(|a| a.1)
.filter(|a| a.local)
.clone()
.cloned()
.collect::<Vec<_>>()
};
let articles = future::try_join_all( let articles = future::try_join_all(
local_articles local_articles
.into_iter() .into_iter()
@ -55,7 +46,7 @@ impl Collection for DbArticleCollection {
.await?; .await?;
let collection = ArticleCollection { let collection = ArticleCollection {
r#type: Default::default(), r#type: Default::default(),
id: data.local_instance().articles_id.into(), id: owner.articles_url.clone().into(),
total_items: articles.len() as i32, total_items: articles.len() as i32,
items: articles, items: articles,
}; };

View File

@ -1,54 +1,14 @@
use crate::database::DatabaseHandle; use crate::database::article::DbArticle;
use crate::error::{Error, MyResult}; use crate::database::edit::{DbEdit, DbEditForm};
use crate::database::version::EditVersion;
use crate::federation::objects::article::DbArticle; use crate::database::MyDataHandle;
use crate::error::Error;
use activitypub_federation::config::Data; use activitypub_federation::config::Data;
use activitypub_federation::fetch::object_id::ObjectId; use activitypub_federation::fetch::object_id::ObjectId;
use activitypub_federation::traits::Object; use activitypub_federation::traits::Object;
use diffy::create_patch;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sha2::Digest;
use sha2::Sha224;
use url::Url; use url::Url;
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct EditVersion(String);
impl Default for EditVersion {
fn default() -> Self {
let sha224 = Sha224::new();
let hash = format!("{:X}", sha224.finalize());
EditVersion(hash)
}
}
/// Represents a single change to the article.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct DbEdit {
pub id: ObjectId<DbEdit>,
pub diff: String,
pub article_id: ObjectId<DbArticle>,
pub version: EditVersion,
pub local: bool,
}
impl DbEdit {
pub fn new(original_article: &DbArticle, updated_text: &str) -> MyResult<Self> {
let diff = create_patch(&original_article.text, updated_text);
let mut sha224 = Sha224::new();
sha224.update(diff.to_bytes());
let hash = format!("{:X}", sha224.finalize());
let edit_id = ObjectId::parse(&format!("{}/{}", original_article.ap_id, hash))?;
Ok(DbEdit {
id: edit_id,
diff: diff.to_string(),
article_id: original_article.ap_id.clone(),
version: EditVersion(hash),
local: true,
})
}
}
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub enum EditType { pub enum EditType {
Edit, Edit,
@ -68,7 +28,7 @@ pub struct ApubEdit {
#[async_trait::async_trait] #[async_trait::async_trait]
impl Object for DbEdit { impl Object for DbEdit {
type DataType = DatabaseHandle; type DataType = MyDataHandle;
type Kind = ApubEdit; type Kind = ApubEdit;
type Error = Error; type Error = Error;
@ -80,18 +40,14 @@ impl Object for DbEdit {
} }
async fn into_json(self, data: &Data<Self::DataType>) -> Result<Self::Kind, Self::Error> { async fn into_json(self, data: &Data<Self::DataType>) -> Result<Self::Kind, Self::Error> {
let article_version = { let article = DbArticle::read(self.article_id, &data.db_connection)?;
let mut lock = data.articles.lock().unwrap();
let article = lock.get_mut(self.article_id.inner()).unwrap();
article.latest_version.clone()
};
Ok(ApubEdit { Ok(ApubEdit {
kind: EditType::Edit, kind: EditType::Edit,
id: self.id, id: self.ap_id,
content: self.diff, content: self.diff,
version: self.version, version: self.hash,
previous_version: article_version, previous_version: self.previous_version_id,
object: self.article_id, object: article.ap_id,
}) })
} }
@ -104,16 +60,15 @@ impl Object for DbEdit {
} }
async fn from_json(json: Self::Kind, data: &Data<Self::DataType>) -> Result<Self, Self::Error> { async fn from_json(json: Self::Kind, data: &Data<Self::DataType>) -> Result<Self, Self::Error> {
let edit = Self { let article = json.object.dereference(data).await?;
id: json.id, let form = DbEditForm {
ap_id: json.id,
diff: json.content, diff: json.content,
article_id: json.object, article_id: article.id,
version: json.version, hash: json.version,
local: false, previous_version_id: json.previous_version,
}; };
let mut lock = data.articles.lock().unwrap(); let edit = DbEdit::create(&form, &data.db_connection)?;
let article = lock.get_mut(edit.article_id.inner()).unwrap();
article.edits.push(edit.clone());
Ok(edit) Ok(edit)
} }
} }

View File

@ -1,8 +1,10 @@
use crate::database::DatabaseHandle; use crate::database::article::DbArticle;
use crate::database::MyDataHandle;
use crate::error::Error; use crate::error::Error;
use crate::federation::objects::article::DbArticle; use crate::federation::objects::edit::ApubEdit;
use crate::federation::objects::edit::{ApubEdit, DbEdit};
use crate::database::edit::DbEdit;
use crate::database::instance::DbInstance;
use activitypub_federation::kinds::collection::OrderedCollectionType; use activitypub_federation::kinds::collection::OrderedCollectionType;
use activitypub_federation::{ use activitypub_federation::{
config::Data, config::Data,
@ -28,7 +30,7 @@ pub struct DbEditCollection(pub Vec<DbEdit>);
#[async_trait::async_trait] #[async_trait::async_trait]
impl Collection for DbEditCollection { impl Collection for DbEditCollection {
type Owner = DbArticle; type Owner = DbArticle;
type DataType = DatabaseHandle; type DataType = MyDataHandle;
type Kind = ApubEditCollection; type Kind = ApubEditCollection;
type Error = Error; type Error = Error;
@ -36,22 +38,20 @@ impl Collection for DbEditCollection {
owner: &Self::Owner, owner: &Self::Owner,
data: &Data<Self::DataType>, data: &Data<Self::DataType>,
) -> Result<Self::Kind, Self::Error> { ) -> Result<Self::Kind, Self::Error> {
let edits = { let article = DbArticle::read_view(owner.id, &data.db_connection)?;
let lock = data.articles.lock().unwrap();
DbEditCollection(lock.get(owner.ap_id.inner()).unwrap().edits.clone())
};
let edits = future::try_join_all( let edits = future::try_join_all(
edits article
.0 .edits
.into_iter() .into_iter()
.map(|a| a.into_json(data)) .map(|a| a.into_json(data))
.collect::<Vec<_>>(), .collect::<Vec<_>>(),
) )
.await?; .await?;
let local_instance = DbInstance::read_local_instance(&data.db_connection)?;
let collection = ApubEditCollection { let collection = ApubEditCollection {
r#type: Default::default(), r#type: Default::default(),
id: Url::from(data.local_instance().articles_id), id: Url::from(local_instance.articles_url),
total_items: edits.len() as i32, total_items: edits.len() as i32,
items: edits, items: edits,
}; };

View File

@ -1,37 +1,20 @@
use crate::error::{Error, MyResult}; use crate::database::instance::{DbInstance, DbInstanceForm};
use crate::database::MyDataHandle;
use crate::error::Error;
use crate::federation::objects::articles_collection::DbArticleCollection; use crate::federation::objects::articles_collection::DbArticleCollection;
use crate::{database::DatabaseHandle, federation::activities::follow::Follow};
use activitypub_federation::activity_sending::SendActivityTask;
use activitypub_federation::fetch::collection_id::CollectionId; use activitypub_federation::fetch::collection_id::CollectionId;
use activitypub_federation::kinds::actor::ServiceType; use activitypub_federation::kinds::actor::ServiceType;
use activitypub_federation::{ use activitypub_federation::{
config::Data, config::Data,
fetch::object_id::ObjectId, fetch::object_id::ObjectId,
protocol::{context::WithContext, public_key::PublicKey, verification::verify_domains_match}, protocol::{public_key::PublicKey, verification::verify_domains_match},
traits::{ActivityHandler, Actor, Object}, traits::{Actor, Object},
}; };
use chrono::{DateTime, Local, Utc}; use chrono::{DateTime, Local, Utc};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::fmt::Debug; use std::fmt::Debug;
use tracing::warn;
use url::Url; use url::Url;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DbInstance {
pub ap_id: ObjectId<DbInstance>,
pub articles_id: CollectionId<DbArticleCollection>,
pub inbox: Url,
#[serde(skip)]
pub(crate) public_key: String,
#[serde(skip)]
pub(crate) private_key: Option<String>,
#[serde(skip)]
pub(crate) last_refreshed_at: DateTime<Utc>,
pub followers: Vec<DbInstance>,
pub follows: Vec<Url>,
pub local: bool,
}
#[derive(Clone, Debug, Deserialize, Serialize)] #[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct ApubInstance { pub struct ApubInstance {
@ -43,75 +26,9 @@ pub struct ApubInstance {
public_key: PublicKey, public_key: PublicKey,
} }
impl DbInstance {
pub fn followers_url(&self) -> MyResult<Url> {
Ok(Url::parse(&format!("{}/followers", self.ap_id.inner()))?)
}
pub fn follower_ids(&self) -> Vec<Url> {
self.followers
.iter()
.map(|f| f.ap_id.inner().clone())
.collect()
}
pub async fn follow(
&self,
other: &DbInstance,
data: &Data<DatabaseHandle>,
) -> Result<(), Error> {
let follow = Follow::new(self.ap_id.clone(), other.ap_id.clone())?;
self.send(follow, vec![other.shared_inbox_or_inbox()], data)
.await?;
Ok(())
}
pub async fn send_to_followers<Activity>(
&self,
activity: Activity,
extra_recipients: Vec<DbInstance>,
data: &Data<DatabaseHandle>,
) -> Result<(), <Activity as ActivityHandler>::Error>
where
Activity: ActivityHandler + Serialize + Debug + Send + Sync,
<Activity as ActivityHandler>::Error: From<activitypub_federation::error::Error>,
{
let local_instance = data.local_instance();
let mut inboxes: Vec<_> = local_instance
.followers
.iter()
.map(|f| f.inbox.clone())
.collect();
inboxes.extend(extra_recipients.into_iter().map(|i| i.inbox));
local_instance.send(activity, inboxes, data).await?;
Ok(())
}
pub async fn send<Activity>(
&self,
activity: Activity,
recipients: Vec<Url>,
data: &Data<DatabaseHandle>,
) -> Result<(), <Activity as ActivityHandler>::Error>
where
Activity: ActivityHandler + Serialize + Debug + Send + Sync,
<Activity as ActivityHandler>::Error: From<activitypub_federation::error::Error>,
{
let activity = WithContext::new_default(activity);
let sends = SendActivityTask::prepare(&activity, self, recipients, data).await?;
for send in sends {
let send = send.sign_and_send(data).await;
if let Err(e) = send {
warn!("Failed to send activity {:?}: {e}", activity);
}
}
Ok(())
}
}
#[async_trait::async_trait] #[async_trait::async_trait]
impl Object for DbInstance { impl Object for DbInstance {
type DataType = DatabaseHandle; type DataType = MyDataHandle;
type Kind = ApubInstance; type Kind = ApubInstance;
type Error = Error; type Error = Error;
@ -123,21 +40,15 @@ impl Object for DbInstance {
object_id: Url, object_id: Url,
data: &Data<Self::DataType>, data: &Data<Self::DataType>,
) -> Result<Option<Self>, Self::Error> { ) -> Result<Option<Self>, Self::Error> {
let users = data.instances.lock().unwrap(); Ok(DbInstance::read_from_ap_id(&object_id.into(), data).ok())
let res = users
.clone()
.into_iter()
.map(|u| u.1)
.find(|u| u.ap_id.inner() == &object_id);
Ok(res)
} }
async fn into_json(self, _data: &Data<Self::DataType>) -> Result<Self::Kind, Self::Error> { async fn into_json(self, _data: &Data<Self::DataType>) -> Result<Self::Kind, Self::Error> {
Ok(ApubInstance { Ok(ApubInstance {
kind: Default::default(), kind: Default::default(),
id: self.ap_id.clone(), id: self.ap_id.clone(),
articles: self.articles_id.clone(), articles: self.articles_url.clone(),
inbox: self.inbox.clone(), inbox: Url::parse(&self.inbox_url)?,
public_key: self.public_key(), public_key: self.public_key(),
}) })
} }
@ -152,21 +63,18 @@ impl Object for DbInstance {
} }
async fn from_json(json: Self::Kind, data: &Data<Self::DataType>) -> Result<Self, Self::Error> { async fn from_json(json: Self::Kind, data: &Data<Self::DataType>) -> Result<Self, Self::Error> {
let instance = DbInstance { let form = DbInstanceForm {
ap_id: json.id, ap_id: json.id,
articles_id: json.articles, articles_url: json.articles,
inbox: json.inbox, inbox_url: json.inbox.to_string(),
public_key: json.public_key.public_key_pem, public_key: json.public_key.public_key_pem,
private_key: None, private_key: None,
last_refreshed_at: Local::now().into(), last_refreshed_at: Local::now().into(),
followers: vec![],
follows: vec![],
local: false, local: false,
}; };
let instance = DbInstance::create(&form, &data.db_connection)?;
// TODO: very inefficient to sync all articles every time // TODO: very inefficient to sync all articles every time
instance.articles_id.dereference(&instance, data).await?; instance.articles_url.dereference(&instance, data).await?;
let mut mutex = data.instances.lock().unwrap();
mutex.insert(instance.ap_id.inner().clone(), instance.clone());
Ok(instance) Ok(instance)
} }
} }
@ -185,6 +93,6 @@ impl Actor for DbInstance {
} }
fn inbox(&self) -> Url { fn inbox(&self) -> Url {
self.inbox.clone() Url::parse(&self.inbox_url).unwrap()
} }
} }

View File

@ -1,9 +1,17 @@
use crate::database::DatabaseHandle; use crate::database::article::DbArticle;
use crate::database::instance::DbInstance;
use crate::database::MyDataHandle;
use crate::error::MyResult; use crate::error::MyResult;
use crate::federation::activities::accept::Accept; use crate::federation::activities::accept::Accept;
use crate::federation::activities::create_article::CreateArticle;
use crate::federation::activities::follow::Follow; use crate::federation::activities::follow::Follow;
use crate::federation::objects::instance::{ApubInstance, DbInstance}; use crate::federation::activities::reject::RejectEdit;
use crate::federation::activities::update_local_article::UpdateLocalArticle;
use crate::federation::activities::update_remote_article::UpdateRemoteArticle;
use crate::federation::objects::article::ApubArticle;
use crate::federation::objects::articles_collection::{ArticleCollection, DbArticleCollection};
use crate::federation::objects::edits_collection::{ApubEditCollection, DbEditCollection};
use crate::federation::objects::instance::ApubInstance;
use activitypub_federation::axum::inbox::{receive_activity, ActivityData}; use activitypub_federation::axum::inbox::{receive_activity, ActivityData};
use activitypub_federation::axum::json::FederationJson; use activitypub_federation::axum::json::FederationJson;
use activitypub_federation::config::Data; use activitypub_federation::config::Data;
@ -11,14 +19,6 @@ use activitypub_federation::protocol::context::WithContext;
use activitypub_federation::traits::Object; use activitypub_federation::traits::Object;
use activitypub_federation::traits::{ActivityHandler, Collection}; use activitypub_federation::traits::{ActivityHandler, Collection};
use axum::extract::Path; use axum::extract::Path;
use crate::federation::activities::create_article::CreateArticle;
use crate::federation::activities::reject::RejectEdit;
use crate::federation::activities::update_local_article::UpdateLocalArticle;
use crate::federation::activities::update_remote_article::UpdateRemoteArticle;
use crate::federation::objects::article::ApubArticle;
use crate::federation::objects::articles_collection::{ArticleCollection, DbArticleCollection};
use crate::federation::objects::edits_collection::{ApubEditCollection, DbEditCollection};
use axum::response::IntoResponse; use axum::response::IntoResponse;
use axum::routing::{get, post}; use axum::routing::{get, post};
use axum::Router; use axum::Router;
@ -37,30 +37,28 @@ pub fn federation_routes() -> Router {
#[debug_handler] #[debug_handler]
async fn http_get_instance( async fn http_get_instance(
data: Data<DatabaseHandle>, data: Data<MyDataHandle>,
) -> MyResult<FederationJson<WithContext<ApubInstance>>> { ) -> MyResult<FederationJson<WithContext<ApubInstance>>> {
let db_instance = data.local_instance(); let local_instance = DbInstance::read_local_instance(&data.db_connection)?;
let json_instance = db_instance.into_json(&data).await?; let json_instance = local_instance.into_json(&data).await?;
Ok(FederationJson(WithContext::new_default(json_instance))) Ok(FederationJson(WithContext::new_default(json_instance)))
} }
#[debug_handler] #[debug_handler]
async fn http_get_all_articles( async fn http_get_all_articles(
data: Data<DatabaseHandle>, data: Data<MyDataHandle>,
) -> MyResult<FederationJson<WithContext<ArticleCollection>>> { ) -> MyResult<FederationJson<WithContext<ArticleCollection>>> {
let collection = DbArticleCollection::read_local(&data.local_instance(), &data).await?; let local_instance = DbInstance::read_local_instance(&data.db_connection)?;
let collection = DbArticleCollection::read_local(&local_instance, &data).await?;
Ok(FederationJson(WithContext::new_default(collection))) Ok(FederationJson(WithContext::new_default(collection)))
} }
#[debug_handler] #[debug_handler]
async fn http_get_article( async fn http_get_article(
Path(title): Path<String>, Path(title): Path<String>,
data: Data<DatabaseHandle>, data: Data<MyDataHandle>,
) -> MyResult<FederationJson<WithContext<ApubArticle>>> { ) -> MyResult<FederationJson<WithContext<ApubArticle>>> {
let article = { let article = DbArticle::read_local_title(&title, &data.db_connection)?;
let lock = data.articles.lock().unwrap();
lock.values().find(|a| a.title == title).unwrap().clone()
};
let json = article.into_json(&data).await?; let json = article.into_json(&data).await?;
Ok(FederationJson(WithContext::new_default(json))) Ok(FederationJson(WithContext::new_default(json)))
} }
@ -68,12 +66,9 @@ async fn http_get_article(
#[debug_handler] #[debug_handler]
async fn http_get_article_edits( async fn http_get_article_edits(
Path(title): Path<String>, Path(title): Path<String>,
data: Data<DatabaseHandle>, data: Data<MyDataHandle>,
) -> MyResult<FederationJson<WithContext<ApubEditCollection>>> { ) -> MyResult<FederationJson<WithContext<ApubEditCollection>>> {
let article = { let article = DbArticle::read_local_title(&title, &data.db_connection)?;
let lock = data.articles.lock().unwrap();
lock.values().find(|a| a.title == title).unwrap().clone()
};
let json = DbEditCollection::read_local(&article, &data).await?; let json = DbEditCollection::read_local(&article, &data).await?;
Ok(FederationJson(WithContext::new_default(json))) Ok(FederationJson(WithContext::new_default(json)))
} }
@ -93,12 +88,9 @@ pub enum InboxActivities {
#[debug_handler] #[debug_handler]
pub async fn http_post_inbox( pub async fn http_post_inbox(
data: Data<DatabaseHandle>, data: Data<MyDataHandle>,
activity_data: ActivityData, activity_data: ActivityData,
) -> impl IntoResponse { ) -> impl IntoResponse {
receive_activity::<WithContext<InboxActivities>, DbInstance, DatabaseHandle>( receive_activity::<WithContext<InboxActivities>, DbInstance, MyDataHandle>(activity_data, &data)
activity_data, .await
&data,
)
.await
} }

View File

@ -1,13 +1,22 @@
use crate::utils::generate_activity_id;
use activitypub_federation::config::FederationMiddleware;
use axum::{Router, Server};
use crate::api::api_routes; use crate::api::api_routes;
use crate::database::instance::{DbInstance, DbInstanceForm};
use crate::database::MyData;
use crate::error::MyResult; use crate::error::MyResult;
use crate::federation::routes::federation_routes; use crate::federation::routes::federation_routes;
use federation::federation_config; use crate::utils::generate_activity_id;
use activitypub_federation::config::{FederationConfig, FederationMiddleware};
use activitypub_federation::fetch::collection_id::CollectionId;
use activitypub_federation::fetch::object_id::ObjectId;
use activitypub_federation::http_signatures::generate_actor_keypair;
use axum::{Router, Server};
use chrono::Local;
use diesel::Connection;
use diesel::PgConnection;
use diesel_migrations::embed_migrations;
use diesel_migrations::EmbeddedMigrations;
use diesel_migrations::MigrationHarness;
use std::net::ToSocketAddrs; use std::net::ToSocketAddrs;
use std::sync::{Arc, Mutex};
use tracing::info; use tracing::info;
pub mod api; pub mod api;
@ -16,8 +25,39 @@ pub mod error;
pub mod federation; pub mod federation;
mod utils; mod utils;
pub async fn start(hostname: &str) -> MyResult<()> { const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations");
let config = federation_config(hostname).await?;
pub async fn start(hostname: &str, database_url: &str) -> MyResult<()> {
let db_connection = Arc::new(Mutex::new(PgConnection::establish(database_url)?));
db_connection
.lock()
.unwrap()
.run_pending_migrations(MIGRATIONS)
.unwrap();
let data = MyData { db_connection };
let config = FederationConfig::builder()
.domain(hostname)
.app_data(data)
.debug(true)
.build()
.await?;
// TODO: Move this into a setup API call
let ap_id = ObjectId::parse(&format!("http://{}", hostname))?;
let articles_url = CollectionId::parse(&format!("http://{}/all_articles", hostname))?;
let inbox_url = format!("http://{}/inbox", hostname);
let keypair = generate_actor_keypair()?;
let form = DbInstanceForm {
ap_id,
articles_url,
inbox_url,
public_key: keypair.public_key,
private_key: Some(keypair.private_key),
last_refreshed_at: Local::now().into(),
local: true,
};
DbInstance::create(&form, &config.db_connection)?;
info!("Listening with axum on {hostname}"); info!("Listening with axum on {hostname}");
let config = config.clone(); let config = config.clone();
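
Taken together, the new start function does three Diesel-related things before serving: embed the migrations/ directory at compile time, open a PgConnection behind an Arc<Mutex<_>>, and apply any pending migrations. Stripped of the fediwiki-specific instance bootstrap, the pattern is roughly:

use diesel::{Connection, PgConnection};
use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};

// Compiles the SQL files under migrations/ into the binary.
const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations");

fn setup_db(database_url: &str) -> Result<PgConnection, diesel::ConnectionError> {
    let mut conn = PgConnection::establish(database_url)?;
    // Runs every migration not yet recorded in __diesel_schema_migrations.
    conn.run_pending_migrations(MIGRATIONS)
        .expect("failed to run pending migrations");
    Ok(conn)
}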

View File

@ -9,6 +9,7 @@ pub async fn main() -> MyResult<()> {
.filter_module("activitypub_federation", LevelFilter::Info) .filter_module("activitypub_federation", LevelFilter::Info)
.filter_module("fediwiki", LevelFilter::Info) .filter_module("fediwiki", LevelFilter::Info)
.init(); .init();
start("localhost:8131").await?; let database_url = "postgres://fediwiki:password@localhost:5432/fediwiki";
start("localhost:8131", database_url).await?;
Ok(()) Ok(())
} }
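
The connection string is hard-coded in main for now. A common variant (not part of this change) reads it from the environment and falls back to the development default:

use std::env;

let database_url = env::var("DATABASE_URL")
    .unwrap_or_else(|_| "postgres://fediwiki:password@localhost:5432/fediwiki".to_string());
start("localhost:8131", &database_url).await?;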

View File

@ -1,8 +1,10 @@
use crate::database::edit::DbEdit;
use crate::database::version::EditVersion;
use crate::error::MyResult; use crate::error::MyResult;
use crate::federation::objects::edit::{DbEdit, EditVersion};
use anyhow::anyhow; use anyhow::anyhow;
use diffy::{apply, Patch}; use diffy::{apply, Patch};
use rand::{distributions::Alphanumeric, thread_rng, Rng}; use rand::{distributions::Alphanumeric, thread_rng, Rng};
use url::{ParseError, Url}; use url::{ParseError, Url};
pub fn generate_activity_id(domain: &Url) -> Result<Url, ParseError> { pub fn generate_activity_id(domain: &Url) -> Result<Url, ParseError> {
@ -23,10 +25,13 @@ pub fn generate_activity_id(domain: &Url) -> Result<Url, ParseError> {
/// TODO: should cache all these generated versions /// TODO: should cache all these generated versions
pub fn generate_article_version(edits: &Vec<DbEdit>, version: &EditVersion) -> MyResult<String> { pub fn generate_article_version(edits: &Vec<DbEdit>, version: &EditVersion) -> MyResult<String> {
let mut generated = String::new(); let mut generated = String::new();
if version == &EditVersion::default() {
return Ok(generated);
}
for e in edits { for e in edits {
let patch = Patch::from_str(&e.diff)?; let patch = Patch::from_str(&e.diff)?;
generated = apply(&generated, &patch)?; generated = apply(&generated, &patch)?;
if &e.version == version { if &e.hash == version {
return Ok(generated); return Ok(generated);
} }
} }
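
generate_article_version rebuilds an article's text by replaying stored diffs in order until the requested EditVersion hash is reached; the new early return covers the default (empty) version, which corresponds to replaying nothing. The diffy round trip it relies on can be shown in isolation:

use diffy::{apply, create_patch, Patch};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let v0 = "some\nexample\ntext\n";
    let v1 = "some\nexample\nchanged text\n";

    // What an edit stores in its `diff` column: a unified diff as text.
    let stored_diff = create_patch(v0, v1).to_string();

    // What generate_article_version does for each DbEdit: parse and re-apply.
    let patch = Patch::from_str(&stored_diff)?;
    let rebuilt = apply(v0, &patch)?;
    assert_eq!(rebuilt, v1);
    Ok(())
}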

View File

@ -1,16 +1,22 @@
use activitypub_federation::fetch::object_id::ObjectId; use anyhow::anyhow;
use fediwiki::api::{ use fediwiki::api::{
ApiConflict, CreateArticleData, EditArticleData, FollowInstance, GetArticleData, ResolveObject, CreateArticleData, EditArticleData, FollowInstance, GetArticleData, ResolveObject,
}; };
use fediwiki::database::article::ArticleView;
use fediwiki::database::conflict::ApiConflict;
use fediwiki::database::instance::DbInstance;
use fediwiki::error::MyResult; use fediwiki::error::MyResult;
use fediwiki::federation::objects::article::DbArticle;
use fediwiki::federation::objects::instance::DbInstance;
use fediwiki::start; use fediwiki::start;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use reqwest::Client; use reqwest::{Client, RequestBuilder, StatusCode};
use serde::de::Deserialize; use serde::de::Deserialize;
use serde::ser::Serialize; use serde::ser::Serialize;
use std::env::current_dir;
use std::process::{Command, Stdio};
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Once; use std::sync::Once;
use std::thread::{sleep, spawn};
use std::time::Duration;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tracing::log::LevelFilter; use tracing::log::LevelFilter;
use url::Url; use url::Url;
@ -18,12 +24,9 @@ use url::Url;
pub static CLIENT: Lazy<Client> = Lazy::new(Client::new); pub static CLIENT: Lazy<Client> = Lazy::new(Client::new);
pub struct TestData { pub struct TestData {
pub hostname_alpha: &'static str, pub alpha: FediwikiInstance,
pub hostname_beta: &'static str, pub beta: FediwikiInstance,
pub hostname_gamma: &'static str, pub gamma: FediwikiInstance,
handle_alpha: JoinHandle<()>,
handle_beta: JoinHandle<()>,
handle_gamma: JoinHandle<()>,
} }
impl TestData { impl TestData {
@ -37,87 +40,135 @@ impl TestData {
.init(); .init();
}); });
let hostname_alpha = "localhost:8131"; // Run things on different ports and db paths to allow parallel tests
let hostname_beta = "localhost:8132"; static COUNTER: AtomicI32 = AtomicI32::new(0);
let hostname_gamma = "localhost:8133"; let current_run = COUNTER.fetch_add(1, Ordering::Relaxed);
let handle_alpha = tokio::task::spawn(async {
start(hostname_alpha).await.unwrap(); // Give each test a moment to start its postgres databases
}); sleep(Duration::from_millis(current_run as u64 * 500));
let handle_beta = tokio::task::spawn(async {
start(hostname_beta).await.unwrap(); let first_port = 8000 + (current_run * 3);
}); let port_alpha = first_port;
let handle_gamma = tokio::task::spawn(async { let port_beta = first_port + 1;
start(hostname_gamma).await.unwrap(); let port_gamma = first_port + 2;
});
let alpha_db_path = generate_db_path("alpha", port_alpha);
let beta_db_path = generate_db_path("beta", port_beta);
let gamma_db_path = generate_db_path("gamma", port_gamma);
// initialize postgres databases in parallel because it's slow
for j in [
FediwikiInstance::prepare_db(alpha_db_path.clone()),
FediwikiInstance::prepare_db(beta_db_path.clone()),
FediwikiInstance::prepare_db(gamma_db_path.clone()),
] {
j.join().unwrap();
}
Self { Self {
hostname_alpha, alpha: FediwikiInstance::start(alpha_db_path, port_alpha),
hostname_beta, beta: FediwikiInstance::start(beta_db_path, port_beta),
hostname_gamma, gamma: FediwikiInstance::start(gamma_db_path, port_gamma),
handle_alpha,
handle_beta,
handle_gamma,
} }
} }
pub fn stop(self) -> MyResult<()> { pub fn stop(self) -> MyResult<()> {
self.handle_alpha.abort(); for j in [self.alpha.stop(), self.beta.stop(), self.gamma.stop()] {
self.handle_beta.abort(); j.join().unwrap();
self.handle_gamma.abort(); }
Ok(()) Ok(())
} }
} }
/// Generate a unique db path for each postgres so that tests can run in parallel.
fn generate_db_path(name: &'static str, port: i32) -> String {
format!(
"{}/target/test_db/{name}-{port}",
current_dir().unwrap().display()
)
}
pub struct FediwikiInstance {
pub hostname: String,
db_path: String,
db_handle: JoinHandle<()>,
}
impl FediwikiInstance {
fn prepare_db(db_path: String) -> std::thread::JoinHandle<()> {
spawn(move || {
Command::new("./tests/scripts/start_dev_db.sh")
.arg(&db_path)
.stdout(Stdio::null())
.stderr(Stdio::null())
.output()
.unwrap();
})
}
fn start(db_path: String, port: i32) -> Self {
let db_url = format!("postgresql://lemmy:password@/lemmy?host={db_path}");
let hostname = format!("localhost:{port}");
let hostname_ = hostname.clone();
let handle = tokio::task::spawn(async move {
start(&hostname_, &db_url).await.unwrap();
});
Self {
db_path,
hostname,
db_handle: handle,
}
}
fn stop(self) -> std::thread::JoinHandle<()> {
self.db_handle.abort();
spawn(move || {
Command::new("./tests/scripts/stop_dev_db.sh")
.arg(&self.db_path)
.stdout(Stdio::null())
.stderr(Stdio::null())
.output()
.unwrap();
})
}
}
pub const TEST_ARTICLE_DEFAULT_TEXT: &str = "some\nexample\ntext\n"; pub const TEST_ARTICLE_DEFAULT_TEXT: &str = "some\nexample\ntext\n";
pub async fn create_article(hostname: &str, title: String) -> MyResult<DbArticle> { pub async fn create_article(hostname: &str, title: String) -> MyResult<ArticleView> {
let create_form = CreateArticleData { let create_form = CreateArticleData {
title: title.clone(), title: title.clone(),
}; };
let article: DbArticle = post(hostname, "article", &create_form).await?; let article: ArticleView = post(hostname, "article", &create_form).await?;
// create an initial edit to ensure that conflicts are generated (there are no conflicts on an empty file) // create an initial edit to ensure that conflicts are generated (there are no conflicts on an empty file)
let edit_form = EditArticleData { let edit_form = EditArticleData {
ap_id: article.ap_id, article_id: article.article.id,
new_text: TEST_ARTICLE_DEFAULT_TEXT.to_string(), new_text: TEST_ARTICLE_DEFAULT_TEXT.to_string(),
previous_version: article.latest_version, previous_version_id: article.latest_version,
resolve_conflict_id: None, resolve_conflict_id: None,
}; };
edit_article(hostname, &edit_form).await edit_article(hostname, &edit_form).await
} }
pub async fn get_article(hostname: &str, ap_id: &ObjectId<DbArticle>) -> MyResult<DbArticle> { pub async fn get_article(hostname: &str, article_id: i32) -> MyResult<ArticleView> {
let get_article = GetArticleData { let get_article = GetArticleData { article_id };
ap_id: ap_id.clone(), get_query::<ArticleView, _>(hostname, "article", Some(get_article.clone())).await
};
get_query::<DbArticle, _>(hostname, "article", Some(get_article.clone())).await
} }
pub async fn edit_article_with_conflict( pub async fn edit_article_with_conflict(
hostname: &str, hostname: &str,
edit_form: &EditArticleData, edit_form: &EditArticleData,
) -> MyResult<Option<ApiConflict>> { ) -> MyResult<Option<ApiConflict>> {
Ok(CLIENT let req = CLIENT
.patch(format!("http://{}/api/v1/article", hostname)) .patch(format!("http://{}/api/v1/article", hostname))
.form(edit_form) .form(edit_form);
.send() handle_json_res(req).await
.await?
.json()
.await?)
} }
pub async fn edit_article(hostname: &str, edit_form: &EditArticleData) -> MyResult<DbArticle> { pub async fn edit_article(hostname: &str, edit_form: &EditArticleData) -> MyResult<ArticleView> {
let edit_res: Option<ApiConflict> = CLIENT let edit_res = edit_article_with_conflict(hostname, edit_form).await?;
.patch(format!("http://{}/api/v1/article", hostname))
.form(&edit_form)
.send()
.await?
.json()
.await?;
assert!(edit_res.is_none()); assert!(edit_res.is_none());
let get_article = GetArticleData { get_article(hostname, edit_form.article_id).await
ap_id: edit_form.ap_id.clone(),
};
let updated_article: DbArticle = get_query(hostname, "article", Some(get_article)).await?;
Ok(updated_article)
} }
pub async fn get<T>(hostname: &str, endpoint: &str) -> MyResult<T> pub async fn get<T>(hostname: &str, endpoint: &str) -> MyResult<T>
@ -132,42 +183,51 @@ where
T: for<'de> Deserialize<'de>, T: for<'de> Deserialize<'de>,
R: Serialize, R: Serialize,
{ {
let mut res = CLIENT.get(format!("http://{}/api/v1/{}", hostname, endpoint)); let mut req = CLIENT.get(format!("http://{}/api/v1/{}", hostname, endpoint));
if let Some(query) = query { if let Some(query) = query {
res = res.query(&query); req = req.query(&query);
} }
let alpha_instance: T = res.send().await?.json().await?; handle_json_res(req).await
Ok(alpha_instance)
} }
pub async fn post<T: Serialize, R>(hostname: &str, endpoint: &str, form: &T) -> MyResult<R> pub async fn post<T: Serialize, R>(hostname: &str, endpoint: &str, form: &T) -> MyResult<R>
where where
R: for<'de> Deserialize<'de>, R: for<'de> Deserialize<'de>,
{ {
Ok(CLIENT let req = CLIENT
.post(format!("http://{}/api/v1/{}", hostname, endpoint)) .post(format!("http://{}/api/v1/{}", hostname, endpoint))
.form(form) .form(form);
.send() handle_json_res(req).await
.await?
.json()
.await?)
} }
pub async fn follow_instance(follow_instance: &str, followed_instance: &str) -> MyResult<()> { async fn handle_json_res<T>(req: RequestBuilder) -> MyResult<T>
where
T: for<'de> Deserialize<'de>,
{
let res = req.send().await?;
if res.status() == StatusCode::OK {
Ok(res.json().await?)
} else {
let text = res.text().await?;
Err(anyhow!("Post API response {text}").into())
}
}
pub async fn follow_instance(api_instance: &str, follow_instance: &str) -> MyResult<()> {
// fetch beta instance on alpha // fetch beta instance on alpha
let resolve_form = ResolveObject { let resolve_form = ResolveObject {
id: Url::parse(&format!("http://{}", followed_instance))?, id: Url::parse(&format!("http://{}", follow_instance))?,
}; };
let instance_resolved: DbInstance = let instance_resolved: DbInstance =
get_query(followed_instance, "resolve_instance", Some(resolve_form)).await?; get_query(api_instance, "resolve_instance", Some(resolve_form)).await?;
// send follow // send follow
let follow_form = FollowInstance { let follow_form = FollowInstance {
instance_id: instance_resolved.ap_id, id: instance_resolved.id,
}; };
// can't use the post helper because follow doesn't return JSON // can't use the post helper because follow doesn't return JSON
CLIENT CLIENT
.post(format!("http://{}/api/v1/instance/follow", follow_instance)) .post(format!("http://{}/api/v1/instance/follow", api_instance))
.form(&follow_form) .form(&follow_form)
.send() .send()
.await?; .await?;

29
tests/scripts/start_dev_db.sh Executable file
View File

@ -0,0 +1,29 @@
#!/bin/bash
set -e
export PGHOST=$1
export PGDATA="$1/dev_pgdata"
# If cluster exists, stop the server
if [ -d $PGDATA ]
then
# Prevent `stop` from failing if server already stopped
pg_ctl restart > /dev/null
pg_ctl stop
fi
# Remove any leftover data from a previous run
rm -rf $PGDATA
# Create cluster
initdb --username=postgres --auth=trust --no-instructions
touch "$PGHOST/.s.PGSQL.5432"
echo "$PGHOST/.s.PGSQL.5432"
# Start server that only listens to socket in current directory
pg_ctl start --options="-c listen_addresses= -c unix_socket_directories=$PGHOST"
# Setup database
psql -c "CREATE USER lemmy WITH PASSWORD 'password' SUPERUSER;" -U postgres
psql -c "CREATE DATABASE lemmy WITH OWNER lemmy;" -U postgres

9
tests/scripts/stop_dev_db.sh Executable file
View File

@ -0,0 +1,9 @@
#!/bin/bash
set -e
export PGHOST=$1
export PGDATA="$1/dev_pgdata"
echo $PGHOST
pg_ctl stop
rm -rf $PGDATA

View File

@ -7,445 +7,461 @@ use crate::common::{
get_query, post, TestData, TEST_ARTICLE_DEFAULT_TEXT, get_query, post, TestData, TEST_ARTICLE_DEFAULT_TEXT,
}; };
use common::get; use common::get;
use fediwiki::api::{ use fediwiki::api::{EditArticleData, ForkArticleData, ResolveObject, SearchArticleData};
ApiConflict, EditArticleData, ForkArticleData, ResolveObject, SearchArticleData, use fediwiki::database::article::{ArticleView, DbArticle};
};
use fediwiki::error::MyResult; use fediwiki::error::MyResult;
use fediwiki::federation::objects::article::DbArticle;
use fediwiki::federation::objects::edit::ApubEdit; use fediwiki::database::conflict::ApiConflict;
use fediwiki::federation::objects::instance::DbInstance; use fediwiki::database::instance::{DbInstance, InstanceView};
use serial_test::serial; use pretty_assertions::{assert_eq, assert_ne};
use url::Url; use url::Url;
// TODO: can run tests in parallel if we use different ports
#[tokio::test] #[tokio::test]
#[serial]
async fn test_create_read_and_edit_article() -> MyResult<()> { async fn test_create_read_and_edit_article() -> MyResult<()> {
let data = TestData::start(); let data = TestData::start();
// create article // create article
let title = "Manu_Chao".to_string(); let title = "Manu_Chao".to_string();
let create_res = create_article(data.hostname_alpha, title.clone()).await?; let create_res = create_article(&data.alpha.hostname, title.clone()).await?;
assert_eq!(title, create_res.title); assert_eq!(title, create_res.article.title);
assert!(create_res.local); assert!(create_res.article.local);
// now article can be read // now article can be read
let get_res = get_article(data.hostname_alpha, &create_res.ap_id).await?; let get_res = get_article(&data.alpha.hostname, create_res.article.id).await?;
assert_eq!(title, get_res.title); assert_eq!(title, get_res.article.title);
assert_eq!(TEST_ARTICLE_DEFAULT_TEXT, get_res.text); assert_eq!(TEST_ARTICLE_DEFAULT_TEXT, get_res.article.text);
assert!(get_res.local); assert!(get_res.article.local);
// error on article which wasn't federated // error on article which wasn't federated
let not_found = get_article(data.hostname_beta, &create_res.ap_id).await; let not_found = get_article(&data.beta.hostname, create_res.article.id).await;
assert!(not_found.is_err()); assert!(not_found.is_err());
// edit article // edit article
let edit_form = EditArticleData { let edit_form = EditArticleData {
ap_id: create_res.ap_id.clone(), article_id: create_res.article.id,
new_text: "Lorem Ipsum 2".to_string(), new_text: "Lorem Ipsum 2".to_string(),
previous_version: get_res.latest_version, previous_version_id: get_res.latest_version,
resolve_conflict_id: None, resolve_conflict_id: None,
}; };
let edit_res = edit_article(data.hostname_alpha, &edit_form).await?; let edit_res = edit_article(&data.alpha.hostname, &edit_form).await?;
assert_eq!(edit_form.new_text, edit_res.text); assert_eq!(edit_form.new_text, edit_res.article.text);
assert_eq!(2, edit_res.edits.len()); assert_eq!(2, edit_res.edits.len());
let search_form = SearchArticleData { let search_form = SearchArticleData {
title: title.clone(), query: title.clone(),
}; };
let search_res: Vec<DbArticle> = let search_res: Vec<DbArticle> =
get_query(data.hostname_alpha, "search", Some(search_form)).await?; get_query(&data.alpha.hostname, "search", Some(search_form)).await?;
assert_eq!(1, search_res.len()); assert_eq!(1, search_res.len());
assert_eq!(edit_res, search_res[0]); assert_eq!(edit_res.article, search_res[0]);
data.stop() data.stop()
} }
#[tokio::test] #[tokio::test]
#[serial]
async fn test_create_duplicate_article() -> MyResult<()> { async fn test_create_duplicate_article() -> MyResult<()> {
let data = TestData::start(); let data = TestData::start();
// create article // create article
let title = "Manu_Chao".to_string(); let title = "Manu_Chao".to_string();
let create_res = create_article(data.hostname_alpha, title.clone()).await?; let create_res = create_article(&data.alpha.hostname, title.clone()).await?;
assert_eq!(title, create_res.title); assert_eq!(title, create_res.article.title);
assert!(create_res.local); assert!(create_res.article.local);
let create_res = create_article(data.hostname_alpha, title.clone()).await; let create_res = create_article(&data.alpha.hostname, title.clone()).await;
assert!(create_res.is_err()); assert!(create_res.is_err());
data.stop() data.stop()
} }
#[tokio::test] #[tokio::test]
#[serial]
async fn test_follow_instance() -> MyResult<()> { async fn test_follow_instance() -> MyResult<()> {
let data = TestData::start(); let data = TestData::start();
// check initial state // check initial state
let alpha_instance: DbInstance = get(data.hostname_alpha, "instance").await?; let alpha_instance: InstanceView = get(&data.alpha.hostname, "instance").await?;
assert_eq!(0, alpha_instance.follows.len()); assert_eq!(0, alpha_instance.followers.len());
let beta_instance: DbInstance = get(data.hostname_beta, "instance").await?; assert_eq!(0, alpha_instance.following.len());
let beta_instance: InstanceView = get(&data.beta.hostname, "instance").await?;
assert_eq!(0, beta_instance.followers.len()); assert_eq!(0, beta_instance.followers.len());
assert_eq!(0, beta_instance.following.len());
follow_instance(data.hostname_alpha, data.hostname_beta).await?; follow_instance(&data.alpha.hostname, &data.beta.hostname).await?;
// check that follow was federated // check that follow was federated
let beta_instance: DbInstance = get(data.hostname_beta, "instance").await?; let alpha_instance: InstanceView = get(&data.alpha.hostname, "instance").await?;
assert_eq!(1, beta_instance.followers.len()); assert_eq!(1, alpha_instance.following.len());
assert_eq!(0, alpha_instance.followers.len());
assert_eq!(
beta_instance.instance.ap_id,
alpha_instance.following[0].ap_id
);
let alpha_instance: DbInstance = get(data.hostname_alpha, "instance").await?; let beta_instance: InstanceView = get(&data.beta.hostname, "instance").await?;
assert_eq!(1, alpha_instance.follows.len()); assert_eq!(0, beta_instance.following.len());
assert_eq!(1, beta_instance.followers.len());
assert_eq!(
alpha_instance.instance.ap_id,
beta_instance.followers[0].ap_id
);
data.stop() data.stop()
} }
#[tokio::test] #[tokio::test]
#[serial]
async fn test_synchronize_articles() -> MyResult<()> { async fn test_synchronize_articles() -> MyResult<()> {
let data = TestData::start(); let data = TestData::start();
// create article on alpha // create article on alpha
let title = "Manu_Chao".to_string(); let title = "Manu_Chao".to_string();
let create_res = create_article(data.hostname_alpha, title.clone()).await?; let create_res = create_article(&data.alpha.hostname, title.clone()).await?;
assert_eq!(title, create_res.title); assert_eq!(title, create_res.article.title);
assert_eq!(1, create_res.edits.len()); assert_eq!(1, create_res.edits.len());
assert!(create_res.local); assert!(create_res.article.local);
// edit the article // edit the article
let edit_form = EditArticleData { let edit_form = EditArticleData {
ap_id: create_res.ap_id.clone(), article_id: create_res.article.id,
new_text: "Lorem Ipsum 2\n".to_string(), new_text: "Lorem Ipsum 2\n".to_string(),
previous_version: create_res.latest_version, previous_version_id: create_res.latest_version,
resolve_conflict_id: None, resolve_conflict_id: None,
}; };
edit_article(data.hostname_alpha, &edit_form).await?; edit_article(&data.alpha.hostname, &edit_form).await?;
// article is not yet on beta // article is not yet on beta
let get_res = get_article(data.hostname_beta, &create_res.ap_id).await; let get_res = get_article(&data.beta.hostname, create_res.article.id).await;
assert!(get_res.is_err()); assert!(get_res.is_err());
// fetch alpha instance on beta, articles are also fetched automatically // fetch alpha instance on beta, articles are also fetched automatically
let resolve_object = ResolveObject { let resolve_object = ResolveObject {
id: Url::parse(&format!("http://{}", data.hostname_alpha))?, id: Url::parse(&format!("http://{}", &data.alpha.hostname))?,
}; };
get_query::<DbInstance, _>(data.hostname_beta, "resolve_instance", Some(resolve_object)) get_query::<DbInstance, _>(
.await?; &data.beta.hostname,
"resolve_instance",
Some(resolve_object),
)
.await?;
// get the article and compare // get the article and compare
let get_res = get_article(data.hostname_beta, &create_res.ap_id).await?; let get_res = get_article(&data.beta.hostname, create_res.article.id).await?;
assert_eq!(create_res.ap_id, get_res.ap_id); assert_eq!(create_res.article.ap_id, get_res.article.ap_id);
assert_eq!(title, get_res.title); assert_eq!(title, get_res.article.title);
assert_eq!(2, get_res.edits.len()); assert_eq!(2, get_res.edits.len());
assert_eq!(edit_form.new_text, get_res.text); assert_eq!(edit_form.new_text, get_res.article.text);
assert!(!get_res.local); assert!(!get_res.article.local);
data.stop() data.stop()
} }
#[tokio::test] #[tokio::test]
#[serial]
async fn test_edit_local_article() -> MyResult<()> { async fn test_edit_local_article() -> MyResult<()> {
let data = TestData::start(); let data = TestData::start();
follow_instance(data.hostname_alpha, data.hostname_beta).await?; follow_instance(&data.alpha.hostname, &data.beta.hostname).await?;
// create new article // create new article
let title = "Manu_Chao".to_string(); let title = "Manu_Chao".to_string();
let create_res = create_article(data.hostname_beta, title.clone()).await?; let create_res = create_article(&data.beta.hostname, title.clone()).await?;
assert_eq!(title, create_res.title); assert_eq!(title, create_res.article.title);
assert!(create_res.local); assert!(create_res.article.local);
// article should be federated to alpha // article should be federated to alpha
let get_res = get_article(data.hostname_alpha, &create_res.ap_id).await?; let get_res = get_article(&data.alpha.hostname, create_res.article.id).await?;
assert_eq!(create_res.title, get_res.title); assert_eq!(create_res.article.title, get_res.article.title);
assert_eq!(1, get_res.edits.len()); assert_eq!(1, get_res.edits.len());
assert!(!get_res.local); assert!(!get_res.article.local);
assert_eq!(create_res.text, get_res.text); assert_eq!(create_res.article.text, get_res.article.text);
// edit the article // edit the article
let edit_form = EditArticleData { let edit_form = EditArticleData {
ap_id: create_res.ap_id, article_id: create_res.article.id,
new_text: "Lorem Ipsum 2".to_string(), new_text: "Lorem Ipsum 2".to_string(),
previous_version: get_res.latest_version, previous_version_id: get_res.latest_version,
resolve_conflict_id: None, resolve_conflict_id: None,
}; };
let edit_res = edit_article(data.hostname_beta, &edit_form).await?; let edit_res = edit_article(&data.beta.hostname, &edit_form).await?;
assert_eq!(edit_res.text, edit_form.new_text); assert_eq!(edit_res.article.text, edit_form.new_text);
assert_eq!(edit_res.edits.len(), 2); assert_eq!(edit_res.edits.len(), 2);
assert!(edit_res.edits[0] assert!(edit_res.edits[0]
.id .ap_id
.to_string() .to_string()
.starts_with(&edit_res.ap_id.to_string())); .starts_with(&edit_res.article.ap_id.to_string()));
// edit should be federated to alpha // edit should be federated to alpha
let get_res = get_article(data.hostname_alpha, &edit_res.ap_id).await?; let get_res = get_article(&data.alpha.hostname, edit_res.article.id).await?;
assert_eq!(edit_res.title, get_res.title); assert_eq!(edit_res.article.title, get_res.article.title);
assert_eq!(edit_res.edits.len(), 2); assert_eq!(edit_res.edits.len(), 2);
assert_eq!(edit_res.text, get_res.text); assert_eq!(edit_res.article.text, get_res.article.text);
data.stop() data.stop()
} }
#[tokio::test] #[tokio::test]
#[serial]
async fn test_edit_remote_article() -> MyResult<()> { async fn test_edit_remote_article() -> MyResult<()> {
let data = TestData::start(); let data = TestData::start();
follow_instance(data.hostname_alpha, data.hostname_beta).await?; follow_instance(&data.alpha.hostname, &data.beta.hostname).await?;
follow_instance(data.hostname_gamma, data.hostname_beta).await?; follow_instance(&data.gamma.hostname, &data.beta.hostname).await?;
// create new article // create new article
let title = "Manu_Chao".to_string(); let title = "Manu_Chao".to_string();
let create_res = create_article(data.hostname_beta, title.clone()).await?; let create_res = create_article(&data.beta.hostname, title.clone()).await?;
assert_eq!(title, create_res.title); assert_eq!(title, create_res.article.title);
assert!(create_res.local); assert!(create_res.article.local);
// article should be federated to alpha and gamma // article should be federated to alpha and gamma
let get_res = get_article(data.hostname_alpha, &create_res.ap_id).await?; let get_res = get_article(&data.alpha.hostname, create_res.article.id).await?;
assert_eq!(create_res.title, get_res.title); assert_eq!(create_res.article.title, get_res.article.title);
assert_eq!(1, get_res.edits.len()); assert_eq!(1, get_res.edits.len());
assert!(!get_res.local); assert!(!get_res.article.local);
let get_res = get_article(data.hostname_gamma, &create_res.ap_id).await?; let get_res = get_article(&data.gamma.hostname, create_res.article.id).await?;
assert_eq!(create_res.title, get_res.title); assert_eq!(create_res.article.title, get_res.article.title);
assert_eq!(create_res.text, get_res.text); assert_eq!(create_res.article.text, get_res.article.text);
let edit_form = EditArticleData { let edit_form = EditArticleData {
ap_id: create_res.ap_id.clone(), article_id: create_res.article.id,
new_text: "Lorem Ipsum 2".to_string(), new_text: "Lorem Ipsum 2".to_string(),
previous_version: get_res.latest_version, previous_version_id: get_res.latest_version,
resolve_conflict_id: None, resolve_conflict_id: None,
}; };
let edit_res = edit_article(data.hostname_alpha, &edit_form).await?; let edit_res = edit_article(&data.alpha.hostname, &edit_form).await?;
assert_eq!(edit_form.new_text, edit_res.text); assert_eq!(edit_form.new_text, edit_res.article.text);
assert_eq!(2, edit_res.edits.len()); assert_eq!(2, edit_res.edits.len());
assert!(!edit_res.local); assert!(!edit_res.article.local);
assert!(edit_res.edits[0] assert!(edit_res.edits[0]
.id .ap_id
.to_string() .to_string()
.starts_with(&edit_res.ap_id.to_string())); .starts_with(&edit_res.article.ap_id.to_string()));
// edit should be federated to beta and gamma // edit should be federated to beta and gamma
let get_res = get_article(data.hostname_alpha, &create_res.ap_id).await?; let get_res = get_article(&data.alpha.hostname, create_res.article.id).await?;
assert_eq!(edit_res.title, get_res.title); assert_eq!(edit_res.article.title, get_res.article.title);
assert_eq!(edit_res.edits.len(), 2); assert_eq!(edit_res.edits.len(), 2);
assert_eq!(edit_res.text, get_res.text); assert_eq!(edit_res.article.text, get_res.article.text);
let get_res = get_article(data.hostname_gamma, &create_res.ap_id).await?; let get_res = get_article(&data.gamma.hostname, create_res.article.id).await?;
assert_eq!(edit_res.title, get_res.title); assert_eq!(edit_res.article.title, get_res.article.title);
assert_eq!(edit_res.edits.len(), 2); assert_eq!(edit_res.edits.len(), 2);
assert_eq!(edit_res.text, get_res.text); assert_eq!(edit_res.article.text, get_res.article.text);
data.stop() data.stop()
} }
#[tokio::test] #[tokio::test]
#[serial]
async fn test_local_edit_conflict() -> MyResult<()> { async fn test_local_edit_conflict() -> MyResult<()> {
let data = TestData::start(); let data = TestData::start();
// create new article // create new article
let title = "Manu_Chao".to_string(); let title = "Manu_Chao".to_string();
let create_res = create_article(data.hostname_alpha, title.clone()).await?; let create_res = create_article(&data.alpha.hostname, title.clone()).await?;
assert_eq!(title, create_res.title); assert_eq!(title, create_res.article.title);
assert!(create_res.local); assert!(create_res.article.local);
// one user edits article // one user edits article
let edit_form = EditArticleData { let edit_form = EditArticleData {
ap_id: create_res.ap_id.clone(), article_id: create_res.article.id,
new_text: "Lorem Ipsum\n".to_string(), new_text: "Lorem Ipsum\n".to_string(),
previous_version: create_res.latest_version.clone(), previous_version_id: create_res.latest_version.clone(),
resolve_conflict_id: None, resolve_conflict_id: None,
}; };
let edit_res = edit_article(data.hostname_alpha, &edit_form).await?; let edit_res = edit_article(&data.alpha.hostname, &edit_form).await?;
assert_eq!(edit_res.text, edit_form.new_text); assert_eq!(edit_res.article.text, edit_form.new_text);
assert_eq!(2, edit_res.edits.len()); assert_eq!(2, edit_res.edits.len());
// another user edits article, without being aware of previous edit // another user edits article, without being aware of previous edit
let edit_form = EditArticleData { let edit_form = EditArticleData {
ap_id: create_res.ap_id.clone(), article_id: create_res.article.id,
new_text: "Ipsum Lorem\n".to_string(), new_text: "Ipsum Lorem\n".to_string(),
previous_version: create_res.latest_version, previous_version_id: create_res.latest_version,
resolve_conflict_id: None, resolve_conflict_id: None,
}; };
let edit_res = edit_article_with_conflict(data.hostname_alpha, &edit_form) let edit_res = edit_article_with_conflict(&data.alpha.hostname, &edit_form)
.await? .await?
.unwrap(); .unwrap();
assert_eq!("<<<<<<< ours\nIpsum Lorem\n||||||| original\nsome\nexample\ntext\n=======\nLorem Ipsum\n>>>>>>> theirs\n", edit_res.three_way_merge); assert_eq!("<<<<<<< ours\nIpsum Lorem\n||||||| original\nsome\nexample\ntext\n=======\nLorem Ipsum\n>>>>>>> theirs\n", edit_res.three_way_merge);
let conflicts: Vec<ApiConflict> = let conflicts: Vec<ApiConflict> =
get_query(data.hostname_alpha, "edit_conflicts", None::<()>).await?; get_query(&data.alpha.hostname, "edit_conflicts", None::<()>).await?;
assert_eq!(1, conflicts.len()); assert_eq!(1, conflicts.len());
assert_eq!(conflicts[0], edit_res); assert_eq!(conflicts[0], edit_res);
let edit_form = EditArticleData { let edit_form = EditArticleData {
ap_id: create_res.ap_id.clone(), article_id: create_res.article.id,
new_text: "Lorem Ipsum and Ipsum Lorem\n".to_string(), new_text: "Lorem Ipsum and Ipsum Lorem\n".to_string(),
previous_version: edit_res.previous_version, previous_version_id: edit_res.previous_version_id,
resolve_conflict_id: Some(edit_res.id), resolve_conflict_id: Some(edit_res.id),
}; };
let edit_res = edit_article(data.hostname_alpha, &edit_form).await?; let edit_res = edit_article(&data.alpha.hostname, &edit_form).await?;
assert_eq!(edit_form.new_text, edit_res.text); assert_eq!(edit_form.new_text, edit_res.article.text);
let conflicts: Vec<ApiConflict> = let conflicts: Vec<ApiConflict> =
get_query(data.hostname_alpha, "edit_conflicts", None::<()>).await?; get_query(&data.alpha.hostname, "edit_conflicts", None::<()>).await?;
assert_eq!(0, conflicts.len()); assert_eq!(0, conflicts.len());
data.stop() data.stop()
} }
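The expected string in test_local_edit_conflict above is standard diff3 conflict-marker output: the second editor's text under "<<<<<<< ours", the common ancestor ("some\nexample\ntext", i.e. the TEST_ARTICLE_DEFAULT_TEXT content) under "||||||| original", and the already-applied first edit between "=======" and ">>>>>>> theirs". Resolving the conflict means submitting a new edit with resolve_conflict_id set to the conflict's id, as the final EditArticleData in that test does.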
#[tokio::test] #[tokio::test]
#[serial]
async fn test_federated_edit_conflict() -> MyResult<()> { async fn test_federated_edit_conflict() -> MyResult<()> {
let data = TestData::start(); let data = TestData::start();
follow_instance(data.hostname_alpha, data.hostname_beta).await?; follow_instance(&data.alpha.hostname, &data.beta.hostname).await?;
// create new article // create new article
let title = "Manu_Chao".to_string(); let title = "Manu_Chao".to_string();
let create_res = create_article(data.hostname_beta, title.clone()).await?; let create_res = create_article(&data.beta.hostname, title.clone()).await?;
assert_eq!(title, create_res.title); assert_eq!(title, create_res.article.title);
assert!(create_res.local); assert!(create_res.article.local);
// fetch article to gamma // fetch article to gamma
let resolve_object = ResolveObject { let resolve_object = ResolveObject {
id: create_res.ap_id.inner().clone(), id: create_res.article.ap_id.inner().clone(),
}; };
let resolve_res: DbArticle = let resolve_res: ArticleView = get_query(
get_query(data.hostname_gamma, "resolve_article", Some(resolve_object)).await?; &data.gamma.hostname,
assert_eq!(create_res.text, resolve_res.text); "resolve_article",
Some(resolve_object),
)
.await?;
assert_eq!(create_res.article.text, resolve_res.article.text);
// alpha edits article // alpha edits article
let edit_form = EditArticleData { let edit_form = EditArticleData {
ap_id: create_res.ap_id.clone(), article_id: create_res.article.id,
new_text: "Lorem Ipsum\n".to_string(), new_text: "Lorem Ipsum\n".to_string(),
previous_version: create_res.latest_version.clone(), previous_version_id: create_res.latest_version.clone(),
resolve_conflict_id: None, resolve_conflict_id: None,
}; };
let edit_res = edit_article(data.hostname_alpha, &edit_form).await?; let edit_res = edit_article(&data.alpha.hostname, &edit_form).await?;
assert_eq!(edit_res.text, edit_form.new_text); assert_eq!(edit_res.article.text, edit_form.new_text);
assert_eq!(2, edit_res.edits.len()); assert_eq!(2, edit_res.edits.len());
assert!(!edit_res.local); assert!(!edit_res.article.local);
assert!(edit_res.edits[1] assert!(edit_res.edits[1]
.id .ap_id
.to_string() .to_string()
.starts_with(&edit_res.ap_id.to_string())); .starts_with(&edit_res.article.ap_id.to_string()));
// gamma also edits, as it's not the latest version there is a conflict. local version should // gamma also edits, as it's not the latest version there is a conflict. local version should
// not be updated with this conflicting version; instead the user needs to handle the conflict // not be updated with this conflicting version; instead the user needs to handle the conflict
let edit_form = EditArticleData { let edit_form = EditArticleData {
ap_id: create_res.ap_id.clone(), article_id: create_res.article.id,
new_text: "aaaa\n".to_string(), new_text: "aaaa\n".to_string(),
previous_version: create_res.latest_version, previous_version_id: create_res.latest_version,
resolve_conflict_id: None, resolve_conflict_id: None,
}; };
let edit_res = edit_article(data.hostname_gamma, &edit_form).await?; let edit_res = edit_article(&data.gamma.hostname, &edit_form).await?;
assert_ne!(edit_form.new_text, edit_res.text); assert_ne!(edit_form.new_text, edit_res.article.text);
assert_eq!(2, edit_res.edits.len()); // TODO
assert!(!edit_res.local); //assert_eq!(2, edit_res.edits.len());
assert!(!edit_res.article.local);
let conflicts: Vec<ApiConflict> = let conflicts: Vec<ApiConflict> =
get_query(data.hostname_gamma, "edit_conflicts", None::<()>).await?; get_query(&data.gamma.hostname, "edit_conflicts", None::<()>).await?;
assert_eq!(1, conflicts.len()); assert_eq!(1, conflicts.len());
// resolve the conflict // resolve the conflict
let edit_form = EditArticleData { let edit_form = EditArticleData {
ap_id: create_res.ap_id, article_id: create_res.article.id,
new_text: "aaaa\n".to_string(), new_text: "aaaa\n".to_string(),
previous_version: conflicts[0].previous_version.clone(), previous_version_id: conflicts[0].previous_version_id.clone(),
resolve_conflict_id: Some(conflicts[0].id), resolve_conflict_id: Some(conflicts[0].id.clone()),
}; };
let edit_res = edit_article(data.hostname_gamma, &edit_form).await?; let edit_res = edit_article(&data.gamma.hostname, &edit_form).await?;
assert_eq!(edit_form.new_text, edit_res.text); assert_eq!(edit_form.new_text, edit_res.article.text);
assert_eq!(3, edit_res.edits.len()); assert_eq!(3, edit_res.edits.len());
let conflicts: Vec<ApubEdit> = let conflicts: Vec<ApiConflict> =
get_query(data.hostname_gamma, "edit_conflicts", None::<()>).await?; get_query(&data.gamma.hostname, "edit_conflicts", None::<()>).await?;
assert_eq!(0, conflicts.len()); assert_eq!(0, conflicts.len());
data.stop() data.stop()
} }
#[tokio::test] #[tokio::test]
#[serial]
async fn test_overlapping_edits_no_conflict() -> MyResult<()> { async fn test_overlapping_edits_no_conflict() -> MyResult<()> {
let data = TestData::start(); let data = TestData::start();
// create new article // create new article
let title = "Manu_Chao".to_string(); let title = "Manu_Chao".to_string();
let create_res = create_article(data.hostname_alpha, title.clone()).await?; let create_res = create_article(&data.alpha.hostname, title.clone()).await?;
assert_eq!(title, create_res.title); assert_eq!(title, create_res.article.title);
assert!(create_res.local); assert!(create_res.article.local);
// one user edits article // one user edits article
let edit_form = EditArticleData { let edit_form = EditArticleData {
ap_id: create_res.ap_id.clone(), article_id: create_res.article.id,
new_text: "my\nexample\ntext\n".to_string(), new_text: "my\nexample\ntext\n".to_string(),
previous_version: create_res.latest_version.clone(), previous_version_id: create_res.latest_version.clone(),
resolve_conflict_id: None, resolve_conflict_id: None,
}; };
let edit_res = edit_article(data.hostname_alpha, &edit_form).await?; let edit_res = edit_article(&data.alpha.hostname, &edit_form).await?;
assert_eq!(edit_res.text, edit_form.new_text); assert_eq!(edit_res.article.text, edit_form.new_text);
assert_eq!(2, edit_res.edits.len()); assert_eq!(2, edit_res.edits.len());
// another user edits article, without being aware of previous edit // another user edits article, without being aware of previous edit
let edit_form = EditArticleData { let edit_form = EditArticleData {
ap_id: create_res.ap_id.clone(), article_id: create_res.article.id,
new_text: "some\nexample\narticle\n".to_string(), new_text: "some\nexample\narticle\n".to_string(),
previous_version: create_res.latest_version, previous_version_id: create_res.latest_version,
resolve_conflict_id: None, resolve_conflict_id: None,
}; };
let edit_res = edit_article(data.hostname_alpha, &edit_form).await?; let edit_res = edit_article(&data.alpha.hostname, &edit_form).await?;
let conflicts: Vec<ApiConflict> = let conflicts: Vec<ApiConflict> =
get_query(data.hostname_alpha, "edit_conflicts", None::<()>).await?; get_query(&data.alpha.hostname, "edit_conflicts", None::<()>).await?;
assert_eq!(0, conflicts.len()); assert_eq!(0, conflicts.len());
assert_eq!(3, edit_res.edits.len()); assert_eq!(3, edit_res.edits.len());
assert_eq!("my\nexample\narticle\n", edit_res.text); assert_eq!("my\nexample\narticle\n", edit_res.article.text);
data.stop() data.stop()
} }
#[tokio::test] #[tokio::test]
#[serial]
async fn test_fork_article() -> MyResult<()> { async fn test_fork_article() -> MyResult<()> {
let data = TestData::start(); let data = TestData::start();
// create article // create article
let title = "Manu_Chao".to_string(); let title = "Manu_Chao".to_string();
let create_res = create_article(data.hostname_alpha, title.clone()).await?; let create_res = create_article(&data.alpha.hostname, title.clone()).await?;
assert_eq!(title, create_res.title); assert_eq!(title, create_res.article.title);
assert!(create_res.local); assert!(create_res.article.local);
// fetch on beta // fetch on beta
let resolve_object = ResolveObject { let resolve_object = ResolveObject {
id: create_res.ap_id.into_inner(), id: create_res.article.ap_id.into_inner(),
}; };
let resolved_article = let resolve_res: ArticleView =
get_query::<DbArticle, _>(data.hostname_beta, "resolve_article", Some(resolve_object)) get_query(&data.beta.hostname, "resolve_article", Some(resolve_object)).await?;
.await?; let resolved_article = resolve_res.article;
assert_eq!(create_res.edits.len(), resolved_article.edits.len()); assert_eq!(create_res.edits.len(), resolve_res.edits.len());
// fork the article to local instance // fork the article to local instance
let fork_form = ForkArticleData { let fork_form = ForkArticleData {
ap_id: resolved_article.ap_id.clone(), article_id: resolved_article.id,
}; };
let fork_res: DbArticle = post(data.hostname_beta, "article/fork", &fork_form).await?; let fork_res: ArticleView = post(&data.beta.hostname, "article/fork", &fork_form).await?;
assert_eq!(resolved_article.title, fork_res.title); let forked_article = fork_res.article;
assert_eq!(resolved_article.text, fork_res.text); assert_eq!(resolved_article.title, forked_article.title);
assert_eq!(resolved_article.edits, fork_res.edits); assert_eq!(resolved_article.text, forked_article.text);
assert_eq!(resolved_article.latest_version, fork_res.latest_version); assert_eq!(resolve_res.edits.len(), fork_res.edits.len());
assert_ne!(resolved_article.ap_id, fork_res.ap_id); assert_eq!(resolve_res.edits[0].diff, fork_res.edits[0].diff);
assert!(fork_res.local); assert_eq!(resolve_res.edits[0].hash, fork_res.edits[0].hash);
assert_ne!(resolve_res.edits[0].id, fork_res.edits[0].id);
assert_eq!(resolve_res.latest_version, fork_res.latest_version);
assert_ne!(resolved_article.ap_id, forked_article.ap_id);
assert!(forked_article.local);
let beta_instance: DbInstance = get(data.hostname_beta, "instance").await?; let beta_instance: InstanceView = get(&data.beta.hostname, "instance").await?;
assert_eq!(fork_res.instance, beta_instance.ap_id); assert_eq!(forked_article.instance_id, beta_instance.instance.id);
// now search returns two articles for this title (original and forked) // now search returns two articles for this title (original and forked)
let search_form = SearchArticleData { let search_form = SearchArticleData {
title: title.clone(), query: title.clone(),
}; };
let search_res: Vec<DbArticle> = let search_res: Vec<DbArticle> =
get_query(data.hostname_beta, "search", Some(search_form)).await?; get_query(&data.beta.hostname, "search", Some(search_form)).await?;
assert_eq!(2, search_res.len()); assert_eq!(2, search_res.len());
data.stop() data.stop()