some review comments

This commit is contained in:
phiresky 2023-08-03 11:46:31 +00:00
parent ef60dc0560
commit 7f82bd07fe
8 changed files with 45 additions and 79 deletions

1
Cargo.lock generated
View file

@ -2823,7 +2823,6 @@ dependencies = [
"bytes", "bytes",
"chrono", "chrono",
"clap", "clap",
"dashmap",
"diesel", "diesel",
"diesel-async", "diesel-async",
"enum_delegate", "enum_delegate",

View file

@ -98,8 +98,7 @@ impl BlockUser {
match target { match target {
SiteOrCommunity::Site(_) => { SiteOrCommunity::Site(_) => {
let mut inboxes = ActivitySendTargets::empty(); let inboxes = ActivitySendTargets::to_all_instances();
inboxes.set_all_instances(true);
send_lemmy_activity(context, block, mod_, inboxes, false).await send_lemmy_activity(context, block, mod_, inboxes, false).await
} }
SiteOrCommunity::Community(c) => { SiteOrCommunity::Community(c) => {

View file

@ -63,7 +63,7 @@ impl UndoBlockUser {
let mut inboxes = ActivitySendTargets::to_inbox(user.shared_inbox_or_inbox()); let mut inboxes = ActivitySendTargets::to_inbox(user.shared_inbox_or_inbox());
match target { match target {
SiteOrCommunity::Site(_) => { SiteOrCommunity::Site(_) => {
inboxes.set_all_instances(true); inboxes.set_all_instances();
send_lemmy_activity(context, undo, mod_, inboxes, false).await send_lemmy_activity(context, undo, mod_, inboxes, false).await
} }
SiteOrCommunity::Community(c) => { SiteOrCommunity::Community(c) => {

View file

@ -38,8 +38,7 @@ pub async fn delete_user(person: Person, context: Data<LemmyContext>) -> Result<
cc: vec![], cc: vec![],
}; };
let mut inboxes = ActivitySendTargets::empty(); let inboxes = ActivitySendTargets::to_all_instances();
inboxes.set_all_instances(true);
send_lemmy_activity(&context, delete, &actor, inboxes, true).await?; send_lemmy_activity(&context, delete, &actor, inboxes, true).await?;
Ok(()) Ok(())

View file

@ -48,14 +48,16 @@ impl ActivitySendTargets {
} }
pub fn to_local_community_followers(id: CommunityId) -> ActivitySendTargets { pub fn to_local_community_followers(id: CommunityId) -> ActivitySendTargets {
let mut a = ActivitySendTargets::empty(); let mut a = ActivitySendTargets::empty();
a.add_local_community_followers(id); a.community_followers_of.insert(id);
a a
} }
pub fn add_local_community_followers(&mut self, id: CommunityId) { pub fn to_all_instances() -> ActivitySendTargets {
self.community_followers_of.insert(id); let mut a = ActivitySendTargets::empty();
a.all_instances = true;
a
} }
pub fn set_all_instances(&mut self, b: bool) { pub fn set_all_instances(&mut self) {
self.all_instances = b; self.all_instances = true;
} }
pub fn add_inbox(&mut self, inbox: Url) { pub fn add_inbox(&mut self, inbox: Url) {

View file

@ -11,31 +11,32 @@ repository.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
activitypub_federation.workspace = true
anyhow.workspace = true
async-trait = "0.1.71"
bytes = "1.4.0"
chrono.workspace = true
clap = { version = "4.3.19", features = ["derive"] }
dashmap = "5.5.0"
diesel = { workspace = true, features = ["postgres", "chrono", "serde_json"] }
diesel-async = { workspace = true, features = ["deadpool", "postgres"] }
enum_delegate = "0.2.0"
futures.workspace = true
lemmy_api_common.workspace = true lemmy_api_common.workspace = true
lemmy_apub.workspace = true lemmy_apub.workspace = true
lemmy_db_schema = { workspace = true, features = ["full"] } lemmy_db_schema = { workspace = true, features = ["full"] }
lemmy_db_views_actor.workspace = true lemmy_db_views_actor.workspace = true
lemmy_utils.workspace = true lemmy_utils.workspace = true
moka = { version = "0.11.2", features = ["future"] }
activitypub_federation.workspace = true
anyhow.workspace = true
futures.workspace = true
chrono.workspace = true
diesel = { workspace = true, features = ["postgres", "chrono", "serde_json"] }
diesel-async = { workspace = true, features = ["deadpool", "postgres"] }
once_cell.workspace = true once_cell.workspace = true
openssl = "0.10.55"
reqwest.workspace = true reqwest.workspace = true
serde_json.workspace = true
serde.workspace = true
tokio = { workspace = true, features = ["full"] }
tracing.workspace = true
async-trait = "0.1.71"
bytes = "1.4.0"
clap = { version = "4.3.19", features = ["derive"] }
enum_delegate = "0.2.0"
moka = { version = "0.11.2", features = ["future"] }
openssl = "0.10.55"
reqwest-middleware = "0.2.2" reqwest-middleware = "0.2.2"
reqwest-tracing = "0.4.5" reqwest-tracing = "0.4.5"
serde.workspace = true
serde_json.workspace = true
tokio = { workspace = true, features = ["full"] }
tokio-util = "0.7.8" tokio-util = "0.7.8"
tracing.workspace = true
tracing-subscriber = "0.3.17" tracing-subscriber = "0.3.17"

View file

@ -1,5 +1,4 @@
use anyhow::{anyhow, Context, Result}; use anyhow::{anyhow, Context, Result};
use dashmap::DashSet;
use diesel::{prelude::*, sql_types::Int8}; use diesel::{prelude::*, sql_types::Int8};
use diesel_async::RunQueryDsl; use diesel_async::RunQueryDsl;
use lemmy_apub::{ use lemmy_apub::{
@ -20,13 +19,7 @@ use moka::future::Cache;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use reqwest::Url; use reqwest::Url;
use serde_json::Value; use serde_json::Value;
use std::{ use std::{future::Future, pin::Pin, sync::Arc, time::Duration};
borrow::{Borrow, Cow},
future::Future,
pin::Pin,
sync::Arc,
time::Duration,
};
use tokio::{task::JoinHandle, time::sleep}; use tokio::{task::JoinHandle, time::sleep};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
@ -81,7 +74,7 @@ impl<R: Send + 'static> CancellableTask<R> {
/// assuming apub priv key and ids are immutable, then we don't need to have TTL /// assuming apub priv key and ids are immutable, then we don't need to have TTL
/// TODO: capacity should be configurable maybe based on memory use /// TODO: capacity should be configurable maybe based on memory use
pub async fn get_actor_cached( pub(crate) async fn get_actor_cached(
pool: &mut DbPool<'_>, pool: &mut DbPool<'_>,
actor_type: ActorType, actor_type: ActorType,
actor_apub_id: &Url, actor_apub_id: &Url,
@ -117,30 +110,15 @@ pub async fn get_actor_cached(
.map_err(|e| anyhow::anyhow!("err getting actor: {e:?}")) .map_err(|e| anyhow::anyhow!("err getting actor: {e:?}"))
} }
/// intern urls to reduce memory usage
/// not sure if worth it
pub fn intern_url<'a>(url: impl Into<Cow<'a, Url>>) -> Arc<Url> {
static INTERNED_URLS: Lazy<DashSet<Arc<Url>>> = Lazy::new(DashSet::new);
let url: Cow<'a, Url> = url.into();
return INTERNED_URLS
.get::<Url>(url.borrow())
.map(|e| e.clone())
.unwrap_or_else(|| {
let ret = Arc::new(url.into_owned());
INTERNED_URLS.insert(ret.clone());
ret
});
}
/// this should maybe be a newtype like all the other PersonId CommunityId etc. /// this should maybe be a newtype like all the other PersonId CommunityId etc.
pub type ActivityId = i64; pub(crate) type ActivityId = i64;
type CachedActivityInfo = Option<Arc<(SentActivity, SharedInboxActivities)>>; type CachedActivityInfo = Option<Arc<(SentActivity, SharedInboxActivities)>>;
/// activities are immutable so cache does not need to have TTL /// activities are immutable so cache does not need to have TTL
/// May return None if the corresponding id does not exist or is a received activity. /// May return None if the corresponding id does not exist or is a received activity.
/// Holes in serials are expected behaviour in postgresql /// Holes in serials are expected behaviour in postgresql
/// todo: cache size should probably be configurable / dependent on desired memory usage /// todo: cache size should probably be configurable / dependent on desired memory usage
pub async fn get_activity_cached( pub(crate) async fn get_activity_cached(
pool: &mut DbPool<'_>, pool: &mut DbPool<'_>,
activity_id: ActivityId, activity_id: ActivityId,
) -> Result<CachedActivityInfo> { ) -> Result<CachedActivityInfo> {
@ -165,7 +143,7 @@ pub async fn get_activity_cached(
} }
/// return the most current activity id (with 1 second cache) /// return the most current activity id (with 1 second cache)
pub async fn get_latest_activity_id(pool: &mut DbPool<'_>) -> Result<ActivityId> { pub(crate) async fn get_latest_activity_id(pool: &mut DbPool<'_>) -> Result<ActivityId> {
static CACHE: Lazy<Cache<(), ActivityId>> = Lazy::new(|| { static CACHE: Lazy<Cache<(), ActivityId>> = Lazy::new(|| {
Cache::builder() Cache::builder()
.time_to_live(Duration::from_secs(1)) .time_to_live(Duration::from_secs(1))
@ -186,7 +164,7 @@ pub async fn get_latest_activity_id(pool: &mut DbPool<'_>) -> Result<ActivityId>
} }
/// how long to sleep based on how many retries have already happened /// how long to sleep based on how many retries have already happened
pub fn retry_sleep_duration(retry_count: i32) -> Duration { pub(crate) fn retry_sleep_duration(retry_count: i32) -> Duration {
Duration::from_secs_f64(10.0 * 2.0_f64.powf(f64::from(retry_count))) Duration::from_secs_f64(10.0 * 2.0_f64.powf(f64::from(retry_count)))
} }

View file

@ -1,12 +1,6 @@
use crate::{ use crate::{
federation_queue_state::FederationQueueState, federation_queue_state::FederationQueueState,
util::{ util::{get_activity_cached, get_actor_cached, get_latest_activity_id, retry_sleep_duration},
get_activity_cached,
get_actor_cached,
get_latest_activity_id,
intern_url,
retry_sleep_duration,
},
}; };
use activitypub_federation::{ use activitypub_federation::{
activity_queue::{prepare_raw, send_raw, sign_raw}, activity_queue::{prepare_raw, send_raw, sign_raw},
@ -23,10 +17,7 @@ use lemmy_db_views_actor::structs::CommunityFollowerView;
use lemmy_utils::{error::LemmyErrorExt2, REQWEST_TIMEOUT}; use lemmy_utils::{error::LemmyErrorExt2, REQWEST_TIMEOUT};
use reqwest::Url; use reqwest::Url;
use std::{ use std::{
borrow::Cow,
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
ops::Deref,
sync::Arc,
time::Duration, time::Duration,
}; };
use tokio::{sync::mpsc::UnboundedSender, time::sleep}; use tokio::{sync::mpsc::UnboundedSender, time::sleep};
@ -48,7 +39,7 @@ pub async fn instance_worker(
let mut last_full_communities_fetch = Utc.timestamp_nanos(0); let mut last_full_communities_fetch = Utc.timestamp_nanos(0);
let mut last_incremental_communities_fetch = Utc.timestamp_nanos(0); let mut last_incremental_communities_fetch = Utc.timestamp_nanos(0);
let mut last_state_insert = Utc.timestamp_nanos(0); let mut last_state_insert = Utc.timestamp_nanos(0);
let mut followed_communities: HashMap<CommunityId, HashSet<Arc<Url>>> = let mut followed_communities: HashMap<CommunityId, HashSet<Url>> =
get_communities(pool, instance.id, &mut last_incremental_communities_fetch).await?; get_communities(pool, instance.id, &mut last_incremental_communities_fetch).await?;
let site = Site::read_from_instance_id(pool, instance.id).await?; let site = Site::read_from_instance_id(pool, instance.id).await?;
@ -94,7 +85,7 @@ pub async fn instance_worker(
}; };
let actor = get_actor_cached(pool, activity.actor_type, actor_apub_id).await?; let actor = get_actor_cached(pool, activity.actor_type, actor_apub_id).await?;
let inbox_urls = inbox_urls.into_iter().map(|e| (*e).clone()).collect(); let inbox_urls = inbox_urls.into_iter().collect();
let requests = prepare_raw(object, actor.as_ref(), inbox_urls, &data) let requests = prepare_raw(object, actor.as_ref(), inbox_urls, &data)
.await .await
.into_anyhow()?; .into_anyhow()?;
@ -163,15 +154,15 @@ pub async fn instance_worker(
fn get_inbox_urls( fn get_inbox_urls(
instance: &Instance, instance: &Instance,
site: &Option<Site>, site: &Option<Site>,
followed_communities: &HashMap<CommunityId, HashSet<Arc<Url>>>, followed_communities: &HashMap<CommunityId, HashSet<Url>>,
activity: &SentActivity, activity: &SentActivity,
) -> HashSet<Arc<Url>> { ) -> HashSet<Url> {
let mut inbox_urls = HashSet::new(); let mut inbox_urls: HashSet<Url> = HashSet::new();
if activity.send_all_instances { if activity.send_all_instances {
if let Some(site) = &site { if let Some(site) = &site {
// todo: when does an instance not have a site? // Nutomic: Most non-lemmy software wont have a site row. That means it cant handle these activities. So handling it like this is fine.
inbox_urls.insert(intern_url(Cow::Borrowed(site.inbox_url.deref()))); inbox_urls.insert(site.inbox_url.inner().clone());
} }
} }
for t in &activity.send_community_followers_of { for t in &activity.send_community_followers_of {
@ -183,7 +174,7 @@ fn get_inbox_urls(
if inbox.domain() != Some(&instance.domain) { if inbox.domain() != Some(&instance.domain) {
continue; continue;
} }
inbox_urls.insert(intern_url(Cow::Borrowed(inbox.inner()))); inbox_urls.insert(inbox.inner().clone());
} }
inbox_urls inbox_urls
} }
@ -193,7 +184,7 @@ async fn get_communities(
pool: &mut DbPool<'_>, pool: &mut DbPool<'_>,
instance_id: InstanceId, instance_id: InstanceId,
last_fetch: &mut DateTime<Utc>, last_fetch: &mut DateTime<Utc>,
) -> Result<HashMap<CommunityId, HashSet<Arc<Url>>>> { ) -> Result<HashMap<CommunityId, HashSet<Url>>> {
let e = *last_fetch; let e = *last_fetch;
*last_fetch = Utc::now(); // update to time before fetch to ensure overlap *last_fetch = Utc::now(); // update to time before fetch to ensure overlap
Ok( Ok(
@ -201,10 +192,7 @@ async fn get_communities(
.await? .await?
.into_iter() .into_iter()
.fold(HashMap::new(), |mut map, (c, u)| { .fold(HashMap::new(), |mut map, (c, u)| {
map map.entry(c).or_insert_with(HashSet::new).insert(u.into());
.entry(c)
.or_insert_with(HashSet::new)
.insert(intern_url(Cow::Owned(u.into())));
map map
}), }),
) )