Merge branch 'main' into migration-runner

This commit is contained in:
dullbananas 2024-05-23 06:45:47 -07:00 committed by GitHub
commit 833a25012c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
48 changed files with 224 additions and 137 deletions

View file

@ -3,3 +3,5 @@ edition = "2021"
imports_layout = "HorizontalVertical" imports_layout = "HorizontalVertical"
imports_granularity = "Crate" imports_granularity = "Crate"
group_imports = "One" group_imports = "One"
wrap_comments = true
comment_width = 100

View file

@ -47,7 +47,8 @@
# #
# To be removed in 0.20 # To be removed in 0.20
cache_external_link_previews: true cache_external_link_previews: true
# Specifies how to handle remote images, so that users don't have to connect directly to remote servers. # Specifies how to handle remote images, so that users don't have to connect directly to remote
# servers.
image_mode: image_mode:
# Leave images unchanged, don't generate any local thumbnails for post urls. Instead the # Leave images unchanged, don't generate any local thumbnails for post urls. Instead the
# Opengraph image is directly returned as thumbnail # Opengraph image is directly returned as thumbnail
@ -64,10 +65,11 @@
# or # or
# If enabled, all images from remote domains are rewritten to pass through `/api/v3/image_proxy`, # If enabled, all images from remote domains are rewritten to pass through
# including embedded images in markdown. Images are stored temporarily in pict-rs for caching. # `/api/v3/image_proxy`, including embedded images in markdown. Images are stored temporarily
# This improves privacy as users don't expose their IP to untrusted servers, and decreases load # in pict-rs for caching. This improves privacy as users don't expose their IP to untrusted
# on other servers. However it increases bandwidth use for the local server. # servers, and decreases load on other servers. However it increases bandwidth use for the
# local server.
# #
# Requires pict-rs 0.5 # Requires pict-rs 0.5
"ProxyAllImages" "ProxyAllImages"

View file

