From cc8b1a9d54228b8c26edca42a1df0fdf008f185b Mon Sep 17 00:00:00 2001 From: Felix Ableitner Date: Tue, 28 Nov 2023 13:04:33 +0100 Subject: [PATCH] all conflict tests are passing! --- src/api.rs | 124 ++++++++++++------ src/database.rs | 4 +- src/federation/activities/create_article.rs | 8 +- src/federation/activities/reject.rs | 13 +- .../activities/update_local_article.rs | 14 +- .../activities/update_remote_article.rs | 7 +- src/federation/objects/edit.rs | 9 +- src/federation/objects/instance.rs | 4 +- src/utils.rs | 15 +-- tests/common.rs | 6 +- tests/test.rs | 20 ++- 11 files changed, 146 insertions(+), 78 deletions(-) diff --git a/src/api.rs b/src/api.rs index b79174f..88e03cd 100644 --- a/src/api.rs +++ b/src/api.rs @@ -14,7 +14,8 @@ use axum::extract::Query; use axum::routing::{get, post}; use axum::{Form, Json, Router}; use axum_macros::debug_handler; -use diffy::{apply, create_patch, merge}; +use diffy::{apply, create_patch, merge, Patch}; +use futures::future::try_join_all; use rand::random; use serde::{Deserialize, Serialize}; use url::Url; @@ -76,21 +77,70 @@ pub struct EditArticleData { pub resolve_conflict_id: Option, } -// TODO: how to store conflict in db? with three-way-merge doesnt -// necessarily make sense (might be outdated) #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] -pub struct Conflict { +pub struct ApiConflict { pub id: i32, pub three_way_merge: String, pub article_id: ObjectId, - pub latest_version: EditVersion, + pub previous_version: EditVersion, +} + +#[derive(Clone, Debug)] +pub struct DbConflict { + pub id: i32, + pub diff: String, + pub article_id: ObjectId, + pub previous_version: EditVersion, +} + +impl DbConflict { + pub async fn to_api_conflict( + &self, + data: &Data, + ) -> MyResult> { + let original_article = { + let mut lock = data.articles.lock().unwrap(); + let article = lock.get_mut(self.article_id.inner()).unwrap(); + article.clone() + }; + + // create common ancestor version + let ancestor = generate_article_version(&original_article.edits, &self.previous_version)?; + + let patch = Patch::from_str(&self.diff)?; + if let Ok(new_text) = apply(&original_article.text, &patch) { + // patch applies cleanly so we are done + // federate the change + submit_article_update(data, new_text, &original_article).await?; + // remove conflict from db + let mut lock = data.conflicts.lock().unwrap(); + lock.retain(|c| c.id != self.id); + Ok(None) + } else { + // there is a merge conflict, do three-way-merge + // apply self.diff to ancestor to get `ours` + let ours = apply(&ancestor, &patch)?; + // if it returns ok the merge was successful, which is impossible based on apply failing + // above. so we unconditionally read the three-way-merge string from err field + let merge = merge(&ancestor, &ours, &original_article.text) + .err() + .unwrap(); + + Ok(Some(ApiConflict { + id: self.id, + three_way_merge: merge, + article_id: original_article.ap_id.clone(), + previous_version: original_article.latest_version, + })) + } + } } #[debug_handler] async fn edit_article( data: Data, Form(edit_form): Form, -) -> MyResult>> { +) -> MyResult>> { // resolve conflict if any if let Some(resolve_conflict_id) = &edit_form.resolve_conflict_id { let mut lock = data.conflicts.lock().unwrap(); @@ -100,60 +150,53 @@ async fn edit_article( lock.retain(|c| &c.id != resolve_conflict_id); } let original_article = { - let mut lock = data.articles.lock().unwrap(); - let article = lock.get_mut(edit_form.ap_id.inner()).unwrap(); + let lock = data.articles.lock().unwrap(); + let article = lock.get(edit_form.ap_id.inner()).unwrap(); article.clone() }; if edit_form.previous_version == original_article.latest_version { - // no intermediate changes, simply submit new version - submit_article_update(&data, &edit_form, &original_article).await?; + // No intermediate changes, simply submit new version + submit_article_update(&data, edit_form.new_text.clone(), &original_article).await?; Ok(Json(None)) } else { - // create a patch from the differences of previous version and new version + // There have been other changes since this edit was initiated. Get the common ancestor + // version and generate a diff to find out what exactly has changed. let ancestor = - generate_article_version(&original_article.edits, Some(&edit_form.previous_version))?; + generate_article_version(&original_article.edits, &edit_form.previous_version)?; let patch = create_patch(&ancestor, &edit_form.new_text); - if apply(&original_article.text, &patch).is_ok() { - // patch applies cleanly so we are done - submit_article_update(&data, &edit_form, &original_article).await?; - Ok(Json(None)) - } else { - // there is a merge conflict, do three-way-merge - let merge = merge(&ancestor, &edit_form.new_text, &original_article.text) - .err() - .unwrap(); - let conflict = Conflict { - id: random(), - three_way_merge: merge, - article_id: original_article.ap_id, - latest_version: original_article.latest_version, - }; + let db_conflict = DbConflict { + id: random(), + diff: patch.to_string(), + article_id: original_article.ap_id.clone(), + previous_version: edit_form.previous_version, + }; + { let mut lock = data.conflicts.lock().unwrap(); - lock.push(conflict.clone()); - Ok(Json(Some(conflict))) + lock.push(db_conflict.clone()); } + Ok(Json(db_conflict.to_api_conflict(&data).await?)) } } async fn submit_article_update( data: &Data, - edit_form: &EditArticleData, + new_text: String, original_article: &DbArticle, ) -> Result<(), Error> { - let edit = DbEdit::new(original_article, &edit_form.new_text)?; + let edit = DbEdit::new(original_article, &new_text)?; if original_article.local { let updated_article = { let mut lock = data.articles.lock().unwrap(); - let article = lock.get_mut(edit_form.ap_id.inner()).unwrap(); - article.text = edit_form.new_text.clone(); + let article = lock.get_mut(original_article.ap_id.inner()).unwrap(); + article.text = new_text; article.latest_version = edit.version.clone(); article.edits.push(edit.clone()); article.clone() }; - UpdateLocalArticle::send(updated_article, data).await?; + UpdateLocalArticle::send(updated_article, vec![], data).await?; } else { UpdateRemoteArticle::send( edit, @@ -229,8 +272,15 @@ async fn follow_instance( } #[debug_handler] -async fn edit_conflicts(data: Data) -> MyResult>> { - let lock = data.conflicts.lock().unwrap(); - let conflicts = lock.clone(); +async fn edit_conflicts(data: Data) -> MyResult>> { + let conflicts = { data.conflicts.lock().unwrap().to_vec() }; + let conflicts: Vec = try_join_all(conflicts.into_iter().map(|c| { + let data = data.reset_request_count(); + async move { c.to_api_conflict(&data).await } + })) + .await? + .into_iter() + .flatten() + .collect(); Ok(Json(conflicts)) } diff --git a/src/database.rs b/src/database.rs index a48228f..c4db08e 100644 --- a/src/database.rs +++ b/src/database.rs @@ -1,4 +1,4 @@ -use crate::api::Conflict; +use crate::api::DbConflict; use crate::federation::objects::article::DbArticle; use crate::federation::objects::instance::DbInstance; @@ -11,7 +11,7 @@ pub type DatabaseHandle = Arc; pub struct Database { pub instances: Mutex>, pub articles: Mutex>, - pub conflicts: Mutex>, + pub conflicts: Mutex>, } impl Database { diff --git a/src/federation/activities/create_article.rs b/src/federation/activities/create_article.rs index 9a2ae33..fdcc582 100644 --- a/src/federation/activities/create_article.rs +++ b/src/federation/activities/create_article.rs @@ -40,7 +40,9 @@ impl CreateArticle { kind: Default::default(), id, }; - local_instance.send_to_followers(create, data).await?; + local_instance + .send_to_followers(create, vec![], data) + .await?; Ok(()) } } @@ -64,7 +66,9 @@ impl ActivityHandler for CreateArticle { async fn receive(self, data: &Data) -> Result<(), Self::Error> { let article = DbArticle::from_json(self.object.clone(), data).await?; if article.local { - data.local_instance().send_to_followers(self, data).await?; + data.local_instance() + .send_to_followers(self, vec![], data) + .await?; } Ok(()) } diff --git a/src/federation/activities/reject.rs b/src/federation/activities/reject.rs index 2dbdfeb..c71ece1 100644 --- a/src/federation/activities/reject.rs +++ b/src/federation/activities/reject.rs @@ -8,7 +8,9 @@ use activitypub_federation::{ config::Data, fetch::object_id::ObjectId, protocol::helpers::deserialize_one_or_many, traits::ActivityHandler, }; +use rand::random; +use crate::api::DbConflict; use serde::{Deserialize, Serialize}; use url::Url; @@ -65,9 +67,14 @@ impl ActivityHandler for RejectEdit { async fn receive(self, data: &Data) -> Result<(), Self::Error> { // cant convert this to DbEdit as it tries to apply patch and fails - let lock = data.conflicts.lock().unwrap(); - todo!(); - //lock.push(self.object); + let mut lock = data.conflicts.lock().unwrap(); + let conflict = DbConflict { + id: random(), + diff: self.object.content, + article_id: self.object.object, + previous_version: self.object.previous_version, + }; + lock.push(conflict); Ok(()) } } diff --git a/src/federation/activities/update_local_article.rs b/src/federation/activities/update_local_article.rs index 680194f..b5e5d70 100644 --- a/src/federation/activities/update_local_article.rs +++ b/src/federation/activities/update_local_article.rs @@ -29,18 +29,26 @@ pub struct UpdateLocalArticle { impl UpdateLocalArticle { /// Sent from article origin instance - pub async fn send(article: DbArticle, data: &Data) -> MyResult<()> { + pub async fn send( + article: DbArticle, + extra_recipients: Vec, + data: &Data, + ) -> MyResult<()> { debug_assert!(article.local); let local_instance = data.local_instance(); let id = generate_activity_id(local_instance.ap_id.inner())?; + let mut to = local_instance.follower_ids(); + to.extend(extra_recipients.iter().map(|i| i.ap_id.inner().clone())); let update = UpdateLocalArticle { actor: local_instance.ap_id.clone(), - to: local_instance.follower_ids(), + to, object: article.into_json(data).await?, kind: Default::default(), id, }; - local_instance.send_to_followers(update, data).await?; + local_instance + .send_to_followers(update, extra_recipients, data) + .await?; Ok(()) } } diff --git a/src/federation/activities/update_remote_article.rs b/src/federation/activities/update_remote_article.rs index d9a7b79..3a37d3a 100644 --- a/src/federation/activities/update_remote_article.rs +++ b/src/federation/activities/update_remote_article.rs @@ -72,22 +72,23 @@ impl ActivityHandler for UpdateRemoteArticle { /// Received on article origin instances async fn receive(self, data: &Data) -> Result<(), Self::Error> { - let edit = DbEdit::from_json(self.object.clone(), data).await?; let article_text = { let lock = data.articles.lock().unwrap(); lock.get(self.object.object.inner()).unwrap().text.clone() }; - let patch = Patch::from_str(&edit.diff)?; + let patch = Patch::from_str(&self.object.content)?; match apply(&article_text, &patch) { Ok(applied) => { let article = { + let edit = DbEdit::from_json(self.object.clone(), data).await?; let mut lock = data.articles.lock().unwrap(); let article = lock.get_mut(edit.article_id.inner()).unwrap(); article.text = applied; article.clone() }; - UpdateLocalArticle::send(article, data).await?; + UpdateLocalArticle::send(article, vec![self.actor.dereference(data).await?], data) + .await?; } Err(_e) => { let user_instance = self.actor.dereference(data).await?; diff --git a/src/federation/objects/edit.rs b/src/federation/objects/edit.rs index b68799f..2984688 100644 --- a/src/federation/objects/edit.rs +++ b/src/federation/objects/edit.rs @@ -62,6 +62,7 @@ pub struct ApubEdit { pub id: ObjectId, pub content: String, pub version: EditVersion, + pub previous_version: EditVersion, pub object: ObjectId, } @@ -78,12 +79,18 @@ impl Object for DbEdit { todo!() } - async fn into_json(self, _data: &Data) -> Result { + async fn into_json(self, data: &Data) -> Result { + let article_version = { + let mut lock = data.articles.lock().unwrap(); + let article = lock.get_mut(self.article_id.inner()).unwrap(); + article.latest_version.clone() + }; Ok(ApubEdit { kind: EditType::Edit, id: self.id, content: self.diff, version: self.version, + previous_version: article_version, object: self.article_id, }) } diff --git a/src/federation/objects/instance.rs b/src/federation/objects/instance.rs index f03bd63..61c67a8 100644 --- a/src/federation/objects/instance.rs +++ b/src/federation/objects/instance.rs @@ -66,6 +66,7 @@ impl DbInstance { pub async fn send_to_followers( &self, activity: Activity, + extra_recipients: Vec, data: &Data, ) -> Result<(), ::Error> where @@ -73,11 +74,12 @@ impl DbInstance { ::Error: From, { let local_instance = data.local_instance(); - let inboxes = local_instance + let mut inboxes: Vec<_> = local_instance .followers .iter() .map(|f| f.inbox.clone()) .collect(); + inboxes.extend(extra_recipients.into_iter().map(|i| i.inbox)); local_instance.send(activity, inboxes, data).await?; Ok(()) } diff --git a/src/utils.rs b/src/utils.rs index fb8cece..dd9e87d 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -21,23 +21,14 @@ pub fn generate_activity_id(domain: &Url) -> Result { /// /// TODO: testing /// TODO: should cache all these generated versions -pub fn generate_article_version( - edits: &Vec, - version: Option<&EditVersion>, -) -> MyResult { +pub fn generate_article_version(edits: &Vec, version: &EditVersion) -> MyResult { let mut generated = String::new(); - if let Some(version) = version { - let exists = edits.iter().any(|e| &e.version == version); - if !exists { - Err(anyhow!("Attempting to generate invalid article version"))?; - } - } for e in edits { let patch = Patch::from_str(&e.diff)?; generated = apply(&generated, &patch)?; - if Some(&e.version) == version { + if &e.version == version { return Ok(generated); } } - Ok(generated) + Err(anyhow!("failed to generate article version").into()) } diff --git a/tests/common.rs b/tests/common.rs index 0919af7..11cac8f 100644 --- a/tests/common.rs +++ b/tests/common.rs @@ -1,5 +1,5 @@ use fediwiki::api::{ - Conflict, CreateArticleData, EditArticleData, FollowInstance, GetArticleData, ResolveObject, + ApiConflict, CreateArticleData, EditArticleData, FollowInstance, GetArticleData, ResolveObject, }; use fediwiki::error::MyResult; use fediwiki::federation::objects::article::DbArticle; @@ -93,7 +93,7 @@ pub async fn get_article(hostname: &str, title: &str) -> MyResult { pub async fn edit_article_with_conflict( hostname: &str, edit_form: &EditArticleData, -) -> MyResult> { +) -> MyResult> { Ok(CLIENT .patch(format!("http://{}/api/v1/article", hostname)) .form(edit_form) @@ -108,7 +108,7 @@ pub async fn edit_article( title: &str, edit_form: &EditArticleData, ) -> MyResult { - let edit_res: Option = CLIENT + let edit_res: Option = CLIENT .patch(format!("http://{}/api/v1/article", hostname)) .form(edit_form) .send() diff --git a/tests/test.rs b/tests/test.rs index cfdc9f2..46edd52 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -7,7 +7,7 @@ use crate::common::{ get_query, TestData, TEST_ARTICLE_DEFAULT_TEXT, }; use common::get; -use fediwiki::api::{Conflict, EditArticleData, ResolveObject}; +use fediwiki::api::{ApiConflict, EditArticleData, ResolveObject}; use fediwiki::error::MyResult; use fediwiki::federation::objects::article::DbArticle; use fediwiki::federation::objects::edit::ApubEdit; @@ -248,7 +248,7 @@ async fn test_local_edit_conflict() -> MyResult<()> { .unwrap(); assert_eq!("<<<<<<< ours\nIpsum Lorem\n||||||| original\nempty\n=======\nLorem Ipsum\n>>>>>>> theirs\n", edit_res.three_way_merge); - let conflicts: Vec = + let conflicts: Vec = get_query(data.hostname_alpha, "edit_conflicts", None::<()>).await?; assert_eq!(1, conflicts.len()); assert_eq!(conflicts[0], edit_res); @@ -256,13 +256,13 @@ async fn test_local_edit_conflict() -> MyResult<()> { let edit_form = EditArticleData { ap_id: create_res.ap_id.clone(), new_text: "Lorem Ipsum and Ipsum Lorem\n".to_string(), - previous_version: edit_res.latest_version, + previous_version: edit_res.previous_version, resolve_conflict_id: Some(edit_res.id), }; let edit_res = edit_article(data.hostname_alpha, &create_res.title, &edit_form).await?; assert_eq!(edit_form.new_text, edit_res.text); - let conflicts: Vec = + let conflicts: Vec = get_query(data.hostname_alpha, "edit_conflicts", None::<()>).await?; assert_eq!(0, conflicts.len()); @@ -314,25 +314,23 @@ async fn test_federated_edit_conflict() -> MyResult<()> { previous_version: create_res.latest_version, resolve_conflict_id: None, }; - let edit_res = edit_article(data.hostname_gamma, &create_res.title, &edit_form).await?; + let edit_res = edit_article(data.hostname_gamma, &title, &edit_form).await?; assert_ne!(edit_form.new_text, edit_res.text); assert_eq!(2, edit_res.edits.len()); assert!(!edit_res.local); - let conflicts: Vec = + let conflicts: Vec = get_query(data.hostname_gamma, "edit_conflicts", None::<()>).await?; - // TODO: this should also return string for three-way-merge - dbg!(&conflicts); assert_eq!(1, conflicts.len()); // resolve the conflict let edit_form = EditArticleData { ap_id: create_res.ap_id, new_text: "aaaa\n".to_string(), - previous_version: conflicts[0].version.clone(), - resolve_conflict_id: todo!(), //Some(conflicts[0].id.clone()), + previous_version: conflicts[0].previous_version.clone(), + resolve_conflict_id: Some(conflicts[0].id), }; - let edit_res = edit_article(data.hostname_gamma, &create_res.title, &edit_form).await?; + let edit_res = edit_article(data.hostname_gamma, &title, &edit_form).await?; assert_eq!(edit_form.new_text, edit_res.text); assert_eq!(3, edit_res.edits.len());