handle local conflict

This commit is contained in:
Felix Ableitner 2023-11-27 16:34:45 +01:00
parent 2b0fbc6236
commit 75f2530689
8 changed files with 215 additions and 26 deletions

View File

@ -1,20 +1,21 @@
use crate::database::DatabaseHandle;
use crate::error::MyResult;
use crate::error::{Error, MyResult};
use crate::federation::activities::create_article::CreateArticle;
use crate::federation::activities::update_local_article::UpdateLocalArticle;
use crate::federation::activities::update_remote_article::UpdateRemoteArticle;
use crate::federation::objects::article::DbArticle;
use crate::federation::objects::edit::{ApubEdit, DbEdit, EditVersion};
use crate::federation::objects::edit::{DbEdit, EditVersion};
use crate::federation::objects::instance::DbInstance;
use crate::utils::generate_article_version;
use activitypub_federation::config::Data;
use activitypub_federation::fetch::object_id::ObjectId;
use anyhow::anyhow;
use crate::federation::activities::update_local_article::UpdateLocalArticle;
use axum::extract::Query;
use axum::routing::{get, post};
use axum::{Form, Json, Router};
use axum_macros::debug_handler;
use diffy::{apply, create_patch, merge};
use rand::random;
use serde::{Deserialize, Serialize};
use url::Url;
@ -71,24 +72,82 @@ async fn create_article(
pub struct EditArticleData {
pub ap_id: ObjectId<DbArticle>,
pub new_text: String,
pub previous_version: EditVersion,
pub resolve_conflict_id: Option<i32>,
}
// TODO: how to store conflict in db? with three-way-merge doesnt
// necessarily make sense (might be outdated)
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct Conflict {
pub id: i32,
pub three_way_merge: String,
pub article_id: ObjectId<DbArticle>,
pub latest_version: EditVersion,
}
#[debug_handler]
async fn edit_article(
data: Data<DatabaseHandle>,
Form(edit_article): Form<EditArticleData>,
) -> MyResult<()> {
Form(edit_form): Form<EditArticleData>,
) -> MyResult<Json<Option<Conflict>>> {
// resolve conflict if any
if let Some(resolve_conflict_id) = &edit_form.resolve_conflict_id {
let mut lock = data.conflicts.lock().unwrap();
if lock.iter().find(|c| &c.id == resolve_conflict_id).is_none() {
return Err(anyhow!("invalid resolve conflict"))?;
}
lock.retain(|c| &c.id != resolve_conflict_id);
}
let original_article = {
let mut lock = data.articles.lock().unwrap();
let article = lock.get_mut(edit_article.ap_id.inner()).unwrap();
let article = lock.get_mut(edit_form.ap_id.inner()).unwrap();
article.clone()
};
let edit = DbEdit::new(&original_article, &edit_article.new_text)?;
if edit_form.previous_version == original_article.latest_version {
// no intermediate changes, simply submit new version
submit_article_update(&data, &edit_form, &original_article).await?;
Ok(Json(None))
} else {
// create a patch from the differences of previous version and new version
let ancestor =
generate_article_version(&original_article.edits, Some(&edit_form.previous_version))?;
let patch = create_patch(&ancestor, &edit_form.new_text);
if apply(&original_article.text, &patch).is_ok() {
// patch applies cleanly so we are done
submit_article_update(&data, &edit_form, &original_article).await?;
Ok(Json(None))
} else {
// there is a merge conflict, do three-way-merge
let merge = merge(&ancestor, &edit_form.new_text, &original_article.text)
.err()
.unwrap();
let conflict = Conflict {
id: random(),
three_way_merge: merge,
article_id: original_article.ap_id,
latest_version: original_article.latest_version,
};
let mut lock = data.conflicts.lock().unwrap();
lock.push(conflict.clone());
return Ok(Json(Some(conflict)));
}
}
}
async fn submit_article_update(
data: &Data<DatabaseHandle>,
edit_form: &EditArticleData,
original_article: &DbArticle,
) -> Result<(), Error> {
let edit = DbEdit::new(&original_article, &edit_form.new_text)?;
if original_article.local {
let updated_article = {
let mut lock = data.articles.lock().unwrap();
let article = lock.get_mut(edit_article.ap_id.inner()).unwrap();
article.text = edit_article.new_text;
let article = lock.get_mut(edit_form.ap_id.inner()).unwrap();
article.text = edit_form.new_text.clone();
article.latest_version = edit.version.clone();
article.edits.push(edit.clone());
article.clone()
@ -103,7 +162,6 @@ async fn edit_article(
)
.await?;
}
Ok(())
}
@ -171,7 +229,7 @@ async fn follow_instance(
}
#[debug_handler]
async fn edit_conflicts(data: Data<DatabaseHandle>) -> MyResult<Json<Vec<ApubEdit>>> {
async fn edit_conflicts(data: Data<DatabaseHandle>) -> MyResult<Json<Vec<Conflict>>> {
let lock = data.conflicts.lock().unwrap();
let conflicts = lock.clone();
Ok(Json(conflicts))

View File

@ -1,3 +1,4 @@
use crate::api::Conflict;
use crate::federation::objects::article::DbArticle;
use crate::federation::objects::edit::ApubEdit;
use crate::federation::objects::instance::DbInstance;
@ -10,7 +11,7 @@ pub type DatabaseHandle = Arc<Database>;
pub struct Database {
pub instances: Mutex<HashMap<Url, DbInstance>>,
pub articles: Mutex<HashMap<Url, DbArticle>>,
pub conflicts: Mutex<Vec<ApubEdit>>,
pub conflicts: Mutex<Vec<Conflict>>,
}
impl Database {

View File

@ -64,9 +64,10 @@ impl ActivityHandler for RejectEdit {
}
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
// TODO: cant convert this to DbEdit as it tries to apply patch and fails
// cant convert this to DbEdit as it tries to apply patch and fails
let mut lock = data.conflicts.lock().unwrap();
lock.push(self.object);
todo!();
//lock.push(self.object);
Ok(())
}
}

View File

@ -46,7 +46,6 @@ impl UpdateRemoteArticle {
kind: Default::default(),
id,
};
dbg!(&update);
local_instance
.send(update, vec![article_instance.inbox], data)
.await?;

View File

@ -59,7 +59,7 @@ pub enum EditType {
pub struct ApubEdit {
#[serde(rename = "type")]
kind: EditType,
id: ObjectId<DbEdit>,
pub id: ObjectId<DbEdit>,
pub content: String,
pub version: EditVersion,
pub object: ObjectId<DbArticle>,

View File

@ -1,3 +1,7 @@
use crate::error::MyResult;
use crate::federation::objects::edit::{DbEdit, EditVersion};
use anyhow::anyhow;
use diffy::{apply, Patch};
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use url::{ParseError, Url};
@ -11,3 +15,29 @@ pub fn generate_activity_id(domain: &Url) -> Result<Url, ParseError> {
.collect();
Url::parse(&format!("http://{}:{}/objects/{}", domain, port, id))
}
/// Starting from empty string, apply edits until the specified version is reached. If no version is
/// given, apply all edits up to latest version.
///
/// TODO: testing
/// TODO: should cache all these generated versions
pub fn generate_article_version(
edits: &Vec<DbEdit>,
version: Option<&EditVersion>,
) -> MyResult<String> {
let mut generated = String::new();
if let Some(version) = version {
let exists = edits.iter().any(|e| &e.version == version);
if !exists {
return Err(anyhow!("Attempting to generate invalid article version"))?;
}
}
for e in edits {
let patch = Patch::from_str(&e.diff)?;
generated = apply(&generated, &patch)?;
if Some(&e.version) == version {
return Ok(generated);
}
}
Ok(generated)
}

View File

@ -1,5 +1,5 @@
use fediwiki::api::{
CreateArticleData, EditArticleData, FollowInstance, GetArticleData, ResolveObject,
Conflict, CreateArticleData, EditArticleData, FollowInstance, GetArticleData, ResolveObject,
};
use fediwiki::error::MyResult;
use fediwiki::federation::objects::article::DbArticle;
@ -77,6 +77,8 @@ pub async fn create_article(hostname: &str, title: String) -> MyResult<DbArticle
let edit_form = EditArticleData {
ap_id: article.ap_id,
new_text: TEST_ARTICLE_DEFAULT_TEXT.to_string(),
previous_version: article.latest_version,
resolve_conflict_id: None,
};
edit_article(hostname, &title, &edit_form).await
}
@ -88,16 +90,32 @@ pub async fn get_article(hostname: &str, title: &str) -> MyResult<DbArticle> {
get_query::<DbArticle, _>(hostname, "article", Some(get_article.clone())).await
}
pub async fn edit_article_with_conflict(
hostname: &str,
edit_form: &EditArticleData,
) -> MyResult<Option<Conflict>> {
Ok(CLIENT
.patch(format!("http://{}/api/v1/article", hostname))
.form(edit_form)
.send()
.await?
.json()
.await?)
}
pub async fn edit_article(
hostname: &str,
title: &str,
edit_form: &EditArticleData,
) -> MyResult<DbArticle> {
CLIENT
let edit_res: Option<Conflict> = CLIENT
.patch(format!("http://{}/api/v1/article", hostname))
.form(edit_form)
.send()
.await?
.json()
.await?;
assert!(edit_res.is_none());
let get_article = GetArticleData {
title: title.to_string(),
};

View File

@ -3,11 +3,11 @@ extern crate fediwiki;
mod common;
use crate::common::{
create_article, edit_article, follow_instance, get_article, get_query, TestData,
TEST_ARTICLE_DEFAULT_TEXT,
create_article, edit_article, edit_article_with_conflict, follow_instance, get_article,
get_query, TestData, TEST_ARTICLE_DEFAULT_TEXT,
};
use common::get;
use fediwiki::api::{EditArticleData, ResolveObject};
use fediwiki::api::{Conflict, EditArticleData, ResolveObject};
use fediwiki::error::MyResult;
use fediwiki::federation::objects::article::DbArticle;
use fediwiki::federation::objects::edit::ApubEdit;
@ -41,6 +41,8 @@ async fn test_create_read_and_edit_article() -> MyResult<()> {
let edit_form = EditArticleData {
ap_id: create_res.ap_id.clone(),
new_text: "Lorem Ipsum 2".to_string(),
previous_version: get_res.latest_version,
resolve_conflict_id: None,
};
let edit_res = edit_article(data.hostname_alpha, &create_res.title, &edit_form).await?;
assert_eq!(edit_form.new_text, edit_res.text);
@ -88,6 +90,8 @@ async fn test_synchronize_articles() -> MyResult<()> {
let edit_form = EditArticleData {
ap_id: create_res.ap_id.clone(),
new_text: "Lorem Ipsum 2\n".to_string(),
previous_version: create_res.latest_version,
resolve_conflict_id: None,
};
edit_article(data.hostname_alpha, &title, &edit_form).await?;
@ -137,6 +141,8 @@ async fn test_edit_local_article() -> MyResult<()> {
let edit_form = EditArticleData {
ap_id: create_res.ap_id,
new_text: "Lorem Ipsum 2".to_string(),
previous_version: get_res.latest_version,
resolve_conflict_id: None,
};
let edit_res = edit_article(data.hostname_beta, &create_res.title, &edit_form).await?;
assert_eq!(edit_res.text, edit_form.new_text);
@ -182,6 +188,8 @@ async fn test_edit_remote_article() -> MyResult<()> {
let edit_form = EditArticleData {
ap_id: create_res.ap_id,
new_text: "Lorem Ipsum 2".to_string(),
previous_version: get_res.latest_version,
resolve_conflict_id: None,
};
let edit_res = edit_article(data.hostname_alpha, &title, &edit_form).await?;
assert_eq!(edit_form.new_text, edit_res.text);
@ -208,7 +216,62 @@ async fn test_edit_remote_article() -> MyResult<()> {
#[tokio::test]
#[serial]
async fn test_edit_conflict() -> MyResult<()> {
async fn test_local_edit_conflict() -> MyResult<()> {
let data = TestData::start();
// create new article
let title = "Manu_Chao".to_string();
let create_res = create_article(data.hostname_alpha, title.clone()).await?;
assert_eq!(title, create_res.title);
assert!(create_res.local);
// one user edits article
let edit_form = EditArticleData {
ap_id: create_res.ap_id.clone(),
new_text: "Lorem Ipsum\n".to_string(),
previous_version: create_res.latest_version.clone(),
resolve_conflict_id: None,
};
let edit_res = edit_article(data.hostname_alpha, &create_res.title, &edit_form).await?;
assert_eq!(edit_res.text, edit_form.new_text);
assert_eq!(2, edit_res.edits.len());
// another user edits article, without being aware of previous edit
let edit_form = EditArticleData {
ap_id: create_res.ap_id.clone(),
new_text: "Ipsum Lorem\n".to_string(),
previous_version: create_res.latest_version,
resolve_conflict_id: None,
};
let edit_res = edit_article_with_conflict(data.hostname_alpha, &edit_form)
.await?
.unwrap();
assert_eq!("<<<<<<< ours\nIpsum Lorem\n||||||| original\nempty\n=======\nLorem Ipsum\n>>>>>>> theirs\n", edit_res.three_way_merge);
let conflicts: Vec<Conflict> =
get_query(data.hostname_alpha, "edit_conflicts", None::<()>).await?;
assert_eq!(1, conflicts.len());
assert_eq!(conflicts[0], edit_res);
let edit_form = EditArticleData {
ap_id: create_res.ap_id.clone(),
new_text: "Lorem Ipsum and Ipsum Lorem\n".to_string(),
previous_version: edit_res.latest_version,
resolve_conflict_id: Some(edit_res.id),
};
let edit_res = edit_article(data.hostname_alpha, &create_res.title, &edit_form).await?;
assert_eq!(edit_form.new_text, edit_res.text);
let conflicts: Vec<Conflict> =
get_query(data.hostname_alpha, "edit_conflicts", None::<()>).await?;
assert_eq!(0, conflicts.len());
data.stop()
}
#[tokio::test]
#[serial]
async fn test_federated_edit_conflict() -> MyResult<()> {
let data = TestData::start();
follow_instance(data.hostname_alpha, data.hostname_beta).await?;
@ -231,6 +294,8 @@ async fn test_edit_conflict() -> MyResult<()> {
let edit_form = EditArticleData {
ap_id: create_res.ap_id.clone(),
new_text: "Lorem Ipsum\n".to_string(),
previous_version: create_res.latest_version.clone(),
resolve_conflict_id: None,
};
let edit_res = edit_article(data.hostname_alpha, &create_res.title, &edit_form).await?;
assert_eq!(edit_res.text, edit_form.new_text);
@ -244,8 +309,10 @@ async fn test_edit_conflict() -> MyResult<()> {
// gamma also edits, as its not the latest version there is a conflict. local version should
// not be updated with this conflicting version, instead user needs to handle the conflict
let edit_form = EditArticleData {
ap_id: create_res.ap_id,
ap_id: create_res.ap_id.clone(),
new_text: "aaaa\n".to_string(),
previous_version: create_res.latest_version,
resolve_conflict_id: None,
};
let edit_res = edit_article(data.hostname_gamma, &create_res.title, &edit_form).await?;
assert_ne!(edit_form.new_text, edit_res.text);
@ -254,9 +321,24 @@ async fn test_edit_conflict() -> MyResult<()> {
let conflicts: Vec<ApubEdit> =
get_query(data.hostname_gamma, "edit_conflicts", None::<()>).await?;
// TODO: this should also return string for three-way-merge
dbg!(&conflicts);
assert_eq!(1, conflicts.len());
// TODO: need a way to mark conflict as resolved, maybe opt param on edit endpoint
// resolve the conflict
let edit_form = EditArticleData {
ap_id: create_res.ap_id,
new_text: "aaaa\n".to_string(),
previous_version: conflicts[0].version.clone(),
resolve_conflict_id: todo!(), //Some(conflicts[0].id.clone()),
};
let edit_res = edit_article(data.hostname_gamma, &create_res.title, &edit_form).await?;
assert_eq!(edit_form.new_text, edit_res.text);
assert_eq!(3, edit_res.edits.len());
let conflicts: Vec<ApubEdit> =
get_query(data.hostname_gamma, "edit_conflicts", None::<()>).await?;
assert_eq!(0, conflicts.len());
data.stop()
}