diff --git a/Cargo.lock b/Cargo.lock index e569898..133dd64 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5,6 +5,7 @@ version = 3 [[package]] name = "activitypub_federation" version = "0.5.0-beta.5" +source = "git+https://github.com/LemmyNet/activitypub-federation-rust.git?branch=diesel-feature#ffb020bcdd5004fdcba501950e6a87bc82c806ed" dependencies = [ "activitystreams-kinds", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 6d41432..e088be7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] -activitypub_federation = { path = "../lemmy/activitypub-federation-rust", features = ["axum", "diesel"], default-features = false } +activitypub_federation = { git = "https://github.com/LemmyNet/activitypub-federation-rust.git", branch = "diesel-feature", features = ["axum", "diesel"], default-features = false } anyhow = "1.0.75" async-trait = "0.1.74" axum = "0.6.20" diff --git a/migrations/2023-11-28-150402_article/up.sql b/migrations/2023-11-28-150402_article/up.sql index d4a8d7d..641b7f4 100644 --- a/migrations/2023-11-28-150402_article/up.sql +++ b/migrations/2023-11-28-150402_article/up.sql @@ -11,11 +11,12 @@ create table instance ( create table instance_follow ( id serial primary key, + instance_id int REFERENCES instance ON UPDATE CASCADE ON DELETE CASCADE NOT NULL, follower_id int REFERENCES instance ON UPDATE CASCADE ON DELETE CASCADE NOT NULL, - followed_id int REFERENCES instance ON UPDATE CASCADE ON DELETE CASCADE NOT NULL, - pending boolean not null - + pending boolean not null, + unique(instance_id, follower_id) ); + create table article ( id serial primary key, title text not null, diff --git a/src/api.rs b/src/api.rs index 2895ba2..b35078b 100644 --- a/src/api.rs +++ b/src/api.rs @@ -129,7 +129,6 @@ async fn edit_article( let ancestor = generate_article_version(&original_article.edits, &edit_form.previous_version)?; let patch = create_patch(&ancestor, &edit_form.new_text); - dbg!(&edit_form.previous_version); let db_conflict = DbConflict { id: random(), @@ -141,7 +140,7 @@ async fn edit_article( let mut lock = data.conflicts.lock().unwrap(); lock.push(db_conflict.clone()); } - Ok(Json(dbg!(db_conflict.to_api_conflict(&data).await)?)) + Ok(Json(db_conflict.to_api_conflict(&data).await?)) } } @@ -226,21 +225,16 @@ async fn follow_instance( /// Get a list of all unresolved edit conflicts. #[debug_handler] async fn edit_conflicts(data: Data) -> MyResult>> { - dbg!("a"); let conflicts = { data.conflicts.lock().unwrap().to_vec() }; - dbg!(&conflicts); - dbg!("b"); let conflicts: Vec = try_join_all(conflicts.into_iter().map(|c| { let data = data.reset_request_count(); - dbg!(&c.previous_version); - async move { dbg!(c.to_api_conflict(&data).await) } + async move { c.to_api_conflict(&data).await } })) .await? .into_iter() .flatten() .collect(); - dbg!("c"); - Ok(Json(dbg!(conflicts))) + Ok(Json(conflicts)) } #[derive(Deserialize, Serialize, Clone)] diff --git a/src/database/article.rs b/src/database/article.rs index ef82824..8126460 100644 --- a/src/database/article.rs +++ b/src/database/article.rs @@ -1,5 +1,5 @@ use crate::database::edit::{DbEdit, EditVersion}; -use crate::database::instance::DbInstance; + use crate::database::schema::article; use crate::error::MyResult; use crate::federation::objects::edits_collection::DbEditCollection; diff --git a/src/database/instance.rs b/src/database/instance.rs index e2f26e5..e776ed1 100644 --- a/src/database/instance.rs +++ b/src/database/instance.rs @@ -1,19 +1,18 @@ -use crate::database::article::DbArticle; use crate::database::schema::{instance, instance_follow}; use crate::database::MyDataHandle; use crate::error::{Error, MyResult}; -use crate::federation::activities::follow::Follow; + use crate::federation::objects::articles_collection::DbArticleCollection; use activitypub_federation::activity_sending::SendActivityTask; use activitypub_federation::config::Data; use activitypub_federation::fetch::collection_id::CollectionId; use activitypub_federation::fetch::object_id::ObjectId; use activitypub_federation::protocol::context::WithContext; -use activitypub_federation::traits::{ActivityHandler, Actor}; +use activitypub_federation::traits::ActivityHandler; use chrono::{DateTime, Utc}; use diesel::ExpressionMethods; use diesel::{ - insert_into, update, AsChangeset, Identifiable, Insertable, JoinOnDsl, PgConnection, QueryDsl, + insert_into, AsChangeset, Identifiable, Insertable, JoinOnDsl, PgConnection, QueryDsl, Queryable, RunQueryDsl, Selectable, }; use serde::{Deserialize, Serialize}; @@ -56,7 +55,7 @@ pub struct DbInstanceForm { pub struct InstanceView { pub instance: DbInstance, pub followers: Vec, - pub followed: Vec, + pub following: Vec, } impl DbInstance { @@ -151,49 +150,54 @@ impl DbInstance { pub fn read_local_view(conn: &Mutex) -> MyResult { let instance = DbInstance::read_local_instance(conn)?; let followers = DbInstance::read_followers(instance.id, conn)?; - let followed = DbInstance::read_followed(instance.id, conn)?; + let following = DbInstance::read_following(instance.id, conn)?; Ok(InstanceView { instance, followers, - followed, + following, }) } pub fn follow( follower_id_: i32, - followed_id_: i32, + instance_id_: i32, pending_: bool, data: &Data, ) -> MyResult<()> { - use instance_follow::dsl::{followed_id, follower_id, pending}; + debug_assert_ne!(follower_id_, instance_id_); + use instance_follow::dsl::{follower_id, instance_id, pending}; let mut conn = data.db_connection.lock().unwrap(); + let form = ( + instance_id.eq(instance_id_), + follower_id.eq(follower_id_), + pending.eq(pending_), + ); + dbg!(follower_id_, instance_id_, pending_); insert_into(instance_follow::table) - .values(( - follower_id.eq(follower_id_), - followed_id.eq(followed_id_), - pending.eq(pending_), - )) + .values(form) + .on_conflict((instance_id, follower_id)) + .do_update() + .set(form) .execute(conn.deref_mut())?; Ok(()) } pub fn read_followers(id_: i32, conn: &Mutex) -> MyResult> { - use instance_follow::dsl::{followed_id, id}; + use instance_follow::dsl::{follower_id, instance_id}; let mut conn = conn.lock().unwrap(); Ok(instance_follow::table - .inner_join(instance::table.on(id.eq(instance::dsl::id))) - .filter(followed_id.eq(id_)) + .inner_join(instance::table.on(follower_id.eq(instance::dsl::id))) + .filter(instance_id.eq(id_)) .select(instance::all_columns) .get_results(conn.deref_mut())?) } - pub fn read_followed(id_: i32, conn: &Mutex) -> MyResult> { - // TODO: is this correct? - use instance_follow::dsl::{follower_id, id}; + pub fn read_following(id_: i32, conn: &Mutex) -> MyResult> { + use instance_follow::dsl::{follower_id, instance_id}; let mut conn = conn.lock().unwrap(); Ok(instance_follow::table - .inner_join(instance::table.on(id.eq(instance::dsl::id))) + .inner_join(instance::table.on(instance_id.eq(instance::dsl::id))) .filter(follower_id.eq(id_)) .select(instance::all_columns) .get_results(conn.deref_mut())?) diff --git a/src/database/mod.rs b/src/database/mod.rs index c29cfd5..cfb5f2b 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -9,11 +9,9 @@ use activitypub_federation::fetch::object_id::ObjectId; use diesel::PgConnection; use diffy::{apply, merge, Patch}; use edit::EditVersion; -use instance::DbInstance; -use std::collections::HashMap; + use std::ops::Deref; use std::sync::{Arc, Mutex}; -use url::Url; pub mod article; pub mod edit; @@ -58,12 +56,10 @@ impl DbConflict { // create common ancestor version let edits = DbEdit::for_article(&original_article, &data.db_connection)?; let ancestor = generate_article_version(&edits, &self.previous_version)?; - dbg!(&ancestor, &self.previous_version); - dbg!(&self.diff); let patch = Patch::from_str(&self.diff)?; // apply self.diff to ancestor to get `ours` - let ours = dbg!(apply(&ancestor, &patch))?; + let ours = apply(&ancestor, &patch)?; match merge(&ancestor, &ours, &original_article.text) { Ok(new_text) => { // patch applies cleanly so we are done diff --git a/src/database/schema.rs b/src/database/schema.rs index 4f6a09e..d1582f9 100644 --- a/src/database/schema.rs +++ b/src/database/schema.rs @@ -42,8 +42,8 @@ diesel::table! { diesel::table! { instance_follow (id) { id -> Int4, + instance_id -> Int4, follower_id -> Int4, - followed_id -> Int4, pending -> Bool, } } diff --git a/src/federation/activities/accept.rs b/src/federation/activities/accept.rs index eeffd6f..85cc0ae 100644 --- a/src/federation/activities/accept.rs +++ b/src/federation/activities/accept.rs @@ -51,7 +51,7 @@ impl ActivityHandler for Accept { // add to follows let local_instance = DbInstance::read_local_instance(&data.db_connection)?; let actor = self.actor.dereference(data).await?; - DbInstance::follow(local_instance.id, actor.id, false, &data)?; + DbInstance::follow(local_instance.id, actor.id, false, data)?; Ok(()) } } diff --git a/src/federation/activities/create_article.rs b/src/federation/activities/create_article.rs index f03e6dd..d9e53c9 100644 --- a/src/federation/activities/create_article.rs +++ b/src/federation/activities/create_article.rs @@ -30,7 +30,7 @@ impl CreateArticle { let local_instance = DbInstance::read_local_instance(&data.db_connection)?; let object = article.clone().into_json(data).await?; let id = generate_activity_id(local_instance.ap_id.inner())?; - let to = local_instance.follower_ids(&data)?; + let to = local_instance.follower_ids(data)?; let create = CreateArticle { actor: local_instance.ap_id.clone(), to, diff --git a/src/federation/activities/follow.rs b/src/federation/activities/follow.rs index bb90d8d..f38a5dd 100644 --- a/src/federation/activities/follow.rs +++ b/src/federation/activities/follow.rs @@ -60,7 +60,7 @@ impl ActivityHandler for Follow { async fn receive(self, data: &Data) -> Result<(), Self::Error> { let actor = self.actor.dereference(data).await?; let local_instance = DbInstance::read_local_instance(&data.db_connection)?; - DbInstance::follow(actor.id, local_instance.id, false, &data)?; + DbInstance::follow(actor.id, local_instance.id, false, data)?; // send back an accept let follower = self.actor.dereference(data).await?; diff --git a/src/federation/activities/reject.rs b/src/federation/activities/reject.rs index 5327a97..b015ebd 100644 --- a/src/federation/activities/reject.rs +++ b/src/federation/activities/reject.rs @@ -66,7 +66,6 @@ impl ActivityHandler for RejectEdit { } async fn receive(self, data: &Data) -> Result<(), Self::Error> { - dbg!(&self); // cant convert this to DbEdit as it tries to apply patch and fails let mut lock = data.conflicts.lock().unwrap(); let conflict = DbConflict { diff --git a/src/federation/activities/update_local_article.rs b/src/federation/activities/update_local_article.rs index 70f67e4..926dded 100644 --- a/src/federation/activities/update_local_article.rs +++ b/src/federation/activities/update_local_article.rs @@ -37,7 +37,7 @@ impl UpdateLocalArticle { debug_assert!(article.local); let local_instance = DbInstance::read_local_instance(&data.db_connection)?; let id = generate_activity_id(local_instance.ap_id.inner())?; - let mut to = local_instance.follower_ids(&data)?; + let mut to = local_instance.follower_ids(data)?; to.extend(extra_recipients.iter().map(|i| i.ap_id.inner().clone())); let update = UpdateLocalArticle { actor: local_instance.ap_id.clone(), diff --git a/src/federation/activities/update_remote_article.rs b/src/federation/activities/update_remote_article.rs index 90529a7..e04ca35 100644 --- a/src/federation/activities/update_remote_article.rs +++ b/src/federation/activities/update_remote_article.rs @@ -47,9 +47,6 @@ impl UpdateRemoteArticle { kind: Default::default(), id, }; - // TODO: this is wrong and causes test failure. need to take previous_version from api param, - // or put previous_version in DbEdit - dbg!(&update.object.previous_version); local_instance .send(update, vec![Url::parse(&article_instance.inbox_url)?], data) .await?; diff --git a/src/federation/objects/edits_collection.rs b/src/federation/objects/edits_collection.rs index 02a007f..3afe139 100644 --- a/src/federation/objects/edits_collection.rs +++ b/src/federation/objects/edits_collection.rs @@ -1,4 +1,4 @@ -use crate::database::article::{ArticleView, DbArticle}; +use crate::database::article::DbArticle; use crate::database::MyDataHandle; use crate::error::Error; use crate::federation::objects::edit::ApubEdit; diff --git a/src/federation/objects/instance.rs b/src/federation/objects/instance.rs index e4592d1..002939e 100644 --- a/src/federation/objects/instance.rs +++ b/src/federation/objects/instance.rs @@ -8,12 +8,12 @@ use activitypub_federation::{ config::Data, fetch::object_id::ObjectId, protocol::{public_key::PublicKey, verification::verify_domains_match}, - traits::{ActivityHandler, Actor, Object}, + traits::{Actor, Object}, }; use chrono::{DateTime, Local, Utc}; use serde::{Deserialize, Serialize}; use std::fmt::Debug; -use url::{ParseError, Url}; +use url::Url; #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] @@ -40,7 +40,7 @@ impl Object for DbInstance { object_id: Url, data: &Data, ) -> Result, Self::Error> { - Ok(DbInstance::read_from_ap_id(&object_id.into(), &data).ok()) + Ok(DbInstance::read_from_ap_id(&object_id.into(), data).ok()) } async fn into_json(self, _data: &Data) -> Result { diff --git a/src/lib.rs b/src/lib.rs index 72e292a..de5495d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,7 +18,6 @@ use diesel_migrations::MigrationHarness; use std::net::ToSocketAddrs; use std::sync::{Arc, Mutex}; use tracing::info; -use url::Url; pub mod api; pub mod database; diff --git a/src/utils.rs b/src/utils.rs index d8c4042..5c254f7 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -29,7 +29,6 @@ pub fn generate_article_version(edits: &Vec, version: &EditVersion) -> M return Ok(generated); } for e in edits { - dbg!(&e); let patch = Patch::from_str(&e.diff)?; generated = apply(&generated, &patch)?; if &e.version == version { diff --git a/tests/common.rs b/tests/common.rs index ab306a1..d6a2bbb 100644 --- a/tests/common.rs +++ b/tests/common.rs @@ -167,13 +167,7 @@ pub async fn edit_article_with_conflict( } pub async fn edit_article(hostname: &str, edit_form: &EditArticleData) -> MyResult { - let edit_res: Option = CLIENT - .patch(format!("http://{}/api/v1/article", hostname)) - .form(&edit_form) - .send() - .await? - .json() - .await?; + let edit_res = edit_article_with_conflict(hostname, edit_form).await?; assert!(edit_res.is_none()); get_article(hostname, edit_form.article_id).await } @@ -211,13 +205,13 @@ where .await?) } -pub async fn follow_instance(follow_instance: &str, followed_instance: &str) -> MyResult<()> { +pub async fn follow_instance(api_instance: &str, follow_instance: &str) -> MyResult<()> { // fetch beta instance on alpha let resolve_form = ResolveObject { - id: Url::parse(&format!("http://{}", followed_instance))?, + id: Url::parse(&format!("http://{}", follow_instance))?, }; let instance_resolved: DbInstance = - get_query(followed_instance, "resolve_instance", Some(resolve_form)).await?; + get_query(api_instance, "resolve_instance", Some(resolve_form)).await?; // send follow let follow_form = FollowInstance { @@ -225,7 +219,7 @@ pub async fn follow_instance(follow_instance: &str, followed_instance: &str) -> }; // cant use post helper because follow doesnt return json CLIENT - .post(format!("http://{}/api/v1/instance/follow", follow_instance)) + .post(format!("http://{}/api/v1/instance/follow", api_instance)) .form(&follow_form) .send() .await?; diff --git a/tests/test.rs b/tests/test.rs index ead95d2..afeecdb 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -83,19 +83,29 @@ async fn test_follow_instance() -> MyResult<()> { // check initial state let alpha_instance: InstanceView = get(&data.alpha.hostname, "instance").await?; assert_eq!(0, alpha_instance.followers.len()); - assert_eq!(0, alpha_instance.followed.len()); + assert_eq!(0, alpha_instance.following.len()); let beta_instance: InstanceView = get(&data.beta.hostname, "instance").await?; assert_eq!(0, beta_instance.followers.len()); - assert_eq!(0, beta_instance.followed.len()); + assert_eq!(0, beta_instance.following.len()); follow_instance(&data.alpha.hostname, &data.beta.hostname).await?; // check that follow was federated - let beta_instance: InstanceView = get(&data.beta.hostname, "instance").await?; - assert_eq!(1, beta_instance.followers.len()); - let alpha_instance: InstanceView = get(&data.alpha.hostname, "instance").await?; - assert_eq!(1, alpha_instance.followed.len()); + assert_eq!(1, alpha_instance.following.len()); + assert_eq!(0, alpha_instance.followers.len()); + assert_eq!( + beta_instance.instance.ap_id, + alpha_instance.following[0].ap_id + ); + + let beta_instance: InstanceView = get(&data.beta.hostname, "instance").await?; + assert_eq!(0, beta_instance.following.len()); + assert_eq!(1, beta_instance.followers.len()); + assert_eq!( + alpha_instance.instance.ap_id, + beta_instance.followers[0].ap_id + ); data.stop() } @@ -157,7 +167,6 @@ async fn test_edit_local_article() -> MyResult<()> { let create_res = create_article(&data.beta.hostname, title.clone()).await?; assert_eq!(title, create_res.article.title); assert!(create_res.article.local); - dbg!(1); // article should be federated to alpha let get_res = get_article(&data.alpha.hostname, create_res.article.id).await?; @@ -165,7 +174,6 @@ async fn test_edit_local_article() -> MyResult<()> { assert_eq!(1, get_res.edits.len()); assert!(!get_res.article.local); assert_eq!(create_res.article.text, get_res.article.text); - dbg!(2); // edit the article let edit_form = EditArticleData { @@ -175,7 +183,6 @@ async fn test_edit_local_article() -> MyResult<()> { resolve_conflict_id: None, }; let edit_res = edit_article(&data.beta.hostname, &edit_form).await?; - dbg!(3); assert_eq!(edit_res.article.text, edit_form.new_text); assert_eq!(edit_res.edits.len(), 2); assert!(edit_res.edits[0] @@ -185,7 +192,6 @@ async fn test_edit_local_article() -> MyResult<()> { // edit should be federated to alpha let get_res = get_article(&data.alpha.hostname, edit_res.article.id).await?; - dbg!(4); assert_eq!(edit_res.article.title, get_res.article.title); assert_eq!(edit_res.edits.len(), 2); assert_eq!(edit_res.article.text, get_res.article.text); @@ -341,20 +347,17 @@ async fn test_federated_edit_conflict() -> MyResult<()> { // gamma also edits, as its not the latest version there is a conflict. local version should // not be updated with this conflicting version, instead user needs to handle the conflict - dbg!(&create_res.article.text, &create_res.latest_version); let edit_form = EditArticleData { article_id: create_res.article.id, new_text: "aaaa\n".to_string(), previous_version: create_res.latest_version, resolve_conflict_id: None, }; - dbg!(1); let edit_res = edit_article(&data.gamma.hostname, &edit_form).await?; assert_ne!(edit_form.new_text, edit_res.article.text); assert_eq!(2, edit_res.edits.len()); assert!(!edit_res.article.local); - dbg!(2); let conflicts: Vec = get_query(&data.gamma.hostname, "edit_conflicts", None::<()>).await?; assert_eq!(1, conflicts.len()); @@ -366,16 +369,13 @@ async fn test_federated_edit_conflict() -> MyResult<()> { previous_version: conflicts[0].previous_version.clone(), resolve_conflict_id: Some(conflicts[0].id), }; - dbg!(3); let edit_res = edit_article(&data.gamma.hostname, &edit_form).await?; assert_eq!(edit_form.new_text, edit_res.article.text); assert_eq!(3, edit_res.edits.len()); - dbg!(4); let conflicts: Vec = get_query(&data.gamma.hostname, "edit_conflicts", None::<()>).await?; assert_eq!(0, conflicts.len()); - dbg!(5); data.stop() }