all conflict tests are passing!

This commit is contained in:
Felix Ableitner 2023-11-28 13:04:33 +01:00
parent 201bba2d4b
commit cc8b1a9d54
11 changed files with 146 additions and 78 deletions

View File

@ -14,7 +14,8 @@ use axum::extract::Query;
use axum::routing::{get, post}; use axum::routing::{get, post};
use axum::{Form, Json, Router}; use axum::{Form, Json, Router};
use axum_macros::debug_handler; use axum_macros::debug_handler;
use diffy::{apply, create_patch, merge}; use diffy::{apply, create_patch, merge, Patch};
use futures::future::try_join_all;
use rand::random; use rand::random;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use url::Url; use url::Url;
@ -76,21 +77,70 @@ pub struct EditArticleData {
pub resolve_conflict_id: Option<i32>, pub resolve_conflict_id: Option<i32>,
} }
// TODO: how to store conflict in db? with three-way-merge doesnt
// necessarily make sense (might be outdated)
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct Conflict { pub struct ApiConflict {
pub id: i32, pub id: i32,
pub three_way_merge: String, pub three_way_merge: String,
pub article_id: ObjectId<DbArticle>, pub article_id: ObjectId<DbArticle>,
pub latest_version: EditVersion, pub previous_version: EditVersion,
}
#[derive(Clone, Debug)]
pub struct DbConflict {
pub id: i32,
pub diff: String,
pub article_id: ObjectId<DbArticle>,
pub previous_version: EditVersion,
}
impl DbConflict {
pub async fn to_api_conflict(
&self,
data: &Data<DatabaseHandle>,
) -> MyResult<Option<ApiConflict>> {
let original_article = {
let mut lock = data.articles.lock().unwrap();
let article = lock.get_mut(self.article_id.inner()).unwrap();
article.clone()
};
// create common ancestor version
let ancestor = generate_article_version(&original_article.edits, &self.previous_version)?;
let patch = Patch::from_str(&self.diff)?;
if let Ok(new_text) = apply(&original_article.text, &patch) {
// patch applies cleanly so we are done
// federate the change
submit_article_update(data, new_text, &original_article).await?;
// remove conflict from db
let mut lock = data.conflicts.lock().unwrap();
lock.retain(|c| c.id != self.id);
Ok(None)
} else {
// there is a merge conflict, do three-way-merge
// apply self.diff to ancestor to get `ours`
let ours = apply(&ancestor, &patch)?;
// if it returns ok the merge was successful, which is impossible based on apply failing
// above. so we unconditionally read the three-way-merge string from err field
let merge = merge(&ancestor, &ours, &original_article.text)
.err()
.unwrap();
Ok(Some(ApiConflict {
id: self.id,
three_way_merge: merge,
article_id: original_article.ap_id.clone(),
previous_version: original_article.latest_version,
}))
}
}
} }
#[debug_handler] #[debug_handler]
async fn edit_article( async fn edit_article(
data: Data<DatabaseHandle>, data: Data<DatabaseHandle>,
Form(edit_form): Form<EditArticleData>, Form(edit_form): Form<EditArticleData>,
) -> MyResult<Json<Option<Conflict>>> { ) -> MyResult<Json<Option<ApiConflict>>> {
// resolve conflict if any // resolve conflict if any
if let Some(resolve_conflict_id) = &edit_form.resolve_conflict_id { if let Some(resolve_conflict_id) = &edit_form.resolve_conflict_id {
let mut lock = data.conflicts.lock().unwrap(); let mut lock = data.conflicts.lock().unwrap();
@ -100,60 +150,53 @@ async fn edit_article(
lock.retain(|c| &c.id != resolve_conflict_id); lock.retain(|c| &c.id != resolve_conflict_id);
} }
let original_article = { let original_article = {
let mut lock = data.articles.lock().unwrap(); let lock = data.articles.lock().unwrap();
let article = lock.get_mut(edit_form.ap_id.inner()).unwrap(); let article = lock.get(edit_form.ap_id.inner()).unwrap();
article.clone() article.clone()
}; };
if edit_form.previous_version == original_article.latest_version { if edit_form.previous_version == original_article.latest_version {
// no intermediate changes, simply submit new version // No intermediate changes, simply submit new version
submit_article_update(&data, &edit_form, &original_article).await?; submit_article_update(&data, edit_form.new_text.clone(), &original_article).await?;
Ok(Json(None)) Ok(Json(None))
} else { } else {
// create a patch from the differences of previous version and new version // There have been other changes since this edit was initiated. Get the common ancestor
// version and generate a diff to find out what exactly has changed.
let ancestor = let ancestor =
generate_article_version(&original_article.edits, Some(&edit_form.previous_version))?; generate_article_version(&original_article.edits, &edit_form.previous_version)?;
let patch = create_patch(&ancestor, &edit_form.new_text); let patch = create_patch(&ancestor, &edit_form.new_text);
if apply(&original_article.text, &patch).is_ok() {
// patch applies cleanly so we are done
submit_article_update(&data, &edit_form, &original_article).await?;
Ok(Json(None))
} else {
// there is a merge conflict, do three-way-merge
let merge = merge(&ancestor, &edit_form.new_text, &original_article.text)
.err()
.unwrap();
let conflict = Conflict { let db_conflict = DbConflict {
id: random(), id: random(),
three_way_merge: merge, diff: patch.to_string(),
article_id: original_article.ap_id, article_id: original_article.ap_id.clone(),
latest_version: original_article.latest_version, previous_version: edit_form.previous_version,
}; };
{
let mut lock = data.conflicts.lock().unwrap(); let mut lock = data.conflicts.lock().unwrap();
lock.push(conflict.clone()); lock.push(db_conflict.clone());
Ok(Json(Some(conflict)))
} }
Ok(Json(db_conflict.to_api_conflict(&data).await?))
} }
} }
async fn submit_article_update( async fn submit_article_update(
data: &Data<DatabaseHandle>, data: &Data<DatabaseHandle>,
edit_form: &EditArticleData, new_text: String,
original_article: &DbArticle, original_article: &DbArticle,
) -> Result<(), Error> { ) -> Result<(), Error> {
let edit = DbEdit::new(original_article, &edit_form.new_text)?; let edit = DbEdit::new(original_article, &new_text)?;
if original_article.local { if original_article.local {
let updated_article = { let updated_article = {
let mut lock = data.articles.lock().unwrap(); let mut lock = data.articles.lock().unwrap();
let article = lock.get_mut(edit_form.ap_id.inner()).unwrap(); let article = lock.get_mut(original_article.ap_id.inner()).unwrap();
article.text = edit_form.new_text.clone(); article.text = new_text;
article.latest_version = edit.version.clone(); article.latest_version = edit.version.clone();
article.edits.push(edit.clone()); article.edits.push(edit.clone());
article.clone() article.clone()
}; };
UpdateLocalArticle::send(updated_article, data).await?; UpdateLocalArticle::send(updated_article, vec![], data).await?;
} else { } else {
UpdateRemoteArticle::send( UpdateRemoteArticle::send(
edit, edit,
@ -229,8 +272,15 @@ async fn follow_instance(
} }
#[debug_handler] #[debug_handler]
async fn edit_conflicts(data: Data<DatabaseHandle>) -> MyResult<Json<Vec<Conflict>>> { async fn edit_conflicts(data: Data<DatabaseHandle>) -> MyResult<Json<Vec<ApiConflict>>> {
let lock = data.conflicts.lock().unwrap(); let conflicts = { data.conflicts.lock().unwrap().to_vec() };
let conflicts = lock.clone(); let conflicts: Vec<ApiConflict> = try_join_all(conflicts.into_iter().map(|c| {
let data = data.reset_request_count();
async move { c.to_api_conflict(&data).await }
}))
.await?
.into_iter()
.flatten()
.collect();
Ok(Json(conflicts)) Ok(Json(conflicts))
} }

View File

@ -1,4 +1,4 @@
use crate::api::Conflict; use crate::api::DbConflict;
use crate::federation::objects::article::DbArticle; use crate::federation::objects::article::DbArticle;
use crate::federation::objects::instance::DbInstance; use crate::federation::objects::instance::DbInstance;
@ -11,7 +11,7 @@ pub type DatabaseHandle = Arc<Database>;
pub struct Database { pub struct Database {
pub instances: Mutex<HashMap<Url, DbInstance>>, pub instances: Mutex<HashMap<Url, DbInstance>>,
pub articles: Mutex<HashMap<Url, DbArticle>>, pub articles: Mutex<HashMap<Url, DbArticle>>,
pub conflicts: Mutex<Vec<Conflict>>, pub conflicts: Mutex<Vec<DbConflict>>,
} }
impl Database { impl Database {

View File

@ -40,7 +40,9 @@ impl CreateArticle {
kind: Default::default(), kind: Default::default(),
id, id,
}; };
local_instance.send_to_followers(create, data).await?; local_instance
.send_to_followers(create, vec![], data)
.await?;
Ok(()) Ok(())
} }
} }
@ -64,7 +66,9 @@ impl ActivityHandler for CreateArticle {
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> { async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
let article = DbArticle::from_json(self.object.clone(), data).await?; let article = DbArticle::from_json(self.object.clone(), data).await?;
if article.local { if article.local {
data.local_instance().send_to_followers(self, data).await?; data.local_instance()
.send_to_followers(self, vec![], data)
.await?;
} }
Ok(()) Ok(())
} }

View File

@ -8,7 +8,9 @@ use activitypub_federation::{
config::Data, fetch::object_id::ObjectId, protocol::helpers::deserialize_one_or_many, config::Data, fetch::object_id::ObjectId, protocol::helpers::deserialize_one_or_many,
traits::ActivityHandler, traits::ActivityHandler,
}; };
use rand::random;
use crate::api::DbConflict;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use url::Url; use url::Url;
@ -65,9 +67,14 @@ impl ActivityHandler for RejectEdit {
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> { async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
// cant convert this to DbEdit as it tries to apply patch and fails // cant convert this to DbEdit as it tries to apply patch and fails
let lock = data.conflicts.lock().unwrap(); let mut lock = data.conflicts.lock().unwrap();
todo!(); let conflict = DbConflict {
//lock.push(self.object); id: random(),
diff: self.object.content,
article_id: self.object.object,
previous_version: self.object.previous_version,
};
lock.push(conflict);
Ok(()) Ok(())
} }
} }

