From bdbb499c9d916a3278fc7ef46a5d91db71fafdf8 Mon Sep 17 00:00:00 2001 From: phiresky Date: Thu, 10 Aug 2023 16:23:24 +0000 Subject: [PATCH] address review comments mostly --- crates/apub/src/activities/mod.rs | 6 +-- crates/db_schema/src/impls/activity.rs | 9 ++--- crates/db_schema/src/schema.rs | 8 ++-- crates/db_schema/src/source/activity.rs | 38 +++---------------- crates/federate/src/federation_queue_state.rs | 4 +- crates/federate/src/worker.rs | 15 ++++---- .../up.sql | 5 ++- 7 files changed, 27 insertions(+), 58 deletions(-) diff --git a/crates/apub/src/activities/mod.rs b/crates/apub/src/activities/mod.rs index 7ae99c831..5641e322f 100644 --- a/crates/apub/src/activities/mod.rs +++ b/crates/apub/src/activities/mod.rs @@ -211,11 +211,7 @@ where .map(|e| Some(e.into())) .collect(), send_all_instances: send_targets.all_instances, - send_community_followers_of: send_targets - .community_followers_of - .into_iter() - .map(|e| Some(e.0)) - .collect(), + send_community_followers_of: send_targets.community_followers_of.map(|e| e.0), actor_type: actor.actor_type(), actor_apub_id: actor.id().into(), }; diff --git a/crates/db_schema/src/impls/activity.rs b/crates/db_schema/src/impls/activity.rs index 329f20e7a..16b0fca40 100644 --- a/crates/db_schema/src/impls/activity.rs +++ b/crates/db_schema/src/impls/activity.rs @@ -31,12 +31,9 @@ impl SentActivity { .await } pub async fn read(pool: &mut DbPool<'_>, object_id: i64) -> Result { - use crate::schema::sent_activity::dsl::{id, sent_activity}; + use crate::schema::sent_activity::dsl::sent_activity; let conn = &mut get_conn(pool).await?; - sent_activity - .filter(id.eq(object_id)) - .first::(conn) - .await + sent_activity.find(object_id).first::(conn).await } } @@ -115,7 +112,7 @@ mod tests { .into(), actor_type: ActorType::Person, send_all_instances: false, - send_community_followers_of: vec![], + send_community_followers_of: None, send_inboxes: vec![], }; diff --git a/crates/db_schema/src/schema.rs b/crates/db_schema/src/schema.rs index c69b003a5..66502d943 100644 --- a/crates/db_schema/src/schema.rs +++ b/crates/db_schema/src/schema.rs @@ -296,8 +296,10 @@ diesel::table! { } diesel::table! { - federation_queue_state (domain) { - domain -> Text, + federation_queue_state (id) { + id -> Int4, + #[max_length = 255] + domain -> Varchar, last_successful_id -> Int8, fail_count -> Int4, last_retry -> Timestamptz, @@ -806,7 +808,7 @@ diesel::table! { sensitive -> Bool, published -> Timestamp, send_inboxes -> Array>, - send_community_followers_of -> Array>, + send_community_followers_of -> Nullable, send_all_instances -> Bool, actor_type -> ActorTypeEnum, actor_apub_id -> Nullable, diff --git a/crates/db_schema/src/source/activity.rs b/crates/db_schema/src/source/activity.rs index 57881877f..f2a4058c0 100644 --- a/crates/db_schema/src/source/activity.rs +++ b/crates/db_schema/src/source/activity.rs @@ -31,7 +31,7 @@ pub struct ActivitySendTargets { /// send to these inboxes explicitly pub inboxes: HashSet, /// send to all followers of these local communities - pub community_followers_of: HashSet, + pub community_followers_of: Option, /// send to all remote instances pub all_instances: bool, } @@ -48,7 +48,7 @@ impl ActivitySendTargets { } pub fn to_local_community_followers(id: CommunityId) -> ActivitySendTargets { let mut a = ActivitySendTargets::empty(); - a.community_followers_of.insert(id); + a.community_followers_of = Some(id); a } pub fn to_all_instances() -> ActivitySendTargets { @@ -76,41 +76,13 @@ pub struct SentActivity { pub data: Value, pub sensitive: bool, pub published: chrono::NaiveDateTime, - #[diesel(deserialize_as = ArrayToHashSet)] - pub send_inboxes: HashSet, - #[diesel(deserialize_as = ArrayToHashSet)] - pub send_community_followers_of: HashSet, + pub send_inboxes: Vec>, + pub send_community_followers_of: Option, pub send_all_instances: bool, pub actor_type: ActorType, pub actor_apub_id: Option, } -// wrapper to remove optional from array values and convert to hashset -pub struct ArrayToHashSet(HashSet); - -impl Queryable>, DB> for ArrayToHashSet -where - DB: Backend, - T1: FromSql + Hash + Eq, - Vec>: FromSql>, DB>, -{ - type Row = Vec>; - - fn build(row: Self::Row) -> diesel::deserialize::Result { - let res: diesel::deserialize::Result> = row - .into_iter() - .map(|e| e.ok_or("array with null element".into())) - .collect(); - res.map(ArrayToHashSet) - } -} - -impl From> for HashSet { - fn from(val: ArrayToHashSet) -> Self { - val.0 - } -} - #[derive(Insertable)] #[diesel(table_name = sent_activity)] pub struct SentActivityForm { @@ -118,7 +90,7 @@ pub struct SentActivityForm { pub data: Value, pub sensitive: bool, pub send_inboxes: Vec>, - pub send_community_followers_of: Vec>, + pub send_community_followers_of: Option, pub send_all_instances: bool, pub actor_type: ActorType, pub actor_apub_id: DbUrl, diff --git a/crates/federate/src/federation_queue_state.rs b/crates/federate/src/federation_queue_state.rs index cf6a33a81..80708fa9e 100644 --- a/crates/federate/src/federation_queue_state.rs +++ b/crates/federate/src/federation_queue_state.rs @@ -19,11 +19,11 @@ pub struct FederationQueueState { impl FederationQueueState { /// load or return a default empty value pub async fn load(pool: &mut DbPool<'_>, domain_: &str) -> Result { - use lemmy_db_schema::schema::federation_queue_state::dsl::federation_queue_state; + use lemmy_db_schema::schema::federation_queue_state::dsl::{domain, federation_queue_state}; let conn = &mut get_conn(pool).await?; Ok( federation_queue_state - .find(&domain_) + .filter(domain.eq(&domain_)) .select(FederationQueueState::as_select()) .get_result(conn) .await diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index 4d0b955ef..f8aa47ce7 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -201,17 +201,18 @@ impl InstanceWorker { inbox_urls.insert(site.inbox_url.inner().clone()); } } - for t in &activity.send_community_followers_of { + if let Some(t) = &activity.send_community_followers_of { if let Some(urls) = self.followed_communities.get(t) { inbox_urls.extend(urls.iter().map(std::clone::Clone::clone)); } } - for inbox in &activity.send_inboxes { - if inbox.domain() != Some(&self.instance.domain) { - continue; - } - inbox_urls.insert(inbox.inner().clone()); - } + inbox_urls.extend( + activity + .send_inboxes + .iter() + .filter_map(|e| e.as_ref()) + .filter_map(|u| (u.domain() == Some(&self.instance.domain)).then(|| u.inner().clone())), + ); inbox_urls } diff --git a/migrations/2023-08-01-115243_persistent-activity-queue/up.sql b/migrations/2023-08-01-115243_persistent-activity-queue/up.sql index 0385fc830..9a9316af2 100644 --- a/migrations/2023-08-01-115243_persistent-activity-queue/up.sql +++ b/migrations/2023-08-01-115243_persistent-activity-queue/up.sql @@ -7,7 +7,7 @@ CREATE TYPE actor_type_enum AS enum( -- actor_apub_id only null for old entries before this migration ALTER TABLE sent_activity ADD COLUMN send_inboxes text[] NOT NULL DEFAULT '{}', -- list of specific inbox urls - ADD COLUMN send_community_followers_of integer[] NOT NULL DEFAULT '{}', + ADD COLUMN send_community_followers_of integer DEFAULT NULL, ADD COLUMN send_all_instances boolean NOT NULL DEFAULT FALSE, ADD COLUMN actor_type actor_type_enum NOT NULL DEFAULT 'person', ADD COLUMN actor_apub_id text DEFAULT NULL; @@ -20,7 +20,8 @@ ALTER TABLE sent_activity ALTER COLUMN actor_apub_id DROP DEFAULT; CREATE TABLE federation_queue_state( - domain text PRIMARY KEY, + id serial PRIMARY KEY, + domain varchar(255) NOT NULL UNIQUE, last_successful_id bigint NOT NULL, fail_count integer NOT NULL, last_retry timestamptz NOT NULL