@ -72,7 +72,8 @@ pub async fn save_user_settings(
} }
} }
// When the site requires email, make sure email is not Some(None). IE, an overwrite to a None value // When the site requires email, make sure email is not Some(None). IE, an overwrite to a None
// value
if let Some(email) = &email { if let Some(email) = &email {
if email.is_none() && site_view.local_site.require_email_verification { if email.is_none() && site_view.local_site.require_email_verification {
Err(LemmyErrorType::EmailRequired)? Err(LemmyErrorType::EmailRequired)?

View file

@ -121,7 +121,8 @@ pub async fn send_local_notifs(
if let Ok(Some(mention_user_view)) = user_view { if let Ok(Some(mention_user_view)) = user_view {
// TODO // TODO
// At some point, make it so you can't tag the parent creator either // At some point, make it so you can't tag the parent creator either
// Potential duplication of notifications, one for reply and the other for mention, is handled below by checking recipient ids // Potential duplication of notifications, one for reply and the other for mention, is handled
// below by checking recipient ids
recipient_ids.push(mention_user_view.local_user.id); recipient_ids.push(mention_user_view.local_user.id);
let user_mention_form = PersonMentionInsertForm { let user_mention_form = PersonMentionInsertForm {

View file

@ -124,7 +124,8 @@ pub struct SaveUserSettings {
pub post_listing_mode: Option<PostListingMode>, pub post_listing_mode: Option<PostListingMode>,
/// Whether to allow keyboard navigation (for browsing and interacting with posts and comments). /// Whether to allow keyboard navigation (for browsing and interacting with posts and comments).
pub enable_keyboard_navigation: Option<bool>, pub enable_keyboard_navigation: Option<bool>,
/// Whether user avatars or inline images in the UI that are gifs should be allowed to play or should be paused /// Whether user avatars or inline images in the UI that are gifs should be allowed to play or
/// should be paused
pub enable_animated_images: Option<bool>, pub enable_animated_images: Option<bool>,
/// Whether to auto-collapse bot comments. /// Whether to auto-collapse bot comments.
pub collapse_bot_comments: Option<bool>, pub collapse_bot_comments: Option<bool>,
@ -151,7 +152,8 @@ pub struct ChangePassword {
#[cfg_attr(feature = "full", ts(export))] #[cfg_attr(feature = "full", ts(export))]
/// A response for your login. /// A response for your login.
pub struct LoginResponse { pub struct LoginResponse {
/// This is None in response to `Register` if email verification is enabled, or the server requires registration applications. /// This is None in response to `Register` if email verification is enabled, or the server
/// requires registration applications.
pub jwt: Option<SensitiveString>, pub jwt: Option<SensitiveString>,
/// If registration applications are required, this will return true for a signup response. /// If registration applications are required, this will return true for a signup response.
pub registration_created: bool, pub registration_created: bool,

View file

@ -375,7 +375,8 @@ impl From<FederationQueueState> for ReadableFederationState {
pub struct InstanceWithFederationState { pub struct InstanceWithFederationState {
#[serde(flatten)] #[serde(flatten)]
pub instance: Instance, pub instance: Instance,
/// if federation to this instance is or was active, show state of outgoing federation to this instance /// if federation to this instance is or was active, show state of outgoing federation to this
/// instance
pub federation_state: Option<ReadableFederationState>, pub federation_state: Option<ReadableFederationState>,
} }

View file

@ -356,7 +356,8 @@ pub async fn build_federated_instances(
federation_state: federation_state.map(std::convert::Into::into), federation_state: federation_state.map(std::convert::Into::into),
}; };
if is_blocked { if is_blocked {
// blocked instances will only have an entry here if they had been federated with in the past. // blocked instances will only have an entry here if they had been federated with in the
// past.
blocked.push(i); blocked.push(i);
} else if is_allowed { } else if is_allowed {
allowed.push(i.clone()); allowed.push(i.clone());
@ -954,8 +955,8 @@ pub async fn process_markdown_opt(
/// A wrapper for `proxy_image_link` for use in tests. /// A wrapper for `proxy_image_link` for use in tests.
/// ///
/// The parameter `force_image_proxy` is the config value of `pictrs.image_proxy`. Its necessary to pass /// The parameter `force_image_proxy` is the config value of `pictrs.image_proxy`. Its necessary to
/// as separate parameter so it can be changed in tests. /// pass as separate parameter so it can be changed in tests.
async fn proxy_image_link_internal( async fn proxy_image_link_internal(
link: Url, link: Url,
image_mode: PictrsImageMode, image_mode: PictrsImageMode,

View file

@ -156,7 +156,8 @@ pub async fn update_site(
// TODO can't think of a better way to do this. // TODO can't think of a better way to do this.
// If the server suddenly requires email verification, or required applications, no old users // If the server suddenly requires email verification, or required applications, no old users
// will be able to log in. It really only wants this to be a requirement for NEW signups. // will be able to log in. It really only wants this to be a requirement for NEW signups.
// So if it was set from false, to true, you need to update all current users columns to be verified. // So if it was set from false, to true, you need to update all current users columns to be
// verified.
let old_require_application = let old_require_application =
local_site.registration_mode == RegistrationMode::RequireApplication; local_site.registration_mode == RegistrationMode::RequireApplication;

View file

@ -197,7 +197,8 @@ pub async fn register(
verify_email_sent: false, verify_email_sent: false,
}; };
// Log the user in directly if the site is not setup, or email verification and application aren't required // Log the user in directly if the site is not setup, or email verification and application aren't
// required
if !local_site.site_setup if !local_site.site_setup
|| (!require_registration_application && !local_site.require_email_verification) || (!require_registration_application && !local_site.require_email_verification)
{ {

View file

@ -138,8 +138,8 @@ impl ActivityHandler for CollectionAdd {
.dereference(context) .dereference(context)
.await?; .await?;
// If we had to refetch the community while parsing the activity, then the new mod has already // If we had to refetch the community while parsing the activity, then the new mod has
// been added. Skip it here as it would result in a duplicate key error. // already been added. Skip it here as it would result in a duplicate key error.
let new_mod_id = new_mod.id; let new_mod_id = new_mod.id;
let moderated_communities = let moderated_communities =
CommunityModerator::get_person_moderated_communities(&mut context.pool(), new_mod_id) CommunityModerator::get_person_moderated_communities(&mut context.pool(), new_mod_id)

View file

@ -24,13 +24,14 @@ pub mod update;
/// ///
/// Activities are sent to the community itself if it lives on another instance. If the community /// Activities are sent to the community itself if it lives on another instance. If the community
/// is local, the activity is directly wrapped into Announce and sent to community followers. /// is local, the activity is directly wrapped into Announce and sent to community followers.
/// Activities are also sent to those who follow the actor (with exception of moderation activities). /// Activities are also sent to those who follow the actor (with exception of moderation
/// activities).
/// ///
/// * `activity` - The activity which is being sent /// * `activity` - The activity which is being sent
/// * `actor` - The user who is sending the activity /// * `actor` - The user who is sending the activity
/// * `community` - Community inside which the activity is sent /// * `community` - Community inside which the activity is sent
/// * `inboxes` - Any additional inboxes the activity should be sent to (for example, /// * `inboxes` - Any additional inboxes the activity should be sent to (for example, to the user
/// to the user who is being promoted to moderator) /// who is being promoted to moderator)
/// * `is_mod_activity` - True for things like Add/Mod, these are not sent to user followers /// * `is_mod_activity` - True for things like Add/Mod, these are not sent to user followers
pub(crate) async fn send_activity_in_community( pub(crate) async fn send_activity_in_community(
activity: AnnouncableActivities, activity: AnnouncableActivities,

View file

@ -176,7 +176,8 @@ impl ActivityHandler for CreateOrUpdateNote {
// Although mentions could be gotten from the post tags (they are included there), or the ccs, // Although mentions could be gotten from the post tags (they are included there), or the ccs,
// Its much easier to scrape them from the comment body, since the API has to do that // Its much easier to scrape them from the comment body, since the API has to do that
// anyway. // anyway.
// TODO: for compatibility with other projects, it would be much better to read this from cc or tags // TODO: for compatibility with other projects, it would be much better to read this from cc or
// tags
let mentions = scrape_text_for_mentions(&comment.content); let mentions = scrape_text_for_mentions(&comment.content);
send_local_notifs(mentions, comment.id, &actor, do_send_email, context).await?; send_local_notifs(mentions, comment.id, &actor, do_send_email, context).await?;
Ok(()) Ok(())

View file

@ -72,7 +72,8 @@ impl Collection for ApubCommunityFeatured {
.to_vec(); .to_vec();
} }
// process items in parallel, to avoid long delay from fetch_site_metadata() and other processing // process items in parallel, to avoid long delay from fetch_site_metadata() and other
// processing
let stickied_posts: Vec<Post> = join_all(pages.into_iter().map(|page| { let stickied_posts: Vec<Post> = join_all(pages.into_iter().map(|page| {
async { async {
// use separate request counter for each item, otherwise there will be problems with // use separate request counter for each item, otherwise there will be problems with

View file

@ -102,7 +102,8 @@ impl Collection for ApubCommunityOutbox {
// We intentionally ignore errors here. This is because the outbox might contain posts from old // We intentionally ignore errors here. This is because the outbox might contain posts from old
// Lemmy versions, or from other software which we cant parse. In that case, we simply skip the // Lemmy versions, or from other software which we cant parse. In that case, we simply skip the
// item and only parse the ones that work. // item and only parse the ones that work.
// process items in parallel, to avoid long delay from fetch_site_metadata() and other processing // process items in parallel, to avoid long delay from fetch_site_metadata() and other
// processing
join_all(outbox_activities.into_iter().map(|activity| { join_all(outbox_activities.into_iter().map(|activity| {
async { async {
// Receiving announce requires at least one local community follower for anti spam purposes. // Receiving announce requires at least one local community follower for anti spam purposes.

View file

@ -259,7 +259,8 @@ impl Object for ApubPost {
let post_ = post.clone(); let post_ = post.clone();
let context_ = context.reset_request_count(); let context_ = context.reset_request_count();
// Generates a post thumbnail in background task, because some sites can be very slow to respond. // Generates a post thumbnail in background task, because some sites can be very slow to
// respond.
spawn_try_task(async move { spawn_try_task(async move {
generate_post_link_metadata(post_, None, |_| None, local_site, context_).await generate_post_link_metadata(post_, None, |_| None, local_site, context_).await
}); });

View file

@ -23,6 +23,7 @@ pub struct DeleteUser {
#[serde(deserialize_with = "deserialize_one_or_many", default)] #[serde(deserialize_with = "deserialize_one_or_many", default)]
#[serde(skip_serializing_if = "Vec::is_empty")] #[serde(skip_serializing_if = "Vec::is_empty")]
pub(crate) cc: Vec<Url>, pub(crate) cc: Vec<Url>,
/// Nonstandard field. If present, all content from the user should be deleted along with the account /// Nonstandard field. If present, all content from the user should be deleted along with the
/// account
pub(crate) remove_data: Option<bool>, pub(crate) remove_data: Option<bool>,
} }

View file

@ -132,7 +132,8 @@ async fn try_main() -> LemmyResult<()> {
// Make sure the println above shows the correct amount // Make sure the println above shows the correct amount
assert_eq!(num_inserted_posts, num_posts as usize); assert_eq!(num_inserted_posts, num_posts as usize);
// Manually trigger and wait for a statistics update to ensure consistent and high amount of accuracy in the statistics used for query planning // Manually trigger and wait for a statistics update to ensure consistent and high amount of
// accuracy in the statistics used for query planning
println!("🧮 updating database statistics"); println!("🧮 updating database statistics");
conn.batch_execute("ANALYZE;").await?; conn.batch_execute("ANALYZE;").await?;

View file

@ -14,10 +14,12 @@ use diesel::{
/// Gererates a series of rows for insertion. /// Gererates a series of rows for insertion.
/// ///
/// An inclusive range is created from `start` and `stop`. A row for each number is generated using `selection`, which can be a tuple. /// An inclusive range is created from `start` and `stop`. A row for each number is generated using
/// [`current_value`] is an expression that gets the current value. /// `selection`, which can be a tuple. [`current_value`] is an expression that gets the current
/// value.
/// ///
/// For example, if there's a `numbers` table with a `number` column, this inserts all numbers from 1 to 10 in a single statement: /// For example, if there's a `numbers` table with a `number` column, this inserts all numbers from
/// 1 to 10 in a single statement:
/// ///
/// ``` /// ```
/// dsl::insert_into(numbers::table) /// dsl::insert_into(numbers::table)

View file

@ -141,7 +141,8 @@ impl Community {
Ok(community_) Ok(community_)
} }
/// Get the community which has a given moderators or featured url, also return the collection type /// Get the community which has a given moderators or featured url, also return the collection
/// type
pub async fn get_by_collection_url( pub async fn get_by_collection_url(
pool: &mut DbPool<'_>, pool: &mut DbPool<'_>,
url: &DbUrl, url: &DbUrl,

View file

@ -117,15 +117,15 @@ impl Instance {
.await .await
} }
/// returns a list of all instances, each with a flag of whether the instance is allowed or not and dead or not /// returns a list of all instances, each with a flag of whether the instance is allowed or not
/// ordered by id /// and dead or not ordered by id
pub async fn read_federated_with_blocked_and_dead( pub async fn read_federated_with_blocked_and_dead(
pool: &mut DbPool<'_>, pool: &mut DbPool<'_>,
) -> Result<Vec<(Self, bool, bool)>, Error> { ) -> Result<Vec<(Self, bool, bool)>, Error> {
let conn = &mut get_conn(pool).await?; let conn = &mut get_conn(pool).await?;
let is_dead_expr = coalesce(instance::updated, instance::published).lt(now() - 3.days()); let is_dead_expr = coalesce(instance::updated, instance::published).lt(now() - 3.days());
// this needs to be done in two steps because the meaning of the "blocked" column depends on the existence // this needs to be done in two steps because the meaning of the "blocked" column depends on the
// of any value at all in the allowlist. (so a normal join wouldn't work) // existence of any value at all in the allowlist. (so a normal join wouldn't work)
let use_allowlist = federation_allowlist::table let use_allowlist = federation_allowlist::table
.select(count_star().gt(0)) .select(count_star().gt(0))
.get_result::<bool>(conn) .get_result::<bool>(conn)

View file

@ -55,7 +55,8 @@ impl Crud for Person {
impl Person { impl Person {
/// Update or insert the person. /// Update or insert the person.
/// ///
/// This is necessary for federation, because Activitypub doesn't distinguish between these actions. /// This is necessary for federation, because Activitypub doesn't distinguish between these
/// actions.
pub async fn upsert(pool: &mut DbPool<'_>, form: &PersonInsertForm) -> Result<Self, Error> { pub async fn upsert(pool: &mut DbPool<'_>, form: &PersonInsertForm) -> Result<Self, Error> {
let conn = &mut get_conn(pool).await?; let conn = &mut get_conn(pool).await?;
insert_into(person::table) insert_into(person::table)

View file

@ -43,9 +43,9 @@ fn migrations() -> diesel_migrations::FileBasedMigrations {
.expect("failed to get migration source") .expect("failed to get migration source")
} }
/// This SQL code sets up the `r` schema, which contains things that can be safely dropped and replaced /// This SQL code sets up the `r` schema, which contains things that can be safely dropped and
/// instead of being changed using migrations. It may not create or modify things outside of the `r` schema /// replaced instead of being changed using migrations. It may not create or modify things outside
/// (indicated by `r.` before the name), unless a comment says otherwise. /// of the `r` schema (indicated by `r.` before the name), unless a comment says otherwise.
fn replaceable_schema() -> String { fn replaceable_schema() -> String {
[ [
"CREATE SCHEMA r;", "CREATE SCHEMA r;",

View file

@ -41,7 +41,8 @@ pub struct Comment {
#[cfg(feature = "full")] #[cfg(feature = "full")]
#[cfg_attr(feature = "full", serde(with = "LtreeDef"))] #[cfg_attr(feature = "full", serde(with = "LtreeDef"))]
#[cfg_attr(feature = "full", ts(type = "string"))] #[cfg_attr(feature = "full", ts(type = "string"))]
/// The path / tree location of a comment, separated by dots, ending with the comment's id. Ex: 0.24.27 /// The path / tree location of a comment, separated by dots, ending with the comment's id. Ex:
/// 0.24.27
pub path: Ltree, pub path: Ltree,
#[cfg(not(feature = "full"))] #[cfg(not(feature = "full"))]
pub path: String, pub path: String,

View file

@ -62,7 +62,8 @@ pub struct LocalUser {
pub totp_2fa_enabled: bool, pub totp_2fa_enabled: bool,
/// Whether to allow keyboard navigation (for browsing and interacting with posts and comments). /// Whether to allow keyboard navigation (for browsing and interacting with posts and comments).
pub enable_keyboard_navigation: bool, pub enable_keyboard_navigation: bool,
/// Whether user avatars and inline images in the UI that are gifs should be allowed to play or should be paused /// Whether user avatars and inline images in the UI that are gifs should be allowed to play or
/// should be paused
pub enable_animated_images: bool, pub enable_animated_images: bool,
/// Whether to auto-collapse bot comments. /// Whether to auto-collapse bot comments.
pub collapse_bot_comments: bool, pub collapse_bot_comments: bool,

View file

@ -43,7 +43,8 @@ pub mod tagline;
/// Default value for columns like [community::Community.inbox_url] which are marked as serde(skip). /// Default value for columns like [community::Community.inbox_url] which are marked as serde(skip).
/// ///
/// This is necessary so they can be successfully deserialized from API responses, even though the /// This is necessary so they can be successfully deserialized from API responses, even though the
/// value is not sent by Lemmy. Necessary for crates which rely on Rust API such as lemmy-stats-crawler. /// value is not sent by Lemmy. Necessary for crates which rely on Rust API such as
/// lemmy-stats-crawler.
fn placeholder_apub_url() -> DbUrl { fn placeholder_apub_url() -> DbUrl {
DbUrl(Box::new( DbUrl(Box::new(
Url::parse("http://example.com").expect("parse placeholder url"), Url::parse("http://example.com").expect("parse placeholder url"),

View file

@ -76,7 +76,8 @@ pub struct SiteUpdateForm {
pub name: Option<String>, pub name: Option<String>,
pub sidebar: Option<Option<String>>, pub sidebar: Option<Option<String>>,
pub updated: Option<Option<DateTime<Utc>>>, pub updated: Option<Option<DateTime<Utc>>>,
// when you want to null out a column, you have to send Some(None)), since sending None means you just don't want to update that column. // when you want to null out a column, you have to send Some(None)), since sending None means you
// just don't want to update that column.
pub icon: Option<Option<DbUrl>>, pub icon: Option<Option<DbUrl>>,
pub banner: Option<Option<DbUrl>>, pub banner: Option<Option<DbUrl>>,
pub description: Option<Option<String>>, pub description: Option<Option<String>>,

View file

@ -25,8 +25,8 @@ pub type Find<T> = dsl::Find<<T as HasTable>::Table, <T as Crud>::IdType>;
pub type PrimaryKey<T> = <<T as HasTable>::Table as Table>::PrimaryKey; pub type PrimaryKey<T> = <<T as HasTable>::Table as Table>::PrimaryKey;
// Trying to create default implementations for `create` and `update` results in a lifetime mess and weird compile errors. // Trying to create default implementations for `create` and `update` results in a lifetime mess and
// https://github.com/rust-lang/rust/issues/102211 // weird compile errors. https://github.com/rust-lang/rust/issues/102211
#[async_trait] #[async_trait]
pub trait Crud: HasTable + Sized pub trait Crud: HasTable + Sized
where where
@ -49,7 +49,8 @@ where
query.first(conn).await.optional() query.first(conn).await.optional()
} }
/// when you want to null out a column, you have to send Some(None)), since sending None means you just don't want to update that column. /// when you want to null out a column, you have to send Some(None)), since sending None means you
/// just don't want to update that column.
async fn update( async fn update(
pool: &mut DbPool<'_>, pool: &mut DbPool<'_>,
id: Self::IdType, id: Self::IdType,

View file

@ -61,7 +61,8 @@ pub const RANK_DEFAULT: f64 = 0.0001;
pub type ActualDbPool = Pool<AsyncPgConnection>; pub type ActualDbPool = Pool<AsyncPgConnection>;
/// References a pool or connection. Functions must take `&mut DbPool<'_>` to allow implicit reborrowing. /// References a pool or connection. Functions must take `&mut DbPool<'_>` to allow implicit
/// reborrowing.
/// ///
/// https://github.com/rust-lang/rfcs/issues/1403 /// https://github.com/rust-lang/rfcs/issues/1403
pub enum DbPool<'a> { pub enum DbPool<'a> {
@ -101,7 +102,8 @@ impl<'a> DerefMut for DbConn<'a> {
} }
} }
// Allows functions that take `DbPool<'_>` to be called in a transaction by passing `&mut conn.into()` // Allows functions that take `DbPool<'_>` to be called in a transaction by passing `&mut
// conn.into()`
impl<'a> From<&'a mut AsyncPgConnection> for DbPool<'a> { impl<'a> From<&'a mut AsyncPgConnection> for DbPool<'a> {
fn from(value: &'a mut AsyncPgConnection) -> Self { fn from(value: &'a mut AsyncPgConnection) -> Self {
DbPool::Conn(value) DbPool::Conn(value)
@ -120,11 +122,13 @@ impl<'a> From<&'a ActualDbPool> for DbPool<'a> {
} }
} }
/// Runs multiple async functions that take `&mut DbPool<'_>` as input and return `Result`. Only works when the `futures` crate is listed in `Cargo.toml`. /// Runs multiple async functions that take `&mut DbPool<'_>` as input and return `Result`. Only
/// works when the `futures` crate is listed in `Cargo.toml`.
/// ///
/// `$pool` is the value given to each function. /// `$pool` is the value given to each function.
/// ///
/// A `Result` is returned (not in a `Future`, so don't use `.await`). The `Ok` variant contains a tuple with the values returned by the given functions. /// A `Result` is returned (not in a `Future`, so don't use `.await`). The `Ok` variant contains a
/// tuple with the values returned by the given functions.
/// ///
/// The functions run concurrently if `$pool` has the `DbPool::Pool` variant. /// The functions run concurrently if `$pool` has the `DbPool::Pool` variant.
#[macro_export] #[macro_export]
@ -337,8 +341,10 @@ fn establish_connection(config: &str) -> BoxFuture<ConnectionResult<AsyncPgConne
} }
}); });
let mut conn = AsyncPgConnection::try_from(client).await?; let mut conn = AsyncPgConnection::try_from(client).await?;
// * Change geqo_threshold back to default value if it was changed, so it's higher than the collapse limits // * Change geqo_threshold back to default value if it was changed, so it's higher than the
// * Change collapse limits from 8 to 11 so the query planner can find a better table join order for more complicated queries // collapse limits
// * Change collapse limits from 8 to 11 so the query planner can find a better table join order
// for more complicated queries
conn conn
.batch_execute("SET geqo_threshold=12;SET from_collapse_limit=11;SET join_collapse_limit=11;") .batch_execute("SET geqo_threshold=12;SET from_collapse_limit=11;SET join_collapse_limit=11;")
.await .await
@ -415,9 +421,11 @@ pub async fn build_db_pool() -> LemmyResult<ActualDbPool> {
let pool = Pool::builder(manager) let pool = Pool::builder(manager)
.max_size(SETTINGS.database.pool_size) .max_size(SETTINGS.database.pool_size)
.runtime(Runtime::Tokio1) .runtime(Runtime::Tokio1)
// Limit connection age to prevent use of prepared statements that have query plans based on very old statistics // Limit connection age to prevent use of prepared statements that have query plans based on
// very old statistics
.pre_recycle(Hook::sync_fn(|_conn, metrics| { .pre_recycle(Hook::sync_fn(|_conn, metrics| {
// Preventing the first recycle can cause an infinite loop when trying to get a new connection from the pool // Preventing the first recycle can cause an infinite loop when trying to get a new connection
// from the pool
let conn_was_used = metrics.recycled.is_some(); let conn_was_used = metrics.recycled.is_some();
if metrics.age() > Duration::from_secs(3 * 24 * 60 * 60) && conn_was_used { if metrics.age() > Duration::from_secs(3 * 24 * 60 * 60) && conn_was_used {
Err(HookError::Continue(None)) Err(HookError::Continue(None))
@ -508,7 +516,8 @@ pub trait ListFn<'a, T, Args>: Fn(DbConn<'a>, Args) -> ResultFuture<'a, Vec<T>>
impl<'a, T, Args, F: Fn(DbConn<'a>, Args) -> ResultFuture<'a, Vec<T>>> ListFn<'a, T, Args> for F {} impl<'a, T, Args, F: Fn(DbConn<'a>, Args) -> ResultFuture<'a, Vec<T>>> ListFn<'a, T, Args> for F {}
/// Allows read and list functions to capture a shared closure that has an inferred return type, which is useful for join logic /// Allows read and list functions to capture a shared closure that has an inferred return type,
/// which is useful for join logic
pub struct Queries<RF, LF> { pub struct Queries<RF, LF> {
pub read_fn: RF, pub read_fn: RF,
pub list_fn: LF, pub list_fn: LF,

View file

@ -150,7 +150,8 @@ fn queries<'a>() -> Queries<
query = query.filter(comment_report::comment_id.eq(comment_id)); query = query.filter(comment_report::comment_id.eq(comment_id));
} }
// If viewing all reports, order by newest, but if viewing unresolved only, show the oldest first (FIFO) // If viewing all reports, order by newest, but if viewing unresolved only, show the oldest
// first (FIFO)
if options.unresolved_only { if options.unresolved_only {
query = query query = query
.filter(comment_report::resolved.eq(false)) .filter(comment_report::resolved.eq(false))

View file

@ -238,7 +238,7 @@ fn queries<'a>() -> Queries<
); );
match listing_type { match listing_type {
ListingType::Subscribed => query = query.filter(is_subscribed), // TODO could be this: and(community_follower::person_id.eq(person_id_join)), ListingType::Subscribed => query = query.filter(is_subscribed), /* TODO could be this: and(community_follower::person_id.eq(person_id_join)), */
ListingType::Local => { ListingType::Local => {
query = query query = query
.filter(community::local.eq(true)) .filter(community::local.eq(true))
@ -327,7 +327,8 @@ fn queries<'a>() -> Queries<
query = query.filter(nlevel(comment::path).le(depth_limit)); query = query.filter(nlevel(comment::path).le(depth_limit));
// only order if filtering by a post id, or parent_path. DOS potential otherwise and max_depth + !post_id isn't used anyways (afaik) // only order if filtering by a post id, or parent_path. DOS potential otherwise and max_depth
// + !post_id isn't used anyways (afaik)
if options.post_id.is_some() || options.parent_path.is_some() { if options.post_id.is_some() || options.parent_path.is_some() {
// Always order by the parent path first // Always order by the parent path first
query = query.then_order_by(subpath(comment::path, 0, -1)); query = query.then_order_by(subpath(comment::path, 0, -1));

View file

@ -178,7 +178,8 @@ fn queries<'a>() -> Queries<
query = query.filter(post::id.eq(post_id)); query = query.filter(post::id.eq(post_id));
} }
// If viewing all reports, order by newest, but if viewing unresolved only, show the oldest first (FIFO) // If viewing all reports, order by newest, but if viewing unresolved only, show the oldest
// first (FIFO)
if options.unresolved_only { if options.unresolved_only {
query = query query = query
.filter(post_report::resolved.eq(false)) .filter(post_report::resolved.eq(false))

View file

@ -617,7 +617,8 @@ impl PaginationCursor {
} }
// currently we use a postaggregates struct as the pagination token. // currently we use a postaggregates struct as the pagination token.
// we only use some of the properties of the post aggregates, depending on which sort type we page by // we only use some of the properties of the post aggregates, depending on which sort type we page
// by
#[derive(Clone)] #[derive(Clone)]
pub struct PaginationCursorData(PostAggregates); pub struct PaginationCursorData(PostAggregates);
@ -627,7 +628,8 @@ pub struct PostQuery<'a> {
pub sort: Option<SortType>, pub sort: Option<SortType>,
pub creator_id: Option<PersonId>, pub creator_id: Option<PersonId>,
pub community_id: Option<CommunityId>, pub community_id: Option<CommunityId>,
// if true, the query should be handled as if community_id was not given except adding the literal filter // if true, the query should be handled as if community_id was not given except adding the
// literal filter
pub community_id_just_for_prefetch: bool, pub community_id_just_for_prefetch: bool,
pub local_user: Option<&'a LocalUserView>, pub local_user: Option<&'a LocalUserView>,
pub search_term: Option<String>, pub search_term: Option<String>,
@ -649,15 +651,17 @@ impl<'a> PostQuery<'a> {
site: &Site, site: &Site,
pool: &mut DbPool<'_>, pool: &mut DbPool<'_>,
) -> Result<Option<PostQuery<'a>>, Error> { ) -> Result<Option<PostQuery<'a>>, Error> {
// first get one page for the most popular community to get an upper bound for the page end for the real query // first get one page for the most popular community to get an upper bound for the page end for
// the reason this is needed is that when fetching posts for a single community PostgreSQL can optimize // the real query the reason this is needed is that when fetching posts for a single
// the query to use an index on e.g. (=, >=, >=, >=) and fetch only LIMIT rows // community PostgreSQL can optimize the query to use an index on e.g. (=, >=, >=, >=) and
// but for the followed-communities query it has to query the index on (IN, >=, >=, >=) // fetch only LIMIT rows but for the followed-communities query it has to query the index on
// which it currently can't do at all (as of PG 16). see the discussion here: // (IN, >=, >=, >=) which it currently can't do at all (as of PG 16). see the discussion
// https://github.com/LemmyNet/lemmy/issues/2877#issuecomment-1673597190 // here: https://github.com/LemmyNet/lemmy/issues/2877#issuecomment-1673597190
// //
// the results are correct no matter which community we fetch these for, since it basically covers the "worst case" of the whole page consisting of posts from one community // the results are correct no matter which community we fetch these for, since it basically
// but using the largest community decreases the pagination-frame so make the real query more efficient. // covers the "worst case" of the whole page consisting of posts from one community
// but using the largest community decreases the pagination-frame so make the real query more
// efficient.
use lemmy_db_schema::schema::{ use lemmy_db_schema::schema::{
community_aggregates::dsl::{community_aggregates, community_id, users_active_month}, community_aggregates::dsl::{community_aggregates, community_id, users_active_month},
community_follower::dsl::{ community_follower::dsl::{
@ -708,7 +712,8 @@ impl<'a> PostQuery<'a> {
) )
.await?; .await?;
// take last element of array. if this query returned less than LIMIT elements, // take last element of array. if this query returned less than LIMIT elements,
// the heuristic is invalid since we can't guarantee the full query will return >= LIMIT results (return original query) // the heuristic is invalid since we can't guarantee the full query will return >= LIMIT results
// (return original query)
if (v.len() as i64) < limit { if (v.len() as i64) < limit {
Ok(Some(self.clone())) Ok(Some(self.clone()))
} else { } else {
@ -1429,7 +1434,8 @@ mod tests {
let mut inserted_post_ids = vec![]; let mut inserted_post_ids = vec![];
let mut inserted_comment_ids = vec![]; let mut inserted_comment_ids = vec![];
// Create 150 posts with varying non-correlating values for publish date, number of comments, and featured // Create 150 posts with varying non-correlating values for publish date, number of comments,
// and featured
for comments in 0..10 { for comments in 0..10 {
for _ in 0..15 { for _ in 0..15 {
let post_form = PostInsertForm::builder() let post_form = PostInsertForm::builder()

View file

@ -49,7 +49,8 @@ fn queries<'a>() -> Queries<
let list = move |mut conn: DbConn<'a>, options: PrivateMessageReportQuery| async move { let list = move |mut conn: DbConn<'a>, options: PrivateMessageReportQuery| async move {
let mut query = all_joins(private_message_report::table.into_boxed()); let mut query = all_joins(private_message_report::table.into_boxed());
// If viewing all reports, order by newest, but if viewing unresolved only, show the oldest first (FIFO) // If viewing all reports, order by newest, but if viewing unresolved only, show the oldest
// first (FIFO)
if options.unresolved_only { if options.unresolved_only {
query = query query = query
.filter(private_message_report::resolved.eq(false)) .filter(private_message_report::resolved.eq(false))

View file

@ -49,7 +49,8 @@ fn queries<'a>() -> Queries<
let list = move |mut conn: DbConn<'a>, options: RegistrationApplicationQuery| async move { let list = move |mut conn: DbConn<'a>, options: RegistrationApplicationQuery| async move {
let mut query = all_joins(registration_application::table.into_boxed()); let mut query = all_joins(registration_application::table.into_boxed());
// If viewing all applications, order by newest, but if viewing unresolved only, show the oldest first (FIFO) // If viewing all applications, order by newest, but if viewing unresolved only, show the oldest
// first (FIFO)
if options.unread_only { if options.unread_only {
query = query query = query
.filter(registration_application::admin_id.is_null()) .filter(registration_application::admin_id.is_null())

View file

@ -112,9 +112,10 @@ pub struct PostReportView {
pub resolver: Option<Person>, pub resolver: Option<Person>,
} }
/// currently this is just a wrapper around post id, but should be seen as opaque from the client's perspective /// currently this is just a wrapper around post id, but should be seen as opaque from the client's
/// stringified since we might want to use arbitrary info later, with a P prepended to prevent ossification /// perspective stringified since we might want to use arbitrary info later, with a P prepended to
/// (api users love to make assumptions (e.g. parse stuff that looks like numbers as numbers) about apis that aren't part of the spec /// prevent ossification (api users love to make assumptions (e.g. parse stuff that looks like
/// numbers as numbers) about apis that aren't part of the spec
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)] #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "full", derive(ts_rs::TS))] #[cfg_attr(feature = "full", derive(ts_rs::TS))]
#[cfg_attr(feature = "full", ts(export))] #[cfg_attr(feature = "full", ts(export))]

View file

@ -14,7 +14,8 @@ use lemmy_db_schema::{
}; };
impl CommunityFollowerView { impl CommunityFollowerView {
/// return a list of local community ids and remote inboxes that at least one user of the given instance has followed /// return a list of local community ids and remote inboxes that at least one user of the given
/// instance has followed
pub async fn get_instance_followed_community_inboxes( pub async fn get_instance_followed_community_inboxes(
pool: &mut DbPool<'_>, pool: &mut DbPool<'_>,
instance_id: InstanceId, instance_id: InstanceId,
@ -22,8 +23,9 @@ impl CommunityFollowerView {
) -> Result<Vec<(CommunityId, DbUrl)>, Error> { ) -> Result<Vec<(CommunityId, DbUrl)>, Error> {
let conn = &mut get_conn(pool).await?; let conn = &mut get_conn(pool).await?;
// In most cases this will fetch the same url many times (the shared inbox url) // In most cases this will fetch the same url many times (the shared inbox url)
// PG will only send a single copy to rust, but it has to scan through all follower rows (same as it was before). // PG will only send a single copy to rust, but it has to scan through all follower rows (same
// So on the PG side it would be possible to optimize this further by adding e.g. a new table community_followed_instances (community_id, instance_id) // as it was before). So on the PG side it would be possible to optimize this further by
// adding e.g. a new table community_followed_instances (community_id, instance_id)
// that would work for all instances that support fully shared inboxes. // that would work for all instances that support fully shared inboxes.
// It would be a bit more complicated though to keep it in sync. // It would be a bit more complicated though to keep it in sync.
@ -31,7 +33,8 @@ impl CommunityFollowerView {
.inner_join(community::table) .inner_join(community::table)
.inner_join(person::table) .inner_join(person::table)
.filter(person::instance_id.eq(instance_id)) .filter(person::instance_id.eq(instance_id))
.filter(community::local) // this should be a no-op since community_followers table only has local-person+remote-community or remote-person+local-community .filter(community::local) // this should be a no-op since community_followers table only has
// local-person+remote-community or remote-person+local-community
.filter(not(person::local)) .filter(not(person::local))
.filter(community_follower::published.gt(published_since.naive_utc())) .filter(community_follower::published.gt(published_since.naive_utc()))
.select(( .select((

View file

@ -154,7 +154,7 @@ fn queries<'a>() -> Queries<
if let Some(listing_type) = options.listing_type { if let Some(listing_type) = options.listing_type {
query = match listing_type { query = match listing_type {
ListingType::Subscribed => query.filter(community_follower::pending.is_not_null()), // TODO could be this: and(community_follower::person_id.eq(person_id_join)), ListingType::Subscribed => query.filter(community_follower::pending.is_not_null()), /* TODO could be this: and(community_follower::person_id.eq(person_id_join)), */
ListingType::Local => query.filter(community::local.eq(true)), ListingType::Local => query.filter(community::local.eq(true)),
_ => query, _ => query,
}; };

View file

@ -119,7 +119,8 @@ async fn start_stop_federation_workers(
workers.len(), workers.len(),
WORKER_EXIT_TIMEOUT WORKER_EXIT_TIMEOUT
); );
// the cancel futures need to be awaited concurrently for the shutdown processes to be triggered concurrently // the cancel futures need to be awaited concurrently for the shutdown processes to be triggered
// concurrently
futures::future::join_all(workers.into_values().map(util::CancellableTask::cancel)).await; futures::future::join_all(workers.into_values().map(util::CancellableTask::cancel)).await;
exit_print.await?; exit_print.await?;
Ok(()) Ok(())
@ -140,7 +141,8 @@ pub fn start_stop_federation_workers_cancellable(
}) })
} }
/// every 60s, print the state for every instance. exits if the receiver is done (all senders dropped) /// every 60s, print the state for every instance. exits if the receiver is done (all senders
/// dropped)
async fn receive_print_stats( async fn receive_print_stats(
pool: ActualDbPool, pool: ActualDbPool,
mut receiver: UnboundedReceiver<(String, FederationQueueState)>, mut receiver: UnboundedReceiver<(String, FederationQueueState)>,
@ -171,7 +173,8 @@ async fn print_stats(pool: &mut DbPool<'_>, stats: &HashMap<String, FederationQu
tracing::error!("could not get last id"); tracing::error!("could not get last id");
return; return;
}; };
// it's expected that the values are a bit out of date, everything < SAVE_STATE_EVERY should be considered up to date // it's expected that the values are a bit out of date, everything < SAVE_STATE_EVERY should be
// considered up to date
tracing::info!( tracing::info!(
"Federation state as of {}:", "Federation state as of {}:",
Local::now() Local::now()

View file

@ -26,7 +26,8 @@ use tokio::{task::JoinHandle, time::sleep};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
/// Decrease the delays of the federation queue. /// Decrease the delays of the federation queue.
/// Should only be used for federation tests since it significantly increases CPU and DB load of the federation queue. /// Should only be used for federation tests since it significantly increases CPU and DB load of the
/// federation queue.
pub(crate) static LEMMY_TEST_FAST_FEDERATION: Lazy<bool> = Lazy::new(|| { pub(crate) static LEMMY_TEST_FAST_FEDERATION: Lazy<bool> = Lazy::new(|| {
std::env::var("LEMMY_TEST_FAST_FEDERATION") std::env::var("LEMMY_TEST_FAST_FEDERATION")
.map(|s| !s.is_empty()) .map(|s| !s.is_empty())
@ -35,9 +36,10 @@ pub(crate) static LEMMY_TEST_FAST_FEDERATION: Lazy<bool> = Lazy::new(|| {
/// Recheck for new federation work every n seconds. /// Recheck for new federation work every n seconds.
/// ///
/// When the queue is processed faster than new activities are added and it reaches the current time with an empty batch, /// When the queue is processed faster than new activities are added and it reaches the current time
/// this is the delay the queue waits before it checks if new activities have been added to the sent_activities table. /// with an empty batch, this is the delay the queue waits before it checks if new activities have
/// This delay is only applied if no federated activity happens during sending activities of the last batch. /// been added to the sent_activities table. This delay is only applied if no federated activity
/// happens during sending activities of the last batch.
pub(crate) static WORK_FINISHED_RECHECK_DELAY: Lazy<Duration> = Lazy::new(|| { pub(crate) static WORK_FINISHED_RECHECK_DELAY: Lazy<Duration> = Lazy::new(|| {
if *LEMMY_TEST_FAST_FEDERATION { if *LEMMY_TEST_FAST_FEDERATION {
Duration::from_millis(100) Duration::from_millis(100)
@ -47,7 +49,8 @@ pub(crate) static WORK_FINISHED_RECHECK_DELAY: Lazy<Duration> = Lazy::new(|| {
}); });
/// A task that will be run in an infinite loop, unless it is cancelled. /// A task that will be run in an infinite loop, unless it is cancelled.
/// If the task exits without being cancelled, an error will be logged and the task will be restarted. /// If the task exits without being cancelled, an error will be logged and the task will be
/// restarted.
pub struct CancellableTask { pub struct CancellableTask {
f: Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + 'static>>, f: Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + 'static>>,
} }

View file

@ -36,17 +36,22 @@ use tokio::{sync::mpsc::UnboundedSender, time::sleep};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::{debug, info, trace, warn}; use tracing::{debug, info, trace, warn};
/// Check whether to save state to db every n sends if there's no failures (during failures state is saved after every attempt) /// Check whether to save state to db every n sends if there's no failures (during failures state is
/// This determines the batch size for loop_batch. After a batch ends and SAVE_STATE_EVERY_TIME has passed, the federation_queue_state is updated in the DB. /// saved after every attempt) This determines the batch size for loop_batch. After a batch ends and
/// SAVE_STATE_EVERY_TIME has passed, the federation_queue_state is updated in the DB.
static CHECK_SAVE_STATE_EVERY_IT: i64 = 100; static CHECK_SAVE_STATE_EVERY_IT: i64 = 100;
/// Save state to db after this time has passed since the last state (so if the server crashes or is SIGKILLed, less than X seconds of activities are resent) /// Save state to db after this time has passed since the last state (so if the server crashes or is
/// SIGKILLed, less than X seconds of activities are resent)
static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(60); static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(60);
/// interval with which new additions to community_followers are queried. /// interval with which new additions to community_followers are queried.
/// ///
/// The first time some user on an instance follows a specific remote community (or, more precisely: the first time a (followed_community_id, follower_inbox_url) tuple appears), /// The first time some user on an instance follows a specific remote community (or, more precisely:
/// this delay limits the maximum time until the follow actually results in activities from that community id being sent to that inbox url. /// the first time a (followed_community_id, follower_inbox_url) tuple appears), this delay limits
/// This delay currently needs to not be too small because the DB load is currently fairly high because of the current structure of storing inboxes for every person, not having a separate list of shared_inboxes, and the architecture of having every instance queue be fully separate. /// the maximum time until the follow actually results in activities from that community id being
/// (see https://github.com/LemmyNet/lemmy/issues/3958) /// sent to that inbox url. This delay currently needs to not be too small because the DB load is
/// currently fairly high because of the current structure of storing inboxes for every person, not
/// having a separate list of shared_inboxes, and the architecture of having every instance queue be
/// fully separate. (see https://github.com/LemmyNet/lemmy/issues/3958)
static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy<chrono::TimeDelta> = Lazy::new(|| { static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy<chrono::TimeDelta> = Lazy::new(|| {
if *LEMMY_TEST_FAST_FEDERATION { if *LEMMY_TEST_FAST_FEDERATION {
chrono::TimeDelta::try_seconds(1).expect("TimeDelta out of bounds") chrono::TimeDelta::try_seconds(1).expect("TimeDelta out of bounds")
@ -54,8 +59,9 @@ static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy<chrono::TimeDelta> = Lazy::new(|| {
chrono::TimeDelta::try_minutes(2).expect("TimeDelta out of bounds") chrono::TimeDelta::try_minutes(2).expect("TimeDelta out of bounds")
} }
}); });
/// The same as FOLLOW_ADDITIONS_RECHECK_DELAY, but triggering when the last person on an instance unfollows a specific remote community. /// The same as FOLLOW_ADDITIONS_RECHECK_DELAY, but triggering when the last person on an instance
/// This is expected to happen pretty rarely and updating it in a timely manner is not too important. /// unfollows a specific remote community. This is expected to happen pretty rarely and updating it
/// in a timely manner is not too important.
static FOLLOW_REMOVALS_RECHECK_DELAY: Lazy<chrono::TimeDelta> = static FOLLOW_REMOVALS_RECHECK_DELAY: Lazy<chrono::TimeDelta> =
Lazy::new(|| chrono::TimeDelta::try_hours(1).expect("TimeDelta out of bounds")); Lazy::new(|| chrono::TimeDelta::try_hours(1).expect("TimeDelta out of bounds"));
pub(crate) struct InstanceWorker { pub(crate) struct InstanceWorker {
@ -80,7 +86,8 @@ impl InstanceWorker {
pub(crate) async fn init_and_loop( pub(crate) async fn init_and_loop(
instance: Instance, instance: Instance,
context: Data<LemmyContext>, context: Data<LemmyContext>,
pool: &mut DbPool<'_>, // in theory there's a ref to the pool in context, but i couldn't get that to work wrt lifetimes pool: &mut DbPool<'_>, /* in theory there's a ref to the pool in context, but i couldn't get
* that to work wrt lifetimes */
stop: CancellationToken, stop: CancellationToken,
stats_sender: UnboundedSender<(String, FederationQueueState)>, stats_sender: UnboundedSender<(String, FederationQueueState)>,
) -> Result<(), anyhow::Error> { ) -> Result<(), anyhow::Error> {
@ -101,7 +108,8 @@ impl InstanceWorker {
worker.loop_until_stopped(pool).await worker.loop_until_stopped(pool).await
} }
/// loop fetch new activities from db and send them to the inboxes of the given instances /// loop fetch new activities from db and send them to the inboxes of the given instances
/// this worker only returns if (a) there is an internal error or (b) the cancellation token is cancelled (graceful exit) /// this worker only returns if (a) there is an internal error or (b) the cancellation token is
/// cancelled (graceful exit)
pub(crate) async fn loop_until_stopped( pub(crate) async fn loop_until_stopped(
&mut self, &mut self,
pool: &mut DbPool<'_>, pool: &mut DbPool<'_>,
@ -152,8 +160,8 @@ impl InstanceWorker {
let mut id = if let Some(id) = self.state.last_successful_id { let mut id = if let Some(id) = self.state.last_successful_id {
id id
} else { } else {
// this is the initial creation (instance first seen) of the federation queue for this instance // this is the initial creation (instance first seen) of the federation queue for this
// skip all past activities: // instance skip all past activities:
self.state.last_successful_id = Some(latest_id); self.state.last_successful_id = Some(latest_id);
// save here to ensure it's not read as 0 again later if no activities have happened // save here to ensure it's not read as 0 again later if no activities have happened
self.save_and_send_state(pool).await?; self.save_and_send_state(pool).await?;
@ -281,7 +289,8 @@ impl InstanceWorker {
self.site_loaded = true; self.site_loaded = true;
} }
if let Some(site) = &self.site { if let Some(site) = &self.site {
// Nutomic: Most non-lemmy software wont have a site row. That means it cant handle these activities. So handling it like this is fine. // Nutomic: Most non-lemmy software wont have a site row. That means it cant handle these
// activities. So handling it like this is fine.
inbox_urls.insert(site.inbox_url.inner().clone()); inbox_urls.insert(site.inbox_url.inner().clone());
} }
} }
@ -324,7 +333,8 @@ impl InstanceWorker {
Ok(()) Ok(())
} }
/// get a list of local communities with the remote inboxes on the given instance that cares about them /// get a list of local communities with the remote inboxes on the given instance that cares about
/// them
async fn get_communities( async fn get_communities(
&mut self, &mut self,
pool: &mut DbPool<'_>, pool: &mut DbPool<'_>,
@ -332,7 +342,8 @@ impl InstanceWorker {
last_fetch: DateTime<Utc>, last_fetch: DateTime<Utc>,
) -> Result<(HashMap<CommunityId, HashSet<Url>>, DateTime<Utc>)> { ) -> Result<(HashMap<CommunityId, HashSet<Url>>, DateTime<Utc>)> {
let new_last_fetch = let new_last_fetch =
Utc::now() - chrono::TimeDelta::try_seconds(10).expect("TimeDelta out of bounds"); // update to time before fetch to ensure overlap. subtract 10s to ensure overlap even if published date is not exact Utc::now() - chrono::TimeDelta::try_seconds(10).expect("TimeDelta out of bounds"); // update to time before fetch to ensure overlap. subtract 10s to ensure overlap even if
// published date is not exact
Ok(( Ok((
CommunityFollowerView::get_instance_followed_community_inboxes(pool, instance_id, last_fetch) CommunityFollowerView::get_instance_followed_community_inboxes(pool, instance_id, last_fetch)
.await? .await?

View file

@ -166,7 +166,8 @@ pub enum LemmyErrorType {
CouldntSendWebmention, CouldntSendWebmention,
ContradictingFilters, ContradictingFilters,
InstanceBlockAlreadyExists, InstanceBlockAlreadyExists,
/// Thrown when an API call is submitted with more than 1000 array elements, see [[MAX_API_PARAM_ELEMENTS]] /// Thrown when an API call is submitted with more than 1000 array elements, see
/// [[MAX_API_PARAM_ELEMENTS]]
TooManyItems, TooManyItems,
CommunityHasNoFollowers, CommunityHasNoFollowers,
BanExpirationInPast, BanExpirationInPast,

View file

@ -56,6 +56,7 @@ pub fn spawn_try_task(
tracing::warn!("error in spawn: {e}"); tracing::warn!("error in spawn: {e}");
} }
} }
.in_current_span(), // this makes sure the inner tracing gets the same context as where spawn was called .in_current_span(), /* this makes sure the inner tracing gets the same context as where
* spawn was called */
); );
} }

View file

@ -46,9 +46,10 @@ impl Bucket {
fn update(self, now: InstantSecs, config: BucketConfig) -> Self { fn update(self, now: InstantSecs, config: BucketConfig) -> Self {
let secs_since_last_checked = now.secs.saturating_sub(self.last_checked.secs); let secs_since_last_checked = now.secs.saturating_sub(self.last_checked.secs);
// For `secs_since_last_checked` seconds, the amount of tokens increases by `capacity` every `secs_to_refill` seconds. // For `secs_since_last_checked` seconds, the amount of tokens increases by `capacity` every
// The amount of tokens added per second is `capacity / secs_to_refill`. // `secs_to_refill` seconds. The amount of tokens added per second is `capacity /
// The expression below is like `secs_since_last_checked * (capacity / secs_to_refill)` but with precision and non-overflowing multiplication. // secs_to_refill`. The expression below is like `secs_since_last_checked * (capacity /
// secs_to_refill)` but with precision and non-overflowing multiplication.
let added_tokens = u64::from(secs_since_last_checked) * u64::from(config.capacity) let added_tokens = u64::from(secs_since_last_checked) * u64::from(config.capacity)
/ u64::from(config.secs_to_refill); / u64::from(config.secs_to_refill);
@ -124,8 +125,9 @@ impl<K: Eq + Hash, C: MapLevel> MapLevel for Map<K, C> {
..config ..config
}); });
// Remove groups that are no longer needed if the hash map's existing allocation has no space for new groups. // Remove groups that are no longer needed if the hash map's existing allocation has no space
// This is done before calling `HashMap::entry` because that immediately allocates just like `HashMap::insert`. // for new groups. This is done before calling `HashMap::entry` because that immediately
// allocates just like `HashMap::insert`.
if (self.capacity() == self.len()) && !self.contains_key(&addr_part) { if (self.capacity() == self.len()) && !self.contains_key(&addr_part) {
self.remove_full_buckets(now, configs); self.remove_full_buckets(now, configs);
} }
@ -219,7 +221,8 @@ impl<C: Default> RateLimitedGroup<C> {
if new_bucket.tokens == 0 { if new_bucket.tokens == 0 {
// Not enough tokens yet // Not enough tokens yet
// Setting `bucket` to `new_bucket` here is useless and would cause the bucket to start over at 0 tokens because of rounding // Setting `bucket` to `new_bucket` here is useless and would cause the bucket to start over
// at 0 tokens because of rounding
false false
} else { } else {
// Consume 1 token // Consume 1 token
@ -239,10 +242,12 @@ pub struct RateLimitState {
/// ///
/// The same thing happens for the first 48 and 56 bits, but with increased capacity. /// The same thing happens for the first 48 and 56 bits, but with increased capacity.
/// ///
/// This is done because all users can easily switch to any other IPv6 address that has the same first 64 bits. /// This is done because all users can easily switch to any other IPv6 address that has the same
/// It could be as low as 48 bits for some networks, which is the reason for 48 and 56 bit address groups. /// first 64 bits. It could be as low as 48 bits for some networks, which is the reason for 48
/// and 56 bit address groups.
ipv6_buckets: Map<[u8; 6], Map<u8, Map<u8, ()>>>, ipv6_buckets: Map<[u8; 6], Map<u8, Map<u8, ()>>>,
/// This stores a `BucketConfig` for each `ActionType`. `EnumMap` makes it impossible to have a missing `BucketConfig`. /// This stores a `BucketConfig` for each `ActionType`. `EnumMap` makes it impossible to have a
/// missing `BucketConfig`.
bucket_configs: EnumMap<ActionType, BucketConfig>, bucket_configs: EnumMap<ActionType, BucketConfig>,
} }
@ -339,7 +344,8 @@ mod tests {
let mut rate_limiter = RateLimitState::new(bucket_configs); let mut rate_limiter = RateLimitState::new(bucket_configs);
let mut now = InstantSecs::now(); let mut now = InstantSecs::now();
// Do 1 `Message` and 1 `Post` action for each IP address, and expect the limit to not be reached // Do 1 `Message` and 1 `Post` action for each IP address, and expect the limit to not be
// reached
let ips = [ let ips = [
"123.123.123.123", "123.123.123.123",
"1:2:3::", "1:2:3::",

View file

@ -38,7 +38,8 @@ pub struct Settings {
/// Whether the site is available over TLS. Needs to be true for federation to work. /// Whether the site is available over TLS. Needs to be true for federation to work.
#[default(true)] #[default(true)]
pub tls_enabled: bool, pub tls_enabled: bool,
/// Set the URL for opentelemetry exports. If you do not have an opentelemetry collector, do not set this option /// Set the URL for opentelemetry exports. If you do not have an opentelemetry collector, do not
/// set this option
#[default(None)] #[default(None)]
#[doku(skip)] #[doku(skip)]
pub opentelemetry_url: Option<Url>, pub opentelemetry_url: Option<Url>,
@ -85,7 +86,8 @@ pub struct PictrsConfig {
/// To be removed in 0.20 /// To be removed in 0.20
pub(super) cache_external_link_previews: Option<bool>, pub(super) cache_external_link_previews: Option<bool>,
/// Specifies how to handle remote images, so that users don't have to connect directly to remote servers. /// Specifies how to handle remote images, so that users don't have to connect directly to remote
/// servers.
#[default(PictrsImageMode::StoreLinkPreviews)] #[default(PictrsImageMode::StoreLinkPreviews)]
pub(super) image_mode: PictrsImageMode, pub(super) image_mode: PictrsImageMode,
@ -107,10 +109,11 @@ pub enum PictrsImageMode {
/// This is the default behaviour, and also matches Lemmy 0.18. /// This is the default behaviour, and also matches Lemmy 0.18.
#[default] #[default]
StoreLinkPreviews, StoreLinkPreviews,
/// If enabled, all images from remote domains are rewritten to pass through `/api/v3/image_proxy`, /// If enabled, all images from remote domains are rewritten to pass through
/// including embedded images in markdown. Images are stored temporarily in pict-rs for caching. /// `/api/v3/image_proxy`, including embedded images in markdown. Images are stored temporarily
/// This improves privacy as users don't expose their IP to untrusted servers, and decreases load /// in pict-rs for caching. This improves privacy as users don't expose their IP to untrusted
/// on other servers. However it increases bandwidth use for the local server. /// servers, and decreases load on other servers. However it increases bandwidth use for the
/// local server.
/// ///
/// Requires pict-rs 0.5 /// Requires pict-rs 0.5
ProxyAllImages, ProxyAllImages,

View file

@ -101,7 +101,7 @@ mod test {
// These helped with testing // These helped with testing
// #[test] // #[test]
// fn test_send_email() { // fn test_send_email() {
// let result = send_email("not a subject", "test_email@gmail.com", "ur user", "<h1>HI there</h1>"); // let result = send_email("not a subject", "test_email@gmail.com", "ur user", "<h1>HI
// assert!(result.is_ok()); // there</h1>"); assert!(result.is_ok());
// } // }
} }

View file

@ -272,7 +272,8 @@ pub fn config(cfg: &mut web::ServiceConfig, rate_limit: &RateLimitCell) {
.service( .service(
// Handle /user/login separately to add the register() rate limiter // Handle /user/login separately to add the register() rate limiter
// TODO: pretty annoying way to apply rate limits for register and login, we should // TODO: pretty annoying way to apply rate limits for register and login, we should
// group them under a common path so that rate limit is only applied once (eg under /account). // group them under a common path so that rate limit is only applied once (eg under
// /account).
web::resource("/user/login") web::resource("/user/login")
.guard(guard::Post()) .guard(guard::Post())
.wrap(rate_limit.register()) .wrap(rate_limit.register())

View file

@ -75,13 +75,14 @@ use url::Url;
pub struct CmdArgs { pub struct CmdArgs {
/// Don't run scheduled tasks. /// Don't run scheduled tasks.
/// ///
/// If you are running multiple Lemmy server processes, you probably want to disable scheduled tasks on /// If you are running multiple Lemmy server processes, you probably want to disable scheduled
/// all but one of the processes, to avoid running the tasks more often than intended. /// tasks on all but one of the processes, to avoid running the tasks more often than intended.
#[arg(long, default_value_t = false, env = "LEMMY_DISABLE_SCHEDULED_TASKS")] #[arg(long, default_value_t = false, env = "LEMMY_DISABLE_SCHEDULED_TASKS")]
disable_scheduled_tasks: bool, disable_scheduled_tasks: bool,
/// Disables the HTTP server. /// Disables the HTTP server.
/// ///
/// This can be used to run a Lemmy server process that only performs scheduled tasks or activity sending. /// This can be used to run a Lemmy server process that only performs scheduled tasks or activity
/// sending.
#[arg(long, default_value_t = false, env = "LEMMY_DISABLE_HTTP_SERVER")] #[arg(long, default_value_t = false, env = "LEMMY_DISABLE_HTTP_SERVER")]
disable_http_server: bool, disable_http_server: bool,
/// Disable sending outgoing ActivityPub messages. /// Disable sending outgoing ActivityPub messages.
@ -92,10 +93,11 @@ pub struct CmdArgs {
disable_activity_sending: bool, disable_activity_sending: bool,
/// The index of this outgoing federation process. /// The index of this outgoing federation process.
/// ///
/// Defaults to 1/1. If you want to split the federation workload onto n servers, run each server 1≤i≤n with these args: /// Defaults to 1/1. If you want to split the federation workload onto n servers, run each server
/// --federate-process-index i --federate-process-count n /// 1≤i≤n with these args: --federate-process-index i --federate-process-count n
/// ///
/// Make you have exactly one server with each `i` running, otherwise federation will randomly send duplicates or nothing. /// Make you have exactly one server with each `i` running, otherwise federation will randomly
/// send duplicates or nothing.
/// ///
/// See https://join-lemmy.org/docs/administration/horizontal_scaling.html for more detail. /// See https://join-lemmy.org/docs/administration/horizontal_scaling.html for more detail.
#[arg(long, default_value_t = 1, env = "LEMMY_FEDERATE_PROCESS_INDEX")] #[arg(long, default_value_t = 1, env = "LEMMY_FEDERATE_PROCESS_INDEX")]
@ -335,7 +337,9 @@ fn create_http_server(
let cors_config = cors_config(&settings); let cors_config = cors_config(&settings);
let app = App::new() let app = App::new()
.wrap(middleware::Logger::new( .wrap(middleware::Logger::new(
// This is the default log format save for the usage of %{r}a over %a to guarantee to record the client's (forwarded) IP and not the last peer address, since the latter is frequently just a reverse proxy // This is the default log format save for the usage of %{r}a over %a to guarantee to
// record the client's (forwarded) IP and not the last peer address, since the latter is
// frequently just a reverse proxy
"%{r}a '%r' %s %b '%{Referer}i' '%{User-Agent}i' %T", "%{r}a '%r' %s %b '%{Referer}i' '%{User-Agent}i' %T",
)) ))
.wrap(middleware::Compress::default()) .wrap(middleware::Compress::default())
@ -377,7 +381,8 @@ fn cors_config(settings: &Settings) -> Cors {
let cors_origin_setting = settings.cors_origin(); let cors_origin_setting = settings.cors_origin();
match (cors_origin_setting.clone(), cfg!(debug_assertions)) { match (cors_origin_setting.clone(), cfg!(debug_assertions)) {
(Some(origin), false) => { (Some(origin), false) => {
// Need to call send_wildcard() explicitly, passing this into allowed_origin() results in error // Need to call send_wildcard() explicitly, passing this into allowed_origin() results in
// error
if cors_origin_setting.as_deref() == Some("*") { if cors_origin_setting.as_deref() == Some("*") {
Cors::default().allow_any_origin().send_wildcard() Cors::default().allow_any_origin().send_wildcard()
} else { } else {

View file

@ -467,10 +467,11 @@ async fn update_instance_software(
for instance in instances { for instance in instances {
let node_info_url = format!("https://{}/nodeinfo/2.0.json", instance.domain); let node_info_url = format!("https://{}/nodeinfo/2.0.json", instance.domain);
// The `updated` column is used to check if instances are alive. If it is more than three days // The `updated` column is used to check if instances are alive. If it is more than three
// in the past, no outgoing activities will be sent to that instance. However not every // days in the past, no outgoing activities will be sent to that instance. However
// Fediverse instance has a valid Nodeinfo endpoint (its not required for Activitypub). That's // not every Fediverse instance has a valid Nodeinfo endpoint (its not required for
// why we always need to mark instances as updated if they are alive. // Activitypub). That's why we always need to mark instances as updated if they are
// alive.
let default_form = InstanceForm::builder() let default_form = InstanceForm::builder()
.domain(instance.domain.clone()) .domain(instance.domain.clone())
.updated(Some(naive_now())) .updated(Some(naive_now()))