View File

@ -29,18 +29,26 @@ pub struct UpdateLocalArticle {
impl UpdateLocalArticle { impl UpdateLocalArticle {
/// Sent from article origin instance /// Sent from article origin instance
pub async fn send(article: DbArticle, data: &Data<DatabaseHandle>) -> MyResult<()> { pub async fn send(
article: DbArticle,
extra_recipients: Vec<DbInstance>,
data: &Data<DatabaseHandle>,
) -> MyResult<()> {
debug_assert!(article.local); debug_assert!(article.local);
let local_instance = data.local_instance(); let local_instance = data.local_instance();
let id = generate_activity_id(local_instance.ap_id.inner())?; let id = generate_activity_id(local_instance.ap_id.inner())?;
let mut to = local_instance.follower_ids();
to.extend(extra_recipients.iter().map(|i| i.ap_id.inner().clone()));
let update = UpdateLocalArticle { let update = UpdateLocalArticle {
actor: local_instance.ap_id.clone(), actor: local_instance.ap_id.clone(),
to: local_instance.follower_ids(), to,
object: article.into_json(data).await?, object: article.into_json(data).await?,
kind: Default::default(), kind: Default::default(),
id, id,
}; };
local_instance.send_to_followers(update, data).await?; local_instance
.send_to_followers(update, extra_recipients, data)
.await?;
Ok(()) Ok(())
} }
} }

View File

@ -72,22 +72,23 @@ impl ActivityHandler for UpdateRemoteArticle {
/// Received on article origin instances /// Received on article origin instances
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> { async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
let edit = DbEdit::from_json(self.object.clone(), data).await?;
let article_text = { let article_text = {
let lock = data.articles.lock().unwrap(); let lock = data.articles.lock().unwrap();
lock.get(self.object.object.inner()).unwrap().text.clone() lock.get(self.object.object.inner()).unwrap().text.clone()
}; };
let patch = Patch::from_str(&edit.diff)?; let patch = Patch::from_str(&self.object.content)?;
match apply(&article_text, &patch) { match apply(&article_text, &patch) {
Ok(applied) => { Ok(applied) => {
let article = { let article = {
let edit = DbEdit::from_json(self.object.clone(), data).await?;
let mut lock = data.articles.lock().unwrap(); let mut lock = data.articles.lock().unwrap();
let article = lock.get_mut(edit.article_id.inner()).unwrap(); let article = lock.get_mut(edit.article_id.inner()).unwrap();
article.text = applied; article.text = applied;
article.clone() article.clone()
}; };
UpdateLocalArticle::send(article, data).await?; UpdateLocalArticle::send(article, vec![self.actor.dereference(data).await?], data)
.await?;
} }
Err(_e) => { Err(_e) => {
let user_instance = self.actor.dereference(data).await?; let user_instance = self.actor.dereference(data).await?;

View File

@ -62,6 +62,7 @@ pub struct ApubEdit {
pub id: ObjectId<DbEdit>, pub id: ObjectId<DbEdit>,
pub content: String, pub content: String,
pub version: EditVersion, pub version: EditVersion,
pub previous_version: EditVersion,
pub object: ObjectId<DbArticle>, pub object: ObjectId<DbArticle>,
} }
@ -78,12 +79,18 @@ impl Object for DbEdit {
todo!() todo!()
} }
async fn into_json(self, _data: &Data<Self::DataType>) -> Result<Self::Kind, Self::Error> { async fn into_json(self, data: &Data<Self::DataType>) -> Result<Self::Kind, Self::Error> {
let article_version = {
let mut lock = data.articles.lock().unwrap();
let article = lock.get_mut(self.article_id.inner()).unwrap();
article.latest_version.clone()
};
Ok(ApubEdit { Ok(ApubEdit {
kind: EditType::Edit, kind: EditType::Edit,
id: self.id, id: self.id,
content: self.diff, content: self.diff,
version: self.version, version: self.version,
previous_version: article_version,
object: self.article_id, object: self.article_id,
}) })
} }

View File

@ -66,6 +66,7 @@ impl DbInstance {
pub async fn send_to_followers<Activity>( pub async fn send_to_followers<Activity>(
&self, &self,
activity: Activity, activity: Activity,
extra_recipients: Vec<DbInstance>,
data: &Data<DatabaseHandle>, data: &Data<DatabaseHandle>,
) -> Result<(), <Activity as ActivityHandler>::Error> ) -> Result<(), <Activity as ActivityHandler>::Error>
where where
@ -73,11 +74,12 @@ impl DbInstance {
<Activity as ActivityHandler>::Error: From<activitypub_federation::error::Error>, <Activity as ActivityHandler>::Error: From<activitypub_federation::error::Error>,
{ {
let local_instance = data.local_instance(); let local_instance = data.local_instance();
let inboxes = local_instance let mut inboxes: Vec<_> = local_instance
.followers .followers
.iter() .iter()
.map(|f| f.inbox.clone()) .map(|f| f.inbox.clone())
.collect(); .collect();
inboxes.extend(extra_recipients.into_iter().map(|i| i.inbox));
local_instance.send(activity, inboxes, data).await?; local_instance.send(activity, inboxes, data).await?;
Ok(()) Ok(())
} }

View File

@ -21,23 +21,14 @@ pub fn generate_activity_id(domain: &Url) -> Result<Url, ParseError> {
/// ///
/// TODO: testing /// TODO: testing
/// TODO: should cache all these generated versions /// TODO: should cache all these generated versions
pub fn generate_article_version( pub fn generate_article_version(edits: &Vec<DbEdit>, version: &EditVersion) -> MyResult<String> {
edits: &Vec<DbEdit>,
version: Option<&EditVersion>,
) -> MyResult<String> {
let mut generated = String::new(); let mut generated = String::new();
if let Some(version) = version {
let exists = edits.iter().any(|e| &e.version == version);
if !exists {
Err(anyhow!("Attempting to generate invalid article version"))?;
}
}
for e in edits { for e in edits {
let patch = Patch::from_str(&e.diff)?; let patch = Patch::from_str(&e.diff)?;
generated = apply(&generated, &patch)?; generated = apply(&generated, &patch)?;
if Some(&e.version) == version { if &e.version == version {
return Ok(generated); return Ok(generated);
} }
} }
Ok(generated) Err(anyhow!("failed to generate article version").into())
} }

View File

@ -1,5 +1,5 @@
use fediwiki::api::{ use fediwiki::api::{
Conflict, CreateArticleData, EditArticleData, FollowInstance, GetArticleData, ResolveObject, ApiConflict, CreateArticleData, EditArticleData, FollowInstance, GetArticleData, ResolveObject,
}; };
use fediwiki::error::MyResult; use fediwiki::error::MyResult;
use fediwiki::federation::objects::article::DbArticle; use fediwiki::federation::objects::article::DbArticle;
@ -93,7 +93,7 @@ pub async fn get_article(hostname: &str, title: &str) -> MyResult<DbArticle> {
pub async fn edit_article_with_conflict( pub async fn edit_article_with_conflict(
hostname: &str, hostname: &str,
edit_form: &EditArticleData, edit_form: &EditArticleData,
) -> MyResult<Option<Conflict>> { ) -> MyResult<Option<ApiConflict>> {
Ok(CLIENT Ok(CLIENT
.patch(format!("http://{}/api/v1/article", hostname)) .patch(format!("http://{}/api/v1/article", hostname))
.form(edit_form) .form(edit_form)
@ -108,7 +108,7 @@ pub async fn edit_article(
title: &str, title: &str,
edit_form: &EditArticleData, edit_form: &EditArticleData,
) -> MyResult<DbArticle> { ) -> MyResult<DbArticle> {
let edit_res: Option<Conflict> = CLIENT let edit_res: Option<ApiConflict> = CLIENT
.patch(format!("http://{}/api/v1/article", hostname)) .patch(format!("http://{}/api/v1/article", hostname))
.form(edit_form) .form(edit_form)
.send() .send()

View File

@ -7,7 +7,7 @@ use crate::common::{
get_query, TestData, TEST_ARTICLE_DEFAULT_TEXT, get_query, TestData, TEST_ARTICLE_DEFAULT_TEXT,
}; };
use common::get; use common::get;
use fediwiki::api::{Conflict, EditArticleData, ResolveObject}; use fediwiki::api::{ApiConflict, EditArticleData, ResolveObject};
use fediwiki::error::MyResult; use fediwiki::error::MyResult;
use fediwiki::federation::objects::article::DbArticle; use fediwiki::federation::objects::article::DbArticle;
use fediwiki::federation::objects::edit::ApubEdit; use fediwiki::federation::objects::edit::ApubEdit;
@ -248,7 +248,7 @@ async fn test_local_edit_conflict() -> MyResult<()> {
.unwrap(); .unwrap();
assert_eq!("<<<<<<< ours\nIpsum Lorem\n||||||| original\nempty\n=======\nLorem Ipsum\n>>>>>>> theirs\n", edit_res.three_way_merge); assert_eq!("<<<<<<< ours\nIpsum Lorem\n||||||| original\nempty\n=======\nLorem Ipsum\n>>>>>>> theirs\n", edit_res.three_way_merge);
let conflicts: Vec<Conflict> = let conflicts: Vec<ApiConflict> =
get_query(data.hostname_alpha, "edit_conflicts", None::<()>).await?; get_query(data.hostname_alpha, "edit_conflicts", None::<()>).await?;
assert_eq!(1, conflicts.len()); assert_eq!(1, conflicts.len());
assert_eq!(conflicts[0], edit_res); assert_eq!(conflicts[0], edit_res);
@ -256,13 +256,13 @@ async fn test_local_edit_conflict() -> MyResult<()> {
let edit_form = EditArticleData { let edit_form = EditArticleData {
ap_id: create_res.ap_id.clone(), ap_id: create_res.ap_id.clone(),
new_text: "Lorem Ipsum and Ipsum Lorem\n".to_string(), new_text: "Lorem Ipsum and Ipsum Lorem\n".to_string(),
previous_version: edit_res.latest_version, previous_version: edit_res.previous_version,
resolve_conflict_id: Some(edit_res.id), resolve_conflict_id: Some(edit_res.id),
}; };
let edit_res = edit_article(data.hostname_alpha, &create_res.title, &edit_form).await?; let edit_res = edit_article(data.hostname_alpha, &create_res.title, &edit_form).await?;
assert_eq!(edit_form.new_text, edit_res.text); assert_eq!(edit_form.new_text, edit_res.text);
let conflicts: Vec<Conflict> = let conflicts: Vec<ApiConflict> =
get_query(data.hostname_alpha, "edit_conflicts", None::<()>).await?; get_query(data.hostname_alpha, "edit_conflicts", None::<()>).await?;
assert_eq!(0, conflicts.len()); assert_eq!(0, conflicts.len());
@ -314,25 +314,23 @@ async fn test_federated_edit_conflict() -> MyResult<()> {
previous_version: create_res.latest_version, previous_version: create_res.latest_version,
resolve_conflict_id: None, resolve_conflict_id: None,
}; };
let edit_res = edit_article(data.hostname_gamma, &create_res.title, &edit_form).await?; let edit_res = edit_article(data.hostname_gamma, &title, &edit_form).await?;
assert_ne!(edit_form.new_text, edit_res.text); assert_ne!(edit_form.new_text, edit_res.text);
assert_eq!(2, edit_res.edits.len()); assert_eq!(2, edit_res.edits.len());
assert!(!edit_res.local); assert!(!edit_res.local);
let conflicts: Vec<ApubEdit> = let conflicts: Vec<ApiConflict> =
get_query(data.hostname_gamma, "edit_conflicts", None::<()>).await?; get_query(data.hostname_gamma, "edit_conflicts", None::<()>).await?;
// TODO: this should also return string for three-way-merge
dbg!(&conflicts);
assert_eq!(1, conflicts.len()); assert_eq!(1, conflicts.len());
// resolve the conflict // resolve the conflict
let edit_form = EditArticleData { let edit_form = EditArticleData {
ap_id: create_res.ap_id, ap_id: create_res.ap_id,
new_text: "aaaa\n".to_string(), new_text: "aaaa\n".to_string(),
previous_version: conflicts[0].version.clone(), previous_version: conflicts[0].previous_version.clone(),
resolve_conflict_id: todo!(), //Some(conflicts[0].id.clone()), resolve_conflict_id: Some(conflicts[0].id),
}; };
let edit_res = edit_article(data.hostname_gamma, &create_res.title, &edit_form).await?; let edit_res = edit_article(data.hostname_gamma, &title, &edit_form).await?;
assert_eq!(edit_form.new_text, edit_res.text); assert_eq!(edit_form.new_text, edit_res.text);
assert_eq!(3, edit_res.edits.len()); assert_eq!(3, edit_res.edits.len());