From 4fe1dd9af49aff98979773d7678e397c4235049f Mon Sep 17 00:00:00 2001
From: dull b
Date: Fri, 15 Dec 2023 20:25:38 +0000
Subject: [PATCH 01/18] stuff

---
 crates/db_schema/src/impls/actor_language.rs | 18 ++++++++++--------
 crates/db_schema/src/utils.rs | 8 ++++++++
 crates/db_views/src/post_view.rs | 2 +-
 scripts/start_dev_db.sh | 14 +++++++++-----
 scripts/test.sh | 1 -
 5 files changed, 28 insertions(+), 15 deletions(-)

diff --git a/crates/db_schema/src/impls/actor_language.rs b/crates/db_schema/src/impls/actor_language.rs
index 5c4e252d4..2e80b0680 100644
--- a/crates/db_schema/src/impls/actor_language.rs
+++ b/crates/db_schema/src/impls/actor_language.rs
@@ -96,16 +96,18 @@ impl LocalUserLanguage {
             .execute(conn)
             .await?;
 
-          for l in lang_ids {
-            let form = LocalUserLanguageForm {
+          let forms = lang_ids
+            .into_iter()
+            .map(|l| LocalUserLanguageForm {
               local_user_id: for_local_user_id,
               language_id: l,
-            };
-            insert_into(local_user_language)
-              .values(form)
-              .get_result::<Self>(conn)
-              .await?;
-          }
+            })
+            .collect::<Vec<_>>();
+
+          insert_into(local_user_language)
+            .values(forms)
+            .execute(conn)
+            .await?;
           Ok(())
         }) as _
       })
diff --git a/crates/db_schema/src/utils.rs b/crates/db_schema/src/utils.rs
index 2b1179bee..b181edd7f 100644
--- a/crates/db_schema/src/utils.rs
+++ b/crates/db_schema/src/utils.rs
@@ -15,9 +15,11 @@ use diesel::{
   pg::Pg,
   result::{ConnectionError, ConnectionResult, Error as DieselError, Error::QueryBuilderError},
   serialize::{Output, ToSql},
+  sql_query,
   sql_types::{Text, Timestamptz},
   IntoSql,
   PgConnection,
+  RunQueryDsl,
 };
 use diesel_async::{
   pg::AsyncPgConnection,
@@ -280,6 +282,12 @@ fn run_migrations(db_url: &str) {
   // Needs to be a sync connection
   let mut conn =
     PgConnection::establish(db_url).unwrap_or_else(|e| panic!("Error connecting to {db_url}: {e}"));
+
+  // Disable auto_explain output for migrations
+  sql_query("SET auto_explain.log_min_duration = -1")
+    .execute(&mut conn)
+    .expect("failed to disable auto_explain");
+
   info!("Running Database migrations (This may take a long time)...");
   let _ = &mut conn
     .run_pending_migrations(MIGRATIONS)
diff --git a/crates/db_views/src/post_view.rs b/crates/db_views/src/post_view.rs
index 1d15c7c41..36dc0ee89 100644
--- a/crates/db_views/src/post_view.rs
+++ b/crates/db_views/src/post_view.rs
@@ -1433,7 +1433,7 @@ mod tests {
     let inserted_community = Community::create(pool, &community_form).await.unwrap();
 
     let mut inserted_post_ids = vec![];
-    let mut inserted_comment_ids = vec![];
+    let mut comment_forms = vec![];
 
     // Create 150 posts with varying non-correlating values for publish date, number of comments, and featured
     for comments in 0..10 {
diff --git a/scripts/start_dev_db.sh b/scripts/start_dev_db.sh
index f192defa6..c16969ed7 100644
--- a/scripts/start_dev_db.sh
+++ b/scripts/start_dev_db.sh
@@ -5,11 +5,15 @@ export PGHOST=$PWD
 export LEMMY_DATABASE_URL="postgresql://lemmy:password@/lemmy?host=$PWD"
 
 # If cluster exists, stop the server and delete the cluster
-if [ -d $PGDATA ]
+if [[ -d $PGDATA ]]
 then
-  # Prevent `stop` from failing if server already stopped
-  pg_ctl restart > /dev/null
-  pg_ctl stop
+  # Only stop server if it is running
+  (pg_ctl status > /dev/null) || pg_status_exit_code=$?
+ if [[ ${pg_status_exit_code} -ne 3 ]] + then + pg_ctl stop + fi + rm -rf $PGDATA fi @@ -17,7 +21,7 @@ fi initdb --username=postgres --auth=trust --no-instructions # Start server that only listens to socket in current directory -pg_ctl start --options="-c listen_addresses= -c unix_socket_directories=$PWD" > /dev/null +pg_ctl start --options="-c listen_addresses= -c unix_socket_directories=$PWD -c logging_collector=on -c session_preload_libraries=auto_explain -c auto_explain.log_min_duration=0 -c auto_explain.log_parameter_max_length=0 -c auto_explain.log_analyze=on -c enable_seqscan=off" > /dev/null # Setup database psql -c "CREATE USER lemmy WITH PASSWORD 'password' SUPERUSER;" -U postgres diff --git a/scripts/test.sh b/scripts/test.sh index cdfbf7611..7a8bd1c29 100755 --- a/scripts/test.sh +++ b/scripts/test.sh @@ -28,4 +28,3 @@ cargo test -p lemmy_utils --all-features --no-fail-fast # Add this to do printlns: -- --nocapture pg_ctl stop -rm -rf $PGDATA From fdebc857539e8a73a391ba03d588b0bdcd2d676e Mon Sep 17 00:00:00 2001 From: dull b Date: Sat, 16 Dec 2023 05:15:08 +0000 Subject: [PATCH 02/18] stuff including batch_upsert function --- crates/db_schema/src/impls/comment.rs | 26 +++++++++++++++++++++----- crates/db_schema/src/utils.rs | 15 ++++++++++++++- scripts/start_dev_db.sh | 17 +++++++++++++++-- 3 files changed, 50 insertions(+), 8 deletions(-) diff --git a/crates/db_schema/src/impls/comment.rs b/crates/db_schema/src/impls/comment.rs index aef399c59..f59afb6aa 100644 --- a/crates/db_schema/src/impls/comment.rs +++ b/crates/db_schema/src/impls/comment.rs @@ -57,21 +57,37 @@ impl Comment { comment_form: &CommentInsertForm, parent_path: Option<&Ltree>, ) -> Result { + Comment::create_batch(pool, &[(comment_form, parent_path)]).await?.into_iter().next().ok_or(Error::NotFound) + } + + pub async fn create_batch( + pool: &mut DbPool<'_>, + items: &[(&CommentInsertForm, Option<&Ltree>)], + ) -> Result, Error> { let conn = &mut get_conn(pool).await?; conn .build_transaction() .run(|conn| { Box::pin(async move { - // Insert, to get the id - let inserted_comment = insert_into(comment) - .values(comment_form) - .on_conflict(ap_id) + let forms = items + .iter() + .map(|&(form, _)| form) + .collect::>(); + + // Insert, to get the ids + let inserted_comments = insert_into(comment) + .values(&forms) + /*.on_conflict(ap_id) .do_update() - .set(comment_form) + .set()*/ .get_result::(conn) .await?; + // `ap_id` unique constraint violation is handled individually for each row + // because batch upsert requires having the same `set` argument for all rows + + let comment_id = inserted_comment.id; // You need to update the ltree column diff --git a/crates/db_schema/src/utils.rs b/crates/db_schema/src/utils.rs index b181edd7f..7adf009a4 100644 --- a/crates/db_schema/src/utils.rs +++ b/crates/db_schema/src/utils.rs @@ -19,7 +19,7 @@ use diesel::{ sql_types::{Text, Timestamptz}, IntoSql, PgConnection, - RunQueryDsl, + RunQueryDsl, Insertable, Table, Column, AsChangeset, Expression, SelectableExpression, expression::NonAggregate, query_builder::QueryFragment, }; use diesel_async::{ pg::AsyncPgConnection, @@ -153,6 +153,19 @@ macro_rules! 
try_join_with_pool { }}; } +pub async fn batch_upsert(conn: &mut AsyncPgConnection, target: T, records: U, conflict_target: Target) -> Result, DieselError> +where + T: Table, + T::AllColumns: Expression + SelectableExpression + NonAggregate + QueryFragment, + U: IntoIterator + Clone, + Vec: Insertable, + U::Item: Insertable + AsChangeset, + Target: Column, +{ + let result = diesel::insert_into(target).values(records.clone().into_iter().collect::>()).load::(conn).await; + +} + pub fn fuzzy_search(q: &str) -> String { let replaced = q.replace('%', "\\%").replace('_', "\\_").replace(' ', "%"); format!("%{replaced}%") diff --git a/scripts/start_dev_db.sh b/scripts/start_dev_db.sh index c16969ed7..d25b15e75 100644 --- a/scripts/start_dev_db.sh +++ b/scripts/start_dev_db.sh @@ -18,10 +18,23 @@ then fi # Create cluster -initdb --username=postgres --auth=trust --no-instructions +initdb --username=postgres --auth=trust --no-instructions \ + # Only listen to socket in current directory + -c listen_addresses= -c unix_socket_directories=$PWD \ + # Write logs to a file in $PGDATA/log + -c logging_collector=on \ + # Log all query plans by default + -c session_preload_libraries=auto_explain -c auto_explain.log_min_duration=0 + # Include actual row amounts and run times for query plan nodes + -c auto_explain.log_analyze=on + # Avoid sequential scans so query plans show what index scans can be done + # (index scan is normally avoided in some cases, such as the table being small enough) + -c enable_seqscan=off + # Don't log parameter values + -c auto_explain.log_parameter_max_length=0 # Start server that only listens to socket in current directory -pg_ctl start --options="-c listen_addresses= -c unix_socket_directories=$PWD -c logging_collector=on -c session_preload_libraries=auto_explain -c auto_explain.log_min_duration=0 -c auto_explain.log_parameter_max_length=0 -c auto_explain.log_analyze=on -c enable_seqscan=off" > /dev/null +pg_ctl start # Setup database psql -c "CREATE USER lemmy WITH PASSWORD 'password' SUPERUSER;" -U postgres From 9b9314a1d6f8bd28064d1641f4fbc129d59bcdb9 Mon Sep 17 00:00:00 2001 From: dull b Date: Sat, 16 Dec 2023 20:52:19 +0000 Subject: [PATCH 03/18] stuff --- crates/apub/src/objects/comment.rs | 1 + crates/db_schema/src/impls/actor_language.rs | 18 ++-- crates/db_schema/src/impls/comment.rs | 93 ++++++++++++-------- crates/db_schema/src/source/comment.rs | 4 + crates/db_schema/src/utils.rs | 82 ++++++++++++++--- crates/db_views/src/post_view.rs | 20 ++++- scripts/start_dev_db.sh | 22 +++-- 7 files changed, 174 insertions(+), 66 deletions(-) diff --git a/crates/apub/src/objects/comment.rs b/crates/apub/src/objects/comment.rs index ab98fd7b1..0a791d842 100644 --- a/crates/apub/src/objects/comment.rs +++ b/crates/apub/src/objects/comment.rs @@ -172,6 +172,7 @@ impl Object for ApubComment { deleted: Some(false), ap_id: Some(note.id.into()), distinguished: note.distinguished, + path: None, local: Some(false), language_id, }; diff --git a/crates/db_schema/src/impls/actor_language.rs b/crates/db_schema/src/impls/actor_language.rs index 2e80b0680..36ac70b0b 100644 --- a/crates/db_schema/src/impls/actor_language.rs +++ b/crates/db_schema/src/impls/actor_language.rs @@ -166,16 +166,18 @@ impl SiteLanguage { .execute(conn) .await?; - for l in lang_ids { - let form = SiteLanguageForm { + let forms = lang_ids + .into_iter() + .map(|l| SiteLanguageForm { site_id: for_site_id, language_id: l, - }; - insert_into(site_language) - .values(form) - .get_result::(conn) - .await?; - } + }) + 
.collect::>(); + + insert_into(site_language) + .values(forms) + .get_result::(conn) + .await?; CommunityLanguage::limit_languages(conn, instance_id).await?; diff --git a/crates/db_schema/src/impls/comment.rs b/crates/db_schema/src/impls/comment.rs index f59afb6aa..a2e95fa1c 100644 --- a/crates/db_schema/src/impls/comment.rs +++ b/crates/db_schema/src/impls/comment.rs @@ -1,6 +1,16 @@ use crate::{ newtypes::{CommentId, DbUrl, PersonId}, - schema::comment::dsl::{ap_id, comment, content, creator_id, deleted, path, removed, updated}, + schema::comment::dsl::{ + ap_id, + comment, + content, + creator_id, + deleted, + id, + path, + removed, + updated, + }, source::comment::{ Comment, CommentInsertForm, @@ -11,16 +21,24 @@ use crate::{ CommentUpdateForm, }, traits::{Crud, Likeable, Saveable}, - utils::{get_conn, naive_now, DbPool, DELETED_REPLACEMENT_TEXT}, + utils::{functions::AsText, get_conn, naive_now, DbPool, DELETED_REPLACEMENT_TEXT}, }; use diesel::{ dsl::{insert_into, sql_query}, result::Error, ExpressionMethods, QueryDsl, + TextExpressionMethods, }; use diesel_async::RunQueryDsl; -use diesel_ltree::Ltree; +use diesel_ltree::{ + functions::{ltree2text, text2ltree}, + Ltree, +}; +use futures_util::{ + future::TryFutureExt, + stream::{self, StreamExt, TryStreamExt}, +}; use url::Url; impl Comment { @@ -57,12 +75,16 @@ impl Comment { comment_form: &CommentInsertForm, parent_path: Option<&Ltree>, ) -> Result { - Comment::create_batch(pool, &[(comment_form, parent_path)]).await?.into_iter().next().ok_or(Error::NotFound) + Comment::create_batch(pool, &[(comment_form.clone(), parent_path.cloned())]) + .await? + .into_iter() + .next() + .ok_or(Error::NotFound) } - + pub async fn create_batch( pool: &mut DbPool<'_>, - items: &[(&CommentInsertForm, Option<&Ltree>)], + items: &[(CommentInsertForm, Option)], ) -> Result, Error> { let conn = &mut get_conn(pool).await?; @@ -72,43 +94,44 @@ impl Comment { Box::pin(async move { let forms = items .iter() - .map(|&(form, _)| form) - .collect::>(); + .map(|(comment_form, parent_path)| CommentInsertForm { + path: Some(parent_path.clone().unwrap_or(Ltree("0".to_owned()))), + ..comment_form.clone() + }); // Insert, to get the ids let inserted_comments = insert_into(comment) - .values(&forms) - /*.on_conflict(ap_id) - .do_update() - .set()*/ - .get_result::(conn) + .values(forms.clone().collect::>()) + .load::(conn) + .or_else(|_| { + // `ap_id` unique constraint violation is handled individually for each row + // because batched upsert requires having the same `set` argument for all rows + stream::iter(forms) + .then(|form| { + insert_into(comment) + .values(form.clone()) + .on_conflict(ap_id) + .do_update() + .set(form) + .get_result::(conn) + }) + .try_collect::>() + }) .await?; - // `ap_id` unique constraint violation is handled individually for each row - // because batch upsert requires having the same `set` argument for all rows - - - let comment_id = inserted_comment.id; - - // You need to update the ltree column - let ltree = Ltree(if let Some(parent_path) = parent_path { - // The previous parent will already have 0 in it - // Append this comment id - format!("{}.{}", parent_path.0, comment_id) - } else { - // '0' is always the first path, append to that - format!("{}.{}", 0, comment_id) - }); - - let updated_comment = diesel::update(comment.find(comment_id)) - .set(path.eq(ltree)) - .get_result::(conn) - .await; + // For each comment, append its id to its path + let updated_comments = diesel::update(comment) + 
.filter(id.eq_any(inserted_comments.into_iter().map(|c| c.id))) + .set(path.eq(text2ltree( + ltree2text(path).concat(".").concat(AsText::new(id)), + ))) + .load::(conn) + .await?; // Update the child count for the parent comment_aggregates // You could do this with a trigger, but since you have to do this manually anyway, // you can just have it here - if let Some(parent_path) = parent_path { + for parent_path in items.iter().filter_map(|(_, p)| p.as_ref()) { // You have to update counts for all parents, not just the immediate one // TODO if the performance of this is terrible, it might be better to do this as part of a // scheduled query... although the counts would often be wrong. @@ -137,7 +160,7 @@ where ca.comment_id = c.id" sql_query(update_child_count_stmt).execute(conn).await?; } } - updated_comment + Ok(updated_comments) }) as _ }) .await diff --git a/crates/db_schema/src/source/comment.rs b/crates/db_schema/src/source/comment.rs index 3ebea42c1..63ed04f0d 100644 --- a/crates/db_schema/src/source/comment.rs +++ b/crates/db_schema/src/source/comment.rs @@ -67,6 +67,10 @@ pub struct CommentInsertForm { pub deleted: Option, pub ap_id: Option, pub local: Option, + #[cfg(feature = "full")] + pub path: Option, + #[cfg(not(feature = "full"))] + pub path: Option, pub distinguished: Option, pub language_id: Option, } diff --git a/crates/db_schema/src/utils.rs b/crates/db_schema/src/utils.rs index 7adf009a4..55667f5da 100644 --- a/crates/db_schema/src/utils.rs +++ b/crates/db_schema/src/utils.rs @@ -13,13 +13,15 @@ use diesel::{ deserialize::FromSql, helper_types::AsExprOf, pg::Pg, + query_builder::{Query, QueryFragment}, + query_dsl::methods::LimitDsl, result::{ConnectionError, ConnectionResult, Error as DieselError, Error::QueryBuilderError}, serialize::{Output, ToSql}, sql_query, sql_types::{Text, Timestamptz}, IntoSql, PgConnection, - RunQueryDsl, Insertable, Table, Column, AsChangeset, Expression, SelectableExpression, expression::NonAggregate, query_builder::QueryFragment, + RunQueryDsl, }; use diesel_async::{ pg::AsyncPgConnection, @@ -153,17 +155,65 @@ macro_rules! try_join_with_pool { }}; } -pub async fn batch_upsert(conn: &mut AsyncPgConnection, target: T, records: U, conflict_target: Target) -> Result, DieselError> -where - T: Table, - T::AllColumns: Expression + SelectableExpression + NonAggregate + QueryFragment, - U: IntoIterator + Clone, - Vec: Insertable, - U::Item: Insertable + AsChangeset, - Target: Column
, -{ - let result = diesel::insert_into(target).values(records.clone().into_iter().collect::>()).load::(conn).await; - +/// Includes an SQL comment before `T`, which can be used to label auto_explain output +#[derive(QueryId)] +pub struct Commented { + comment: String, + inner: T, +} + +impl Commented { + pub fn new(inner: T) -> Self { + Commented { + comment: String::new(), + inner, + } + } + + /// Adds `text` to the comment if `condition` is true + pub fn text_if(mut self, text: &str, condition: bool) -> Self { + if condition { + if !self.comment.is_empty() { + self.comment.push_str(", "); + } + self.comment.push_str(text); + } + self + } + + /// Adds `text` to the comment + pub fn text(self, text: &str) -> Self { + self.text_if(text, true) + } +} + +impl Query for Commented { + type SqlType = T::SqlType; +} + +impl> QueryFragment for Commented { + fn walk_ast<'b>( + &'b self, + mut out: diesel::query_builder::AstPass<'_, 'b, Pg>, + ) -> Result<(), DieselError> { + for line in self.comment.lines() { + out.push_sql("\n-- "); + out.push_sql(line); + } + out.push_sql("\n"); + self.inner.walk_ast(out.reborrow()) + } +} + +impl LimitDsl for Commented { + type Output = Commented; + + fn limit(self, limit: i64) -> Self::Output { + Commented { + comment: self.comment, + inner: self.inner.limit(limit), + } + } } pub fn fuzzy_search(q: &str) -> String { @@ -368,7 +418,10 @@ static EMAIL_REGEX: Lazy = Lazy::new(|| { }); pub mod functions { - use diesel::sql_types::{BigInt, Text, Timestamptz}; + use diesel::{ + pg::Pg, + sql_types::{BigInt, Text, Timestamptz}, + }; sql_function! { fn hot_rank(score: BigInt, time: Timestamptz) -> Double; @@ -386,6 +439,9 @@ pub mod functions { // really this function is variadic, this just adds the two-argument version sql_function!(fn coalesce(x: diesel::sql_types::Nullable, y: T) -> T); + + // Use `AsText::new` + postfix_operator!(AsText, "::text", Text, backend: Pg); } pub const DELETED_REPLACEMENT_TEXT: &str = "*Permanently Deleted*"; diff --git a/crates/db_views/src/post_view.rs b/crates/db_views/src/post_view.rs index 087bc1ec0..f382f3920 100644 --- a/crates/db_views/src/post_view.rs +++ b/crates/db_views/src/post_view.rs @@ -43,6 +43,7 @@ use lemmy_db_schema::{ get_conn, limit_and_offset, now, + Commented, DbConn, DbPool, ListFn, @@ -282,7 +283,10 @@ fn queries<'a>() -> Queries< ); } - query.first::(&mut conn).await + Commented::new(query) + .text("PostView::read") + .first::(&mut conn) + .await }; let list = move |mut conn: DbConn<'a>, options: PostQuery<'a>| async move { @@ -557,7 +561,14 @@ fn queries<'a>() -> Queries< debug!("Post View Query: {:?}", debug_query::(&query)); - query.load::(&mut conn).await + Commented::new(query) + .text("PostQuery::list") + .text_if( + "getting upper bound for next query", + options.community_id_just_for_prefetch, + ) + .load::(&mut conn) + .await }; Queries::new(read, list) @@ -1457,12 +1468,13 @@ mod tests { .post_id(inserted_post.id) .content("yes".to_owned()) .build(); - let inserted_comment = Comment::create(pool, &comment_form, None).await.unwrap(); - inserted_comment_ids.push(inserted_comment.id); + comment_forms.push((comment_form, None)); } } } + Comment::create_batch(pool, &comment_forms).await.unwrap(); + let mut listed_post_ids = vec![]; let mut page_after = None; loop { diff --git a/scripts/start_dev_db.sh b/scripts/start_dev_db.sh index d25b15e75..31d3e0d3f 100644 --- a/scripts/start_dev_db.sh +++ b/scripts/start_dev_db.sh @@ -17,24 +17,34 @@ then rm -rf $PGDATA fi -# Create cluster -initdb --username=postgres 
--auth=trust --no-instructions \ +config_args=( # Only listen to socket in current directory - -c listen_addresses= -c unix_socket_directories=$PWD \ + -c listen_addresses= + -c unix_socket_directories=$PWD + # Write logs to a file in $PGDATA/log - -c logging_collector=on \ + -c logging_collector=on + # Log all query plans by default - -c session_preload_libraries=auto_explain -c auto_explain.log_min_duration=0 + -c session_preload_libraries=auto_explain + -c auto_explain.log_min_duration=0 + # Include actual row amounts and run times for query plan nodes -c auto_explain.log_analyze=on + # Avoid sequential scans so query plans show what index scans can be done # (index scan is normally avoided in some cases, such as the table being small enough) -c enable_seqscan=off + # Don't log parameter values -c auto_explain.log_parameter_max_length=0 +) + +# Create cluster +initdb --username=postgres --auth=trust --no-instructions # Start server that only listens to socket in current directory -pg_ctl start +pg_ctl start --options="${config_args[*]}" # Setup database psql -c "CREATE USER lemmy WITH PASSWORD 'password' SUPERUSER;" -U postgres From 869a2466b7e90f331a77d9daff5c9e641ce5c1de Mon Sep 17 00:00:00 2001 From: dull b Date: Wed, 20 Dec 2023 06:28:27 +0000 Subject: [PATCH 04/18] do things --- Cargo.lock | 13 ++ Cargo.toml | 4 +- crates/db_perf/Cargo.toml | 22 +++ crates/db_perf/src/main.rs | 156 ++++++++++++++++++ crates/db_schema/src/impls/post.rs | 9 +- crates/db_schema/src/utils.rs | 2 + crates/db_schema/src/utils/series.rs | 81 +++++++++ .../down.sql | 42 +++++ .../up.sql | 41 +++++ scripts/db_perf.sh | 17 ++ scripts/start_dev_db.sh | 18 +- scripts/test.sh | 3 +- 12 files changed, 394 insertions(+), 14 deletions(-) create mode 100644 crates/db_perf/Cargo.toml create mode 100644 crates/db_perf/src/main.rs create mode 100644 crates/db_schema/src/utils/series.rs create mode 100644 migrations/2023-12-19-210053_tolerable-batch-insert-speed/down.sql create mode 100644 migrations/2023-12-19-210053_tolerable-batch-insert-speed/up.sql create mode 100755 scripts/db_perf.sh diff --git a/Cargo.lock b/Cargo.lock index 5589c610c..8c782c6fb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2644,6 +2644,19 @@ dependencies = [ "uuid", ] +[[package]] +name = "lemmy_db_perf" +version = "0.19.0" +dependencies = [ + "clap", + "diesel", + "diesel-async", + "lemmy_db_schema", + "lemmy_db_views", + "lemmy_utils", + "tokio", +] + [[package]] name = "lemmy_db_schema" version = "0.19.0" diff --git a/Cargo.toml b/Cargo.toml index 75d2852c6..6985db965 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,6 +53,7 @@ members = [ "crates/api_common", "crates/apub", "crates/utils", + "crates/db_perf", "crates/db_schema", "crates/db_views", "crates/db_views_actor", @@ -155,6 +156,7 @@ tokio-postgres = "0.7.10" tokio-postgres-rustls = "0.10.0" enum-map = "2.7" moka = { version = "0.12.1", features = ["future"] } +clap = { version = "4.4.11", features = ["derive"] } [dependencies] lemmy_api = { workspace = true } @@ -191,5 +193,5 @@ futures-util = { workspace = true } chrono = { workspace = true } prometheus = { version = "0.13.3", features = ["process"] } serial_test = { workspace = true } -clap = { version = "4.4.11", features = ["derive"] } +clap = { workspace = true } actix-web-prom = "0.7.0" diff --git a/crates/db_perf/Cargo.toml b/crates/db_perf/Cargo.toml new file mode 100644 index 000000000..7aa1566bf --- /dev/null +++ b/crates/db_perf/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "lemmy_db_perf" +version.workspace = true 
+edition.workspace = true +description.workspace = true +license.workspace = true +homepage.workspace = true +documentation.workspace = true +repository.workspace = true + + +[lints] +workspace = true + +[dependencies] +clap = { workspace = true } +diesel = { workspace = true } +diesel-async = { workspace = true } +lemmy_db_schema = { workspace = true } +lemmy_db_views = { workspace = true, features = ["full"] } +lemmy_utils = { workspace = true } +tokio = { workspace = true } diff --git a/crates/db_perf/src/main.rs b/crates/db_perf/src/main.rs new file mode 100644 index 000000000..5ec4e843c --- /dev/null +++ b/crates/db_perf/src/main.rs @@ -0,0 +1,156 @@ +use clap::{Parser, Subcommand}; +use diesel::{dsl, sql_query, sql_types, ExpressionMethods, IntoSql}; +use diesel_async::RunQueryDsl; +use lemmy_db_schema::{ + schema::post, + source::{ + community::{Community, CommunityInsertForm}, + instance::Instance, + person::{Person, PersonInsertForm}, + }, + traits::Crud, + utils::{ + build_db_pool, + get_conn, + series::{self, ValuesFromSeries}, + }, + SortType, +}; +use lemmy_db_views::{ + post_view::{PaginationCursorData, PostQuery}, + structs::PaginationCursor, +}; +use lemmy_utils::error::LemmyResult; +use std::num::NonZeroU32; + +#[derive(Parser, Debug)] +struct CmdArgs { + #[arg(long, default_value_t = 3.try_into().unwrap())] + communities: NonZeroU32, + #[arg(long, default_value_t = 3.try_into().unwrap())] + people: NonZeroU32, + #[arg(long, default_value_t = 100000.try_into().unwrap())] + posts: NonZeroU32, + #[arg(long)] + read_posts: bool, +} + +#[tokio::main] +async fn main() -> LemmyResult<()> { + let args = CmdArgs::parse(); + let pool = &build_db_pool().await?; + let pool = &mut pool.into(); + + let instance = Instance::read_or_create(pool, "reddit.com".to_owned()).await?; + + println!("🫃 creating {} people", args.people); + let mut person_ids = vec![]; + for i in 0..args.people.get() { + person_ids.push( + Person::create( + pool, + &PersonInsertForm::builder() + .name(format!("p{i}")) + .public_key("pubkey".to_owned()) + .instance_id(instance.id) + .build(), + ) + .await? + .id, + ); + } + + println!("🏠 creating {} communities", args.communities); + let mut community_ids = vec![]; + for i in 0..args.communities.get() { + community_ids.push( + Community::create( + pool, + &CommunityInsertForm::builder() + .name(format!("c{i}")) + .title(i.to_string()) + .instance_id(instance.id) + .build(), + ) + .await? + .id, + ); + } + + let post_batches = args.people.get() * args.communities.get(); + let posts_per_batch = args.posts.get() / post_batches; + let num_posts = post_batches * posts_per_batch; + println!( + "📢 creating {} posts ({} featured in community)", + num_posts, post_batches + ); + let mut num_inserted_posts = 0; + // TODO: progress bar + for person_id in &person_ids { + for community_id in &community_ids { + let n = dsl::insert_into(post::table) + .values(ValuesFromSeries { + start: 1, + stop: posts_per_batch.into(), + selection: ( + "AAAAAAAAAAA".into_sql::(), + person_id.into_sql::(), + community_id.into_sql::(), + series::current_value.eq(1), + ), + }) + .into_columns(( + post::name, + post::creator_id, + post::community_id, + post::featured_community, + )) + .execute(&mut get_conn(pool).await?) 
+ .await?; + num_inserted_posts += n; + } + } + + // Lie detector for the println above + assert_eq!(num_inserted_posts, num_posts as usize); + + // Enable auto_explain + let conn = &mut get_conn(pool).await?; + sql_query("SET auto_explain.log_min_duration = 0") + .execute(conn) + .await?; + let pool = &mut conn.into(); + + if args.read_posts { + let mut page_after = None; + for page_num in 1..=2 { + println!( + "👀 getting page {page_num} of posts (pagination cursor used: {})", + page_after.is_some() + ); + // TODO: include local_user + let post_views = PostQuery { + community_id: community_ids.get(0).cloned(), + sort: Some(SortType::New), + limit: Some(20), + page_after, + ..Default::default() + } + .list(pool) + .await?; + if let Some(post_view) = post_views.into_iter().last() { + println!("👀 getting pagination cursor data for next page "); + let cursor_data = PaginationCursor::after_post(&post_view).read(pool).await?; + page_after = Some(cursor_data); + } else { + break; + } + } + } + + if let Ok(path) = std::env::var("PGDATA") { + println!("🪵 query plans written in {path}/log"); + } + + Ok(()) +} diff --git a/crates/db_schema/src/impls/post.rs b/crates/db_schema/src/impls/post.rs index 4f2f88cb2..cb76dc334 100644 --- a/crates/db_schema/src/impls/post.rs +++ b/crates/db_schema/src/impls/post.rs @@ -41,7 +41,14 @@ use crate::{ }; use ::url::Url; use chrono::{Duration, Utc}; -use diesel::{dsl::insert_into, result::Error, ExpressionMethods, QueryDsl, TextExpressionMethods}; +use diesel::{ + dsl::insert_into, + result::Error, + ExpressionMethods, + Insertable, + QueryDsl, + TextExpressionMethods, +}; use diesel_async::RunQueryDsl; use std::collections::HashSet; diff --git a/crates/db_schema/src/utils.rs b/crates/db_schema/src/utils.rs index 55667f5da..67e005273 100644 --- a/crates/db_schema/src/utils.rs +++ b/crates/db_schema/src/utils.rs @@ -1,3 +1,5 @@ +pub mod series; + use crate::{ diesel::Connection, diesel_migrations::MigrationHarness, diff --git a/crates/db_schema/src/utils/series.rs b/crates/db_schema/src/utils/series.rs new file mode 100644 index 000000000..c8f6831e6 --- /dev/null +++ b/crates/db_schema/src/utils/series.rs @@ -0,0 +1,81 @@ +use diesel::{ + dsl, + expression::{is_aggregate, ValidGrouping}, + pg::Pg, + query_builder::{AsQuery, AstPass, QueryFragment}, + result::Error, + sql_types, + AppearsOnTable, + Expression, + Insertable, + SelectableExpression, +}; + +#[derive(QueryId)] +pub struct ValuesFromSeries { + pub start: i64, + pub stop: i64, + pub selection: S, +} + +impl> QueryFragment for ValuesFromSeries { + fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> Result<(), Error> { + self.selection.walk_ast(out.reborrow())?; + out.push_sql(" FROM generate_series("); + out.push_bind_param::(&self.start)?; + out.push_sql(", "); + out.push_bind_param::(&self.stop)?; + out.push_sql(")"); + + Ok(()) + } +} + +impl Expression for ValuesFromSeries { + type SqlType = S::SqlType; +} + +impl> AppearsOnTable for ValuesFromSeries {} + +impl> SelectableExpression for ValuesFromSeries {} + +impl> Insertable for ValuesFromSeries +where + dsl::BareSelect: AsQuery + Insertable, +{ + type Values = as Insertable>::Values; + + fn values(self) -> Self::Values { + dsl::select(self).values() + } +} + +impl> ValidGrouping<()> + for ValuesFromSeries +{ + type IsAggregate = is_aggregate::No; +} + +#[allow(non_camel_case_types)] +#[derive(QueryId, Clone, Copy, Debug)] +pub struct current_value; + +impl QueryFragment for current_value { + fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, 
Pg>) -> Result<(), Error> { + out.push_identifier("generate_series")?; + + Ok(()) + } +} + +impl Expression for current_value { + type SqlType = sql_types::BigInt; +} + +impl AppearsOnTable for current_value {} + +impl SelectableExpression for current_value {} + +impl ValidGrouping<()> for current_value { + type IsAggregate = is_aggregate::No; +} diff --git a/migrations/2023-12-19-210053_tolerable-batch-insert-speed/down.sql b/migrations/2023-12-19-210053_tolerable-batch-insert-speed/down.sql new file mode 100644 index 000000000..9c1ae769e --- /dev/null +++ b/migrations/2023-12-19-210053_tolerable-batch-insert-speed/down.sql @@ -0,0 +1,42 @@ +CREATE OR REPLACE FUNCTION post_aggregates_post () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + IF (TG_OP = 'INSERT') THEN + INSERT INTO post_aggregates (post_id, published, newest_comment_time, newest_comment_time_necro, community_id, creator_id, instance_id) + SELECT + NEW.id, + NEW.published, + NEW.published, + NEW.published, + NEW.community_id, + NEW.creator_id, + community.instance_id + FROM + community + WHERE + NEW.community_id = community.id; + ELSIF (TG_OP = 'DELETE') THEN + DELETE FROM post_aggregates + WHERE post_id = OLD.id; + END IF; + RETURN NULL; +END +$$; + +CREATE OR REPLACE TRIGGER post_aggregates_post + AFTER INSERT OR DELETE ON post + FOR EACH ROW + EXECUTE PROCEDURE post_aggregates_post (); + +CREATE OR REPLACE FUNCTION generate_unique_changeme () + RETURNS text + LANGUAGE sql + AS $$ + SELECT + 'http://changeme.invalid/' || substr(md5(random()::text), 0, 25); +$$; + +DROP SEQUENCE IF EXISTS changeme_seq; + diff --git a/migrations/2023-12-19-210053_tolerable-batch-insert-speed/up.sql b/migrations/2023-12-19-210053_tolerable-batch-insert-speed/up.sql new file mode 100644 index 000000000..0157e4bb7 --- /dev/null +++ b/migrations/2023-12-19-210053_tolerable-batch-insert-speed/up.sql @@ -0,0 +1,41 @@ +-- Change post_aggregates trigger to run once per statement instead of once per row. +-- The trigger doesn't need to handle deletion because the post_id column has ON DELETE CASCADE. 
+ +CREATE OR REPLACE FUNCTION post_aggregates_post () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + INSERT INTO post_aggregates (post_id, published, newest_comment_time, newest_comment_time_necro, community_id, creator_id, instance_id) + SELECT + new_post.id, + new_post.published, + new_post.published, + new_post.published, + new_post.community_id, + new_post.creator_id, + (SELECT community.instance_id FROM community WHERE community.id = new_post.community_id LIMIT 1) + FROM + new_post; + RETURN NULL; +END +$$; + +CREATE OR REPLACE TRIGGER post_aggregates_post + AFTER INSERT ON post + REFERENCING NEW TABLE AS new_post + FOR EACH STATEMENT + EXECUTE PROCEDURE post_aggregates_post (); + +-- Avoid running hash function and random number generation for default ap_id + +CREATE SEQUENCE IF NOT EXISTS changeme_seq AS bigint CYCLE; + +CREATE OR REPLACE FUNCTION generate_unique_changeme () + RETURNS text + LANGUAGE sql + AS $$ + SELECT + 'http://changeme.invalid/seq/' || nextval('changeme_seq')::text; +$$; + diff --git a/scripts/db_perf.sh b/scripts/db_perf.sh new file mode 100755 index 000000000..9d06d5175 --- /dev/null +++ b/scripts/db_perf.sh @@ -0,0 +1,17 @@ +#!/usr/bin/env bash +set -e + +CWD="$(cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")" && pwd -P)" + +cd $CWD/../ + +source scripts/start_dev_db.sh + +export LEMMY_CONFIG_LOCATION=config/config.hjson +export RUST_BACKTRACE=1 + +cargo run --package lemmy_db_perf -- "$@" + +pg_ctl stop --silent + +# $PGDATA directory is kept so log can be seen diff --git a/scripts/start_dev_db.sh b/scripts/start_dev_db.sh index 31d3e0d3f..839b68b5e 100644 --- a/scripts/start_dev_db.sh +++ b/scripts/start_dev_db.sh @@ -11,7 +11,7 @@ then (pg_ctl status > /dev/null) || pg_status_exit_code=$? if [[ ${pg_status_exit_code} -ne 3 ]] then - pg_ctl stop + pg_ctl stop --silent fi rm -rf $PGDATA @@ -25,27 +25,23 @@ config_args=( # Write logs to a file in $PGDATA/log -c logging_collector=on - # Log all query plans by default + # Allow auto_explain to be turned on -c session_preload_libraries=auto_explain - -c auto_explain.log_min_duration=0 + #-c auto_explain.log_min_duration=0 # Include actual row amounts and run times for query plan nodes -c auto_explain.log_analyze=on - # Avoid sequential scans so query plans show what index scans can be done - # (index scan is normally avoided in some cases, such as the table being small enough) - -c enable_seqscan=off - # Don't log parameter values -c auto_explain.log_parameter_max_length=0 ) # Create cluster -initdb --username=postgres --auth=trust --no-instructions +pg_ctl init --silent --options="--username=postgres --auth=trust --no-instructions" # Start server that only listens to socket in current directory -pg_ctl start --options="${config_args[*]}" +pg_ctl start --silent --options="${config_args[*]}" # Setup database -psql -c "CREATE USER lemmy WITH PASSWORD 'password' SUPERUSER;" -U postgres -psql -c "CREATE DATABASE lemmy WITH OWNER lemmy;" -U postgres +psql --quiet -c "CREATE USER lemmy WITH PASSWORD 'password' SUPERUSER;" -U postgres +psql --quiet -c "CREATE DATABASE lemmy WITH OWNER lemmy;" -U postgres diff --git a/scripts/test.sh b/scripts/test.sh index 7a8bd1c29..efe9b1513 100755 --- a/scripts/test.sh +++ b/scripts/test.sh @@ -27,4 +27,5 @@ cargo test -p lemmy_utils --all-features --no-fail-fast # Add this to do printlns: -- --nocapture -pg_ctl stop +pg_ctl stop --silent +rm -rf $PGDATA From ad2af54384a6d915f5e832a1ab93795e037be47f Mon Sep 17 00:00:00 2001 From: dull b Date: Wed, 20 Dec 2023 20:56:37 +0000 
Subject: [PATCH 05/18] stuff --- crates/db_perf/src/main.rs | 50 +++++-- .../down.sql | 46 +++++++ .../up.sql | 122 ++++++++++++++++-- scripts/start_dev_db.sh | 8 +- 4 files changed, 201 insertions(+), 25 deletions(-) diff --git a/crates/db_perf/src/main.rs b/crates/db_perf/src/main.rs index 5ec4e843c..d071779a9 100644 --- a/crates/db_perf/src/main.rs +++ b/crates/db_perf/src/main.rs @@ -12,7 +12,7 @@ use lemmy_db_schema::{ utils::{ build_db_pool, get_conn, - series::{self, ValuesFromSeries}, + series::{self, ValuesFromSeries}, DbConn, DbPool, }, SortType, }; @@ -31,8 +31,10 @@ struct CmdArgs { people: NonZeroU32, #[arg(long, default_value_t = 100000.try_into().unwrap())] posts: NonZeroU32, + #[arg(long, default_value_t = 0)] + read_post_pages: u32, #[arg(long)] - read_posts: bool, + explain_insertions: bool, } #[tokio::main] @@ -41,6 +43,14 @@ async fn main() -> LemmyResult<()> { let pool = &build_db_pool().await?; let pool = &mut pool.into(); + let conn = &mut get_conn(pool).await?; + if args.explain_insertions { + sql_query("SET auto_explain.log_min_duration = 0") + .execute(conn) + .await?; + } + let pool = &mut conn.into(); + let instance = Instance::read_or_create(pool, "reddit.com".to_owned()).await?; println!("🫃 creating {} people", args.people); @@ -120,10 +130,10 @@ async fn main() -> LemmyResult<()> { .execute(conn) .await?; let pool = &mut conn.into(); - - if args.read_posts { + + { let mut page_after = None; - for page_num in 1..=2 { + for page_num in 1..=args.read_post_pages { println!( "👀 getting page {page_num} of posts (pagination cursor used: {})", page_after.is_some() @@ -138,19 +148,31 @@ async fn main() -> LemmyResult<()> { } .list(pool) .await?; - if let Some(post_view) = post_views.into_iter().last() { - println!("👀 getting pagination cursor data for next page "); - let cursor_data = PaginationCursor::after_post(&post_view).read(pool).await?; - page_after = Some(cursor_data); - } else { - break; - } + if let Some(post_view) = post_views.into_iter().last() { + println!("👀 getting pagination cursor data for next page"); + let cursor_data = PaginationCursor::after_post(&post_view).read(pool).await?; + page_after = Some(cursor_data); + } else { + println!("🚫 reached empty page"); + break; + } } } - + + // TODO show this path when there's an error if let Ok(path) = std::env::var("PGDATA") { println!("🪵 query plans written in {path}/log"); } - + Ok(()) } + +async fn conn_with_auto_explain<'a, 'b: 'a>(pool: &'a mut DbPool<'b>) -> LemmyResult> { + let mut conn = get_conn(pool).await?; + + sql_query("SET auto_explain.log_min_duration = 0") + .execute(&mut conn) + .await?; + + Ok(conn) +} diff --git a/migrations/2023-12-19-210053_tolerable-batch-insert-speed/down.sql b/migrations/2023-12-19-210053_tolerable-batch-insert-speed/down.sql index 9c1ae769e..f9282175f 100644 --- a/migrations/2023-12-19-210053_tolerable-batch-insert-speed/down.sql +++ b/migrations/2023-12-19-210053_tolerable-batch-insert-speed/down.sql @@ -30,6 +30,46 @@ CREATE OR REPLACE TRIGGER post_aggregates_post FOR EACH ROW EXECUTE PROCEDURE post_aggregates_post (); +CREATE OR REPLACE TRIGGER community_aggregates_post_count + AFTER INSERT OR DELETE OR UPDATE OF removed, + deleted ON post + FOR EACH ROW + EXECUTE PROCEDURE community_aggregates_post_count (); + +DROP FUNCTION IF EXISTS community_aggregates_post_count_insert CASCADE; + +DROP FUNCTION IF EXISTS community_aggregates_post_update CASCADE; + +DROP FUNCTION IF EXISTS site_aggregates_post_update CASCADE; + +DROP FUNCTION IF EXISTS 
person_aggregates_post_insert CASCADE; + +CREATE OR REPLACE FUNCTION site_aggregates_post_insert () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + IF (was_restored_or_created (TG_OP, OLD, NEW)) THEN + UPDATE + site_aggregates sa + SET + posts = posts + 1 + FROM + site s + WHERE + sa.site_id = s.id; + END IF; + RETURN NULL; +END +$$; + +CREATE OR REPLACE TRIGGER site_aggregates_post_insert + AFTER INSERT OR UPDATE OF removed, + deleted ON post + FOR EACH ROW + WHEN (NEW.local = TRUE) + EXECUTE PROCEDURE site_aggregates_post_insert (); + CREATE OR REPLACE FUNCTION generate_unique_changeme () RETURNS text LANGUAGE sql @@ -38,5 +78,11 @@ CREATE OR REPLACE FUNCTION generate_unique_changeme () 'http://changeme.invalid/' || substr(md5(random()::text), 0, 25); $$; +CREATE TRIGGER person_aggregates_post_count + AFTER INSERT OR DELETE OR UPDATE OF removed, + deleted ON post + FOR EACH ROW + EXECUTE PROCEDURE person_aggregates_post_count (); + DROP SEQUENCE IF EXISTS changeme_seq; diff --git a/migrations/2023-12-19-210053_tolerable-batch-insert-speed/up.sql b/migrations/2023-12-19-210053_tolerable-batch-insert-speed/up.sql index 0157e4bb7..593f9638e 100644 --- a/migrations/2023-12-19-210053_tolerable-batch-insert-speed/up.sql +++ b/migrations/2023-12-19-210053_tolerable-batch-insert-speed/up.sql @@ -1,5 +1,6 @@ --- Change post_aggregates trigger to run once per statement instead of once per row. --- The trigger doesn't need to handle deletion because the post_id column has ON DELETE CASCADE. +-- Change triggers to run once per statement instead of once per row + +-- post_aggregates_post trigger doesn't need to handle deletion because the post_id column has ON DELETE CASCADE CREATE OR REPLACE FUNCTION post_aggregates_post () RETURNS TRIGGER @@ -8,25 +9,128 @@ CREATE OR REPLACE FUNCTION post_aggregates_post () BEGIN INSERT INTO post_aggregates (post_id, published, newest_comment_time, newest_comment_time_necro, community_id, creator_id, instance_id) SELECT - new_post.id, - new_post.published, - new_post.published, - new_post.published, - new_post.community_id, - new_post.creator_id, - (SELECT community.instance_id FROM community WHERE community.id = new_post.community_id LIMIT 1) + id, + published, + published, + published, + community_id, + creator_id, + (SELECT community.instance_id FROM community WHERE community.id = community_id LIMIT 1) FROM new_post; RETURN NULL; END $$; +CREATE OR REPLACE FUNCTION community_aggregates_post_count_insert () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + UPDATE community_aggregates + SET posts = posts + post_group.count + FROM (SELECT community_id, count(*) FROM new_post GROUP BY community_id) post_group + WHERE community_aggregates.community_id = post_group.community_id; + RETURN NULL; +END +$$; + +CREATE OR REPLACE FUNCTION person_aggregates_post_insert () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + UPDATE person_aggregates + SET post_count = post_count + post_group.count + FROM (SELECT creator_id, count(*) FROM new_post GROUP BY creator_id) post_group + WHERE person_aggregates.person_id = post_group.creator_id; + RETURN NULL; +END +$$; + CREATE OR REPLACE TRIGGER post_aggregates_post AFTER INSERT ON post REFERENCING NEW TABLE AS new_post FOR EACH STATEMENT EXECUTE PROCEDURE post_aggregates_post (); +-- Don't run old trigger for insert + +CREATE OR REPLACE TRIGGER community_aggregates_post_count + AFTER DELETE OR UPDATE OF removed, + deleted ON post + FOR EACH ROW + EXECUTE PROCEDURE community_aggregates_post_count (); + +CREATE OR REPLACE 
TRIGGER community_aggregates_post_count_insert + AFTER INSERT ON post + REFERENCING NEW TABLE AS new_post + FOR EACH STATEMENT + EXECUTE PROCEDURE community_aggregates_post_count_insert (); + +DROP FUNCTION IF EXISTS site_aggregates_community_delete CASCADE; + +CREATE OR REPLACE FUNCTION site_aggregates_post_update () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + IF (was_restored_or_created (TG_OP, OLD, NEW)) THEN + UPDATE + site_aggregates sa + SET + posts = posts + 1 + FROM + site s + WHERE + sa.site_id = s.id; + END IF; + RETURN NULL; +END +$$; + +CREATE OR REPLACE FUNCTION site_aggregates_post_insert () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + UPDATE + site_aggregates sa + SET + posts = posts + (SELECT count(*) FROM new_post) + FROM + site s + WHERE + sa.site_id = s.id; + RETURN NULL; +END +$$; + +CREATE OR REPLACE TRIGGER site_aggregates_post_update + AFTER UPDATE OF removed, + deleted ON post + FOR EACH ROW + WHEN (NEW.local = TRUE) + EXECUTE PROCEDURE site_aggregates_post_update (); + +CREATE OR REPLACE TRIGGER site_aggregates_post_insert + AFTER INSERT ON post + REFERENCING NEW TABLE AS new_post + FOR EACH STATEMENT + EXECUTE PROCEDURE site_aggregates_post_insert (); + +CREATE OR REPLACE TRIGGER person_aggregates_post_count + AFTER DELETE OR UPDATE OF removed, + deleted ON post + FOR EACH ROW + EXECUTE PROCEDURE person_aggregates_post_count (); + +CREATE OR REPLACE TRIGGER person_aggregates_post_insert + AFTER INSERT ON post + REFERENCING NEW TABLE AS new_post + FOR EACH STATEMENT + EXECUTE PROCEDURE person_aggregates_post_insert (); + -- Avoid running hash function and random number generation for default ap_id CREATE SEQUENCE IF NOT EXISTS changeme_seq AS bigint CYCLE; diff --git a/scripts/start_dev_db.sh b/scripts/start_dev_db.sh index 839b68b5e..f6b4e2a9a 100644 --- a/scripts/start_dev_db.sh +++ b/scripts/start_dev_db.sh @@ -2,12 +2,14 @@ export PGDATA="$PWD/dev_pgdata" export PGHOST=$PWD -export LEMMY_DATABASE_URL="postgresql://lemmy:password@/lemmy?host=$PWD" +export DATABASE_URL="postgresql://lemmy:password@/lemmy?host=$PWD" +export LEMMY_DATABASE_URL=$DATABASE_URL # If cluster exists, stop the server and delete the cluster if [[ -d $PGDATA ]] then # Only stop server if it is running + pg_status_exit_code=0 (pg_ctl status > /dev/null) || pg_status_exit_code=$? 
if [[ ${pg_status_exit_code} -ne 3 ]] then @@ -27,7 +29,9 @@ config_args=( # Allow auto_explain to be turned on -c session_preload_libraries=auto_explain - #-c auto_explain.log_min_duration=0 + + # Log triggers + -c auto_explain.log_nested_statements=on # Include actual row amounts and run times for query plan nodes -c auto_explain.log_analyze=on From 191e2f0c57839c0d4b6ccdbe7645848c8d49c99c Mon Sep 17 00:00:00 2001 From: dull b Date: Wed, 20 Dec 2023 21:20:56 +0000 Subject: [PATCH 06/18] different timestamps --- crates/db_perf/src/main.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/crates/db_perf/src/main.rs b/crates/db_perf/src/main.rs index d071779a9..6a23e0113 100644 --- a/crates/db_perf/src/main.rs +++ b/crates/db_perf/src/main.rs @@ -1,5 +1,5 @@ use clap::{Parser, Subcommand}; -use diesel::{dsl, sql_query, sql_types, ExpressionMethods, IntoSql}; +use diesel::{dsl::{self, sql}, sql_query, sql_types, ExpressionMethods, IntoSql}; use diesel_async::RunQueryDsl; use lemmy_db_schema::{ schema::post, @@ -12,7 +12,7 @@ use lemmy_db_schema::{ utils::{ build_db_pool, get_conn, - series::{self, ValuesFromSeries}, DbConn, DbPool, + series::{self, ValuesFromSeries}, DbConn, DbPool, now, }, SortType, }; @@ -22,6 +22,7 @@ use lemmy_db_views::{ }; use lemmy_utils::error::LemmyResult; use std::num::NonZeroU32; +use diesel::pg::expression::dsl::IntervalDsl; #[derive(Parser, Debug)] struct CmdArgs { @@ -107,6 +108,7 @@ async fn main() -> LemmyResult<()> { person_id.into_sql::(), community_id.into_sql::(), series::current_value.eq(1), + now() - sql::("make_interval(secs => ").bind::(series::current_value).sql(")"), ), }) .into_columns(( @@ -114,6 +116,7 @@ async fn main() -> LemmyResult<()> { post::creator_id, post::community_id, post::featured_community, + post::published, )) .execute(&mut get_conn(pool).await?) 
.await?; From 82e37f7b57d3ca9802a82b5d677bf91531494e9f Mon Sep 17 00:00:00 2001 From: dull b Date: Wed, 20 Dec 2023 22:50:55 +0000 Subject: [PATCH 07/18] stuff --- Cargo.lock | 1 + crates/db_perf/src/main.rs | 158 ++++++++++++++--------------- crates/db_schema/Cargo.toml | 1 + crates/db_schema/src/impls/post.rs | 9 +- crates/db_schema/src/utils.rs | 20 ++-- scripts/start_dev_db.sh | 5 +- 6 files changed, 87 insertions(+), 107 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8c782c6fb..337c7628e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2662,6 +2662,7 @@ name = "lemmy_db_schema" version = "0.19.0" dependencies = [ "activitypub_federation", + "anyhow", "async-trait", "bcrypt", "chrono", diff --git a/crates/db_perf/src/main.rs b/crates/db_perf/src/main.rs index 6a23e0113..1143c99c9 100644 --- a/crates/db_perf/src/main.rs +++ b/crates/db_perf/src/main.rs @@ -1,6 +1,11 @@ -use clap::{Parser, Subcommand}; -use diesel::{dsl::{self, sql}, sql_query, sql_types, ExpressionMethods, IntoSql}; -use diesel_async::RunQueryDsl; +use clap::Parser; +use diesel::{ + dsl::{self, sql}, + sql_types, + ExpressionMethods, + IntoSql, +}; +use diesel_async::{RunQueryDsl, SimpleAsyncConnection}; use lemmy_db_schema::{ schema::post, source::{ @@ -12,17 +17,14 @@ use lemmy_db_schema::{ utils::{ build_db_pool, get_conn, - series::{self, ValuesFromSeries}, DbConn, DbPool, now, + now, + series::{self, ValuesFromSeries}, }, SortType, }; -use lemmy_db_views::{ - post_view::{PaginationCursorData, PostQuery}, - structs::PaginationCursor, -}; +use lemmy_db_views::{post_view::PostQuery, structs::PaginationCursor}; use lemmy_utils::error::LemmyResult; use std::num::NonZeroU32; -use diesel::pg::expression::dsl::IntervalDsl; #[derive(Parser, Debug)] struct CmdArgs { @@ -40,52 +42,53 @@ struct CmdArgs { #[tokio::main] async fn main() -> LemmyResult<()> { + if let Err(err) = try_main().await { + println!("Error: {err:?}"); + } + if let Ok(path) = std::env::var("PGDATA") { + println!("🪵 query plans and error details written in {path}/log"); + } + + Ok(()) +} + +async fn try_main() -> LemmyResult<()> { let args = CmdArgs::parse(); let pool = &build_db_pool().await?; let pool = &mut pool.into(); - let conn = &mut get_conn(pool).await?; + if args.explain_insertions { - sql_query("SET auto_explain.log_min_duration = 0") - .execute(conn) + // log_nested_statements is enabled to log trigger execution + conn + .batch_execute( + "SET auto_explain.log_min_duration = 0; SET auto_explain.log_nested_statements = on;", + ) .await?; } - let pool = &mut conn.into(); - let instance = Instance::read_or_create(pool, "reddit.com".to_owned()).await?; + let instance = Instance::read_or_create(&mut conn.into(), "reddit.com".to_owned()).await?; println!("🫃 creating {} people", args.people); let mut person_ids = vec![]; for i in 0..args.people.get() { - person_ids.push( - Person::create( - pool, - &PersonInsertForm::builder() - .name(format!("p{i}")) - .public_key("pubkey".to_owned()) - .instance_id(instance.id) - .build(), - ) - .await? 
- .id, - ); + let form = PersonInsertForm::builder() + .name(format!("p{i}")) + .public_key("pubkey".to_owned()) + .instance_id(instance.id) + .build(); + person_ids.push(Person::create(&mut conn.into(), &form).await?.id); } println!("🏠 creating {} communities", args.communities); let mut community_ids = vec![]; for i in 0..args.communities.get() { - community_ids.push( - Community::create( - pool, - &CommunityInsertForm::builder() - .name(format!("c{i}")) - .title(i.to_string()) - .instance_id(instance.id) - .build(), - ) - .await? - .id, - ); + let form = CommunityInsertForm::builder() + .name(format!("c{i}")) + .title(i.to_string()) + .instance_id(instance.id) + .build(); + community_ids.push(Community::create(&mut conn.into(), &form).await?.id); } let post_batches = args.people.get() * args.communities.get(); @@ -108,7 +111,10 @@ async fn main() -> LemmyResult<()> { person_id.into_sql::(), community_id.into_sql::(), series::current_value.eq(1), - now() - sql::("make_interval(secs => ").bind::(series::current_value).sql(")"), + now() + - sql::("make_interval(secs => ") + .bind::(series::current_value) + .sql(")"), ), }) .into_columns(( @@ -118,64 +124,50 @@ async fn main() -> LemmyResult<()> { post::featured_community, post::published, )) - .execute(&mut get_conn(pool).await?) + .execute(conn) .await?; num_inserted_posts += n; } } - - // Lie detector for the println above + // Make sure the println above shows the correct amount assert_eq!(num_inserted_posts, num_posts as usize); // Enable auto_explain - let conn = &mut get_conn(pool).await?; - sql_query("SET auto_explain.log_min_duration = 0") - .execute(conn) + conn + .batch_execute( + "SET auto_explain.log_min_duration = 0; SET auto_explain.log_nested_statements = off;", + ) .await?; - let pool = &mut conn.into(); - - { - let mut page_after = None; - for page_num in 1..=args.read_post_pages { - println!( - "👀 getting page {page_num} of posts (pagination cursor used: {})", - page_after.is_some() - ); - // TODO: include local_user - let post_views = PostQuery { - community_id: community_ids.get(0).cloned(), - sort: Some(SortType::New), - limit: Some(20), - page_after, - ..Default::default() - } - .list(pool) - .await?; + + let mut page_after = None; + for page_num in 1..=args.read_post_pages { + println!( + "👀 getting page {page_num} of posts (pagination cursor used: {})", + page_after.is_some() + ); + + // TODO: include local_user + let post_views = PostQuery { + community_id: community_ids.get(0).cloned(), + sort: Some(SortType::New), + limit: Some(20), + page_after, + ..Default::default() + } + .list(&mut conn.into()) + .await?; + if let Some(post_view) = post_views.into_iter().last() { println!("👀 getting pagination cursor data for next page"); - let cursor_data = PaginationCursor::after_post(&post_view).read(pool).await?; + let cursor_data = PaginationCursor::after_post(&post_view) + .read(&mut conn.into()) + .await?; page_after = Some(cursor_data); } else { - println!("🚫 reached empty page"); + println!("👀 reached empty page"); break; } - } } - - // TODO show this path when there's an error - if let Ok(path) = std::env::var("PGDATA") { - println!("🪵 query plans written in {path}/log"); - } - + Ok(()) } - -async fn conn_with_auto_explain<'a, 'b: 'a>(pool: &'a mut DbPool<'b>) -> LemmyResult> { - let mut conn = get_conn(pool).await?; - - sql_query("SET auto_explain.log_min_duration = 0") - .execute(&mut conn) - .await?; - - Ok(conn) -} diff --git a/crates/db_schema/Cargo.toml b/crates/db_schema/Cargo.toml index f3ebc0b66..0ce3cfca6 
100644 --- a/crates/db_schema/Cargo.toml +++ b/crates/db_schema/Cargo.toml @@ -76,6 +76,7 @@ tokio-postgres = { workspace = true, optional = true } tokio-postgres-rustls = { workspace = true, optional = true } rustls = { workspace = true, optional = true } uuid = { workspace = true, features = ["v4"] } +anyhow = { workspace = true } [dev-dependencies] serial_test = { workspace = true } diff --git a/crates/db_schema/src/impls/post.rs b/crates/db_schema/src/impls/post.rs index cb76dc334..4f2f88cb2 100644 --- a/crates/db_schema/src/impls/post.rs +++ b/crates/db_schema/src/impls/post.rs @@ -41,14 +41,7 @@ use crate::{ }; use ::url::Url; use chrono::{Duration, Utc}; -use diesel::{ - dsl::insert_into, - result::Error, - ExpressionMethods, - Insertable, - QueryDsl, - TextExpressionMethods, -}; +use diesel::{dsl::insert_into, result::Error, ExpressionMethods, QueryDsl, TextExpressionMethods}; use diesel_async::RunQueryDsl; use std::collections::HashSet; diff --git a/crates/db_schema/src/utils.rs b/crates/db_schema/src/utils.rs index 67e005273..be12639e7 100644 --- a/crates/db_schema/src/utils.rs +++ b/crates/db_schema/src/utils.rs @@ -8,6 +8,7 @@ use crate::{ SortType, }; use activitypub_federation::{fetch::object_id::ObjectId, traits::Object}; +use anyhow::Context; use chrono::{DateTime, Utc}; use deadpool::Runtime; use diesel::{ @@ -19,11 +20,9 @@ use diesel::{ query_dsl::methods::LimitDsl, result::{ConnectionError, ConnectionResult, Error as DieselError, Error::QueryBuilderError}, serialize::{Output, ToSql}, - sql_query, sql_types::{Text, Timestamptz}, IntoSql, PgConnection, - RunQueryDsl, }; use diesel_async::{ pg::AsyncPgConnection, @@ -343,21 +342,18 @@ impl ServerCertVerifier for NoCertVerifier { pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!(); -fn run_migrations(db_url: &str) { +fn run_migrations(db_url: &str) -> Result<(), LemmyError> { // Needs to be a sync connection let mut conn = - PgConnection::establish(db_url).unwrap_or_else(|e| panic!("Error connecting to {db_url}: {e}")); - - // Disable auto_explain output for migrations - sql_query("SET auto_explain.log_min_duration = -1") - .execute(&mut conn) - .expect("failed to disable auto_explain"); + PgConnection::establish(db_url).with_context(|| format!("Error connecting to {db_url}"))?; info!("Running Database migrations (This may take a long time)..."); - let _ = &mut conn + conn .run_pending_migrations(MIGRATIONS) - .unwrap_or_else(|e| panic!("Couldn't run DB Migrations: {e}")); + .map_err(|e| anyhow::Error::msg(format!("Couldn't run DB Migrations: {e}")))?; info!("Database migrations complete."); + + Ok(()) } pub async fn build_db_pool() -> Result { @@ -381,7 +377,7 @@ pub async fn build_db_pool() -> Result { .runtime(Runtime::Tokio1) .build()?; - run_migrations(&db_url); + run_migrations(&db_url)?; Ok(pool) } diff --git a/scripts/start_dev_db.sh b/scripts/start_dev_db.sh index f6b4e2a9a..8ea4a294e 100644 --- a/scripts/start_dev_db.sh +++ b/scripts/start_dev_db.sh @@ -30,9 +30,6 @@ config_args=( # Allow auto_explain to be turned on -c session_preload_libraries=auto_explain - # Log triggers - -c auto_explain.log_nested_statements=on - # Include actual row amounts and run times for query plan nodes -c auto_explain.log_analyze=on @@ -43,7 +40,7 @@ config_args=( # Create cluster pg_ctl init --silent --options="--username=postgres --auth=trust --no-instructions" -# Start server that only listens to socket in current directory +# Start server pg_ctl start --silent --options="${config_args[*]}" # Setup database From 
f54a350b8553ae357920d9e98443a5699211e5fb Mon Sep 17 00:00:00 2001 From: dullbananas Date: Wed, 20 Dec 2023 15:53:31 -0700 Subject: [PATCH 08/18] Revert changes to comment.rs --- crates/db_schema/src/impls/comment.rs | 95 ++++++++------------------- 1 file changed, 28 insertions(+), 67 deletions(-) diff --git a/crates/db_schema/src/impls/comment.rs b/crates/db_schema/src/impls/comment.rs index a2e95fa1c..aef399c59 100644 --- a/crates/db_schema/src/impls/comment.rs +++ b/crates/db_schema/src/impls/comment.rs @@ -1,16 +1,6 @@ use crate::{ newtypes::{CommentId, DbUrl, PersonId}, - schema::comment::dsl::{ - ap_id, - comment, - content, - creator_id, - deleted, - id, - path, - removed, - updated, - }, + schema::comment::dsl::{ap_id, comment, content, creator_id, deleted, path, removed, updated}, source::comment::{ Comment, CommentInsertForm, @@ -21,24 +11,16 @@ use crate::{ CommentUpdateForm, }, traits::{Crud, Likeable, Saveable}, - utils::{functions::AsText, get_conn, naive_now, DbPool, DELETED_REPLACEMENT_TEXT}, + utils::{get_conn, naive_now, DbPool, DELETED_REPLACEMENT_TEXT}, }; use diesel::{ dsl::{insert_into, sql_query}, result::Error, ExpressionMethods, QueryDsl, - TextExpressionMethods, }; use diesel_async::RunQueryDsl; -use diesel_ltree::{ - functions::{ltree2text, text2ltree}, - Ltree, -}; -use futures_util::{ - future::TryFutureExt, - stream::{self, StreamExt, TryStreamExt}, -}; +use diesel_ltree::Ltree; use url::Url; impl Comment { @@ -75,63 +57,42 @@ impl Comment { comment_form: &CommentInsertForm, parent_path: Option<&Ltree>, ) -> Result { - Comment::create_batch(pool, &[(comment_form.clone(), parent_path.cloned())]) - .await? - .into_iter() - .next() - .ok_or(Error::NotFound) - } - - pub async fn create_batch( - pool: &mut DbPool<'_>, - items: &[(CommentInsertForm, Option)], - ) -> Result, Error> { let conn = &mut get_conn(pool).await?; conn .build_transaction() .run(|conn| { Box::pin(async move { - let forms = items - .iter() - .map(|(comment_form, parent_path)| CommentInsertForm { - path: Some(parent_path.clone().unwrap_or(Ltree("0".to_owned()))), - ..comment_form.clone() - }); - - // Insert, to get the ids - let inserted_comments = insert_into(comment) - .values(forms.clone().collect::>()) - .load::(conn) - .or_else(|_| { - // `ap_id` unique constraint violation is handled individually for each row - // because batched upsert requires having the same `set` argument for all rows - stream::iter(forms) - .then(|form| { - insert_into(comment) - .values(form.clone()) - .on_conflict(ap_id) - .do_update() - .set(form) - .get_result::(conn) - }) - .try_collect::>() - }) + // Insert, to get the id + let inserted_comment = insert_into(comment) + .values(comment_form) + .on_conflict(ap_id) + .do_update() + .set(comment_form) + .get_result::(conn) .await?; - // For each comment, append its id to its path - let updated_comments = diesel::update(comment) - .filter(id.eq_any(inserted_comments.into_iter().map(|c| c.id))) - .set(path.eq(text2ltree( - ltree2text(path).concat(".").concat(AsText::new(id)), - ))) - .load::(conn) - .await?; + let comment_id = inserted_comment.id; + + // You need to update the ltree column + let ltree = Ltree(if let Some(parent_path) = parent_path { + // The previous parent will already have 0 in it + // Append this comment id + format!("{}.{}", parent_path.0, comment_id) + } else { + // '0' is always the first path, append to that + format!("{}.{}", 0, comment_id) + }); + + let updated_comment = diesel::update(comment.find(comment_id)) + .set(path.eq(ltree)) + 
.get_result::(conn) + .await; // Update the child count for the parent comment_aggregates // You could do this with a trigger, but since you have to do this manually anyway, // you can just have it here - for parent_path in items.iter().filter_map(|(_, p)| p.as_ref()) { + if let Some(parent_path) = parent_path { // You have to update counts for all parents, not just the immediate one // TODO if the performance of this is terrible, it might be better to do this as part of a // scheduled query... although the counts would often be wrong. @@ -160,7 +121,7 @@ where ca.comment_id = c.id" sql_query(update_child_count_stmt).execute(conn).await?; } } - Ok(updated_comments) + updated_comment }) as _ }) .await From d89610964a56413bcfe3d7b16db9de089e505653 Mon Sep 17 00:00:00 2001 From: dullbananas Date: Wed, 20 Dec 2023 15:55:29 -0700 Subject: [PATCH 09/18] Update comment.rs --- crates/apub/src/objects/comment.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/apub/src/objects/comment.rs b/crates/apub/src/objects/comment.rs index 0a791d842..ab98fd7b1 100644 --- a/crates/apub/src/objects/comment.rs +++ b/crates/apub/src/objects/comment.rs @@ -172,7 +172,6 @@ impl Object for ApubComment { deleted: Some(false), ap_id: Some(note.id.into()), distinguished: note.distinguished, - path: None, local: Some(false), language_id, }; From ef654b147250704424f93c357eef6b69fe5f02b8 Mon Sep 17 00:00:00 2001 From: dullbananas Date: Wed, 20 Dec 2023 15:56:19 -0700 Subject: [PATCH 10/18] Update comment.rs --- crates/db_schema/src/source/comment.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/crates/db_schema/src/source/comment.rs b/crates/db_schema/src/source/comment.rs index 63ed04f0d..3ebea42c1 100644 --- a/crates/db_schema/src/source/comment.rs +++ b/crates/db_schema/src/source/comment.rs @@ -67,10 +67,6 @@ pub struct CommentInsertForm { pub deleted: Option, pub ap_id: Option, pub local: Option, - #[cfg(feature = "full")] - pub path: Option, - #[cfg(not(feature = "full"))] - pub path: Option, pub distinguished: Option, pub language_id: Option, } From 49ca4da76305a2e15c92ba809ef9b7a7a20f1563 Mon Sep 17 00:00:00 2001 From: dullbananas Date: Wed, 20 Dec 2023 16:01:18 -0700 Subject: [PATCH 11/18] Update post_view.rs --- crates/db_views/src/post_view.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/crates/db_views/src/post_view.rs b/crates/db_views/src/post_view.rs index f382f3920..edeb4ba44 100644 --- a/crates/db_views/src/post_view.rs +++ b/crates/db_views/src/post_view.rs @@ -1446,7 +1446,7 @@ mod tests { let inserted_community = Community::create(pool, &community_form).await.unwrap(); let mut inserted_post_ids = vec![]; - let mut comment_forms = vec![]; + let mut inserted_comment_ids = vec![]; // Create 150 posts with varying non-correlating values for publish date, number of comments, and featured for comments in 0..10 { @@ -1468,13 +1468,12 @@ mod tests { .post_id(inserted_post.id) .content("yes".to_owned()) .build(); - comment_forms.push((comment_form, None)); + let inserted_comment = Comment::create(pool, &comment_form, None).await.unwrap(); + inserted_comment_ids.push(inserted_comment.id); } } } - Comment::create_batch(pool, &comment_forms).await.unwrap(); - let mut listed_post_ids = vec![]; let mut page_after = None; loop { From fc300083a202fd5f777785b375dda94873d7f4ab Mon Sep 17 00:00:00 2001 From: dullbananas Date: Wed, 20 Dec 2023 16:15:04 -0700 Subject: [PATCH 12/18] Update utils.rs --- crates/db_schema/src/utils.rs | 8 +------- 1 file changed, 1 insertion(+), 7 
deletions(-) diff --git a/crates/db_schema/src/utils.rs b/crates/db_schema/src/utils.rs index be12639e7..100906216 100644 --- a/crates/db_schema/src/utils.rs +++ b/crates/db_schema/src/utils.rs @@ -416,10 +416,7 @@ static EMAIL_REGEX: Lazy = Lazy::new(|| { }); pub mod functions { - use diesel::{ - pg::Pg, - sql_types::{BigInt, Text, Timestamptz}, - }; + use diesel::sql_types::{BigInt, Text, Timestamptz}; sql_function! { fn hot_rank(score: BigInt, time: Timestamptz) -> Double; @@ -437,9 +434,6 @@ pub mod functions { // really this function is variadic, this just adds the two-argument version sql_function!(fn coalesce(x: diesel::sql_types::Nullable, y: T) -> T); - - // Use `AsText::new` - postfix_operator!(AsText, "::text", Text, backend: Pg); } pub const DELETED_REPLACEMENT_TEXT: &str = "*Permanently Deleted*"; From 465440491d7801287444c8a54b95ba4bc574891a Mon Sep 17 00:00:00 2001 From: dullbananas Date: Wed, 20 Dec 2023 16:42:04 -0700 Subject: [PATCH 13/18] Update up.sql --- .../up.sql | 67 +++++++++++++------ 1 file changed, 47 insertions(+), 20 deletions(-) diff --git a/migrations/2023-12-19-210053_tolerable-batch-insert-speed/up.sql b/migrations/2023-12-19-210053_tolerable-batch-insert-speed/up.sql index 593f9638e..fa409a912 100644 --- a/migrations/2023-12-19-210053_tolerable-batch-insert-speed/up.sql +++ b/migrations/2023-12-19-210053_tolerable-batch-insert-speed/up.sql @@ -15,9 +15,16 @@ BEGIN published, community_id, creator_id, - (SELECT community.instance_id FROM community WHERE community.id = community_id LIMIT 1) - FROM - new_post; + ( + SELECT + community.instance_id + FROM + community + WHERE + community.id = community_id + LIMIT 1) +FROM + new_post; RETURN NULL; END $$; @@ -27,10 +34,20 @@ CREATE OR REPLACE FUNCTION community_aggregates_post_count_insert () LANGUAGE plpgsql AS $$ BEGIN - UPDATE community_aggregates - SET posts = posts + post_group.count - FROM (SELECT community_id, count(*) FROM new_post GROUP BY community_id) post_group - WHERE community_aggregates.community_id = post_group.community_id; + UPDATE + community_aggregates + SET + posts = posts + post_group.count + FROM ( + SELECT + community_id, + count(*) + FROM + new_post + GROUP BY + community_id) post_group +WHERE + community_aggregates.community_id = post_group.community_id; RETURN NULL; END $$; @@ -40,17 +57,26 @@ CREATE OR REPLACE FUNCTION person_aggregates_post_insert () LANGUAGE plpgsql AS $$ BEGIN - UPDATE person_aggregates - SET post_count = post_count + post_group.count - FROM (SELECT creator_id, count(*) FROM new_post GROUP BY creator_id) post_group - WHERE person_aggregates.person_id = post_group.creator_id; + UPDATE + person_aggregates + SET + post_count = post_count + post_group.count + FROM ( + SELECT + creator_id, + count(*) + FROM + new_post + GROUP BY + creator_id) post_group +WHERE + person_aggregates.person_id = post_group.creator_id; RETURN NULL; END $$; CREATE OR REPLACE TRIGGER post_aggregates_post - AFTER INSERT ON post - REFERENCING NEW TABLE AS new_post + AFTER INSERT ON post REFERENCING NEW TABLE AS new_post FOR EACH STATEMENT EXECUTE PROCEDURE post_aggregates_post (); @@ -63,8 +89,7 @@ CREATE OR REPLACE TRIGGER community_aggregates_post_count EXECUTE PROCEDURE community_aggregates_post_count (); CREATE OR REPLACE TRIGGER community_aggregates_post_count_insert - AFTER INSERT ON post - REFERENCING NEW TABLE AS new_post + AFTER INSERT ON post REFERENCING NEW TABLE AS new_post FOR EACH STATEMENT EXECUTE PROCEDURE community_aggregates_post_count_insert (); @@ -97,7 +122,11 @@ BEGIN 
UPDATE site_aggregates sa SET - posts = posts + (SELECT count(*) FROM new_post) + posts = posts + ( + SELECT + count(*) + FROM + new_post) FROM site s WHERE @@ -114,8 +143,7 @@ CREATE OR REPLACE TRIGGER site_aggregates_post_update EXECUTE PROCEDURE site_aggregates_post_update (); CREATE OR REPLACE TRIGGER site_aggregates_post_insert - AFTER INSERT ON post - REFERENCING NEW TABLE AS new_post + AFTER INSERT ON post REFERENCING NEW TABLE AS new_post FOR EACH STATEMENT EXECUTE PROCEDURE site_aggregates_post_insert (); @@ -126,8 +154,7 @@ CREATE OR REPLACE TRIGGER person_aggregates_post_count EXECUTE PROCEDURE person_aggregates_post_count (); CREATE OR REPLACE TRIGGER person_aggregates_post_insert - AFTER INSERT ON post - REFERENCING NEW TABLE AS new_post + AFTER INSERT ON post REFERENCING NEW TABLE AS new_post FOR EACH STATEMENT EXECUTE PROCEDURE person_aggregates_post_insert (); From f4dd7887fdef4af5e85cc61b8e8ce89f1bff278d Mon Sep 17 00:00:00 2001 From: dullbananas Date: Wed, 20 Dec 2023 17:17:51 -0700 Subject: [PATCH 14/18] Update up.sql --- .../2023-12-19-210053_tolerable-batch-insert-speed/up.sql | 4 ---- 1 file changed, 4 deletions(-) diff --git a/migrations/2023-12-19-210053_tolerable-batch-insert-speed/up.sql b/migrations/2023-12-19-210053_tolerable-batch-insert-speed/up.sql index fa409a912..06ed54e24 100644 --- a/migrations/2023-12-19-210053_tolerable-batch-insert-speed/up.sql +++ b/migrations/2023-12-19-210053_tolerable-batch-insert-speed/up.sql @@ -1,7 +1,5 @@ -- Change triggers to run once per statement instead of once per row - -- post_aggregates_post trigger doesn't need to handle deletion because the post_id column has ON DELETE CASCADE - CREATE OR REPLACE FUNCTION post_aggregates_post () RETURNS TRIGGER LANGUAGE plpgsql @@ -81,7 +79,6 @@ CREATE OR REPLACE TRIGGER post_aggregates_post EXECUTE PROCEDURE post_aggregates_post (); -- Don't run old trigger for insert - CREATE OR REPLACE TRIGGER community_aggregates_post_count AFTER DELETE OR UPDATE OF removed, deleted ON post @@ -159,7 +156,6 @@ CREATE OR REPLACE TRIGGER person_aggregates_post_insert EXECUTE PROCEDURE person_aggregates_post_insert (); -- Avoid running hash function and random number generation for default ap_id - CREATE SEQUENCE IF NOT EXISTS changeme_seq AS bigint CYCLE; CREATE OR REPLACE FUNCTION generate_unique_changeme () From 1fcf2f74e9c5644569a5619008b7d8953b3eaec7 Mon Sep 17 00:00:00 2001 From: dullbananas Date: Wed, 20 Dec 2023 17:44:59 -0700 Subject: [PATCH 15/18] Update down.sql --- .../2023-12-19-210053_tolerable-batch-insert-speed/down.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/migrations/2023-12-19-210053_tolerable-batch-insert-speed/down.sql b/migrations/2023-12-19-210053_tolerable-batch-insert-speed/down.sql index f9282175f..3b08a616b 100644 --- a/migrations/2023-12-19-210053_tolerable-batch-insert-speed/down.sql +++ b/migrations/2023-12-19-210053_tolerable-batch-insert-speed/down.sql @@ -78,7 +78,7 @@ CREATE OR REPLACE FUNCTION generate_unique_changeme () 'http://changeme.invalid/' || substr(md5(random()::text), 0, 25); $$; -CREATE TRIGGER person_aggregates_post_count +CREATE OR REPLACE TRIGGER person_aggregates_post_count AFTER INSERT OR DELETE OR UPDATE OF removed, deleted ON post FOR EACH ROW From 22030b54437530b540b4750ba4dcedda26da90e5 Mon Sep 17 00:00:00 2001 From: dullbananas Date: Wed, 20 Dec 2023 20:24:55 -0700 Subject: [PATCH 16/18] Update up.sql --- .../2023-12-19-210053_tolerable-batch-insert-speed/up.sql | 2 -- 1 file changed, 2 deletions(-) diff --git 
a/migrations/2023-12-19-210053_tolerable-batch-insert-speed/up.sql b/migrations/2023-12-19-210053_tolerable-batch-insert-speed/up.sql index 06ed54e24..aaa866d3a 100644 --- a/migrations/2023-12-19-210053_tolerable-batch-insert-speed/up.sql +++ b/migrations/2023-12-19-210053_tolerable-batch-insert-speed/up.sql @@ -90,8 +90,6 @@ CREATE OR REPLACE TRIGGER community_aggregates_post_count_insert FOR EACH STATEMENT EXECUTE PROCEDURE community_aggregates_post_count_insert (); -DROP FUNCTION IF EXISTS site_aggregates_community_delete CASCADE; - CREATE OR REPLACE FUNCTION site_aggregates_post_update () RETURNS TRIGGER LANGUAGE plpgsql From abcf331a262f7fb5fae6db36d0401310605f3c12 Mon Sep 17 00:00:00 2001 From: dullbananas Date: Wed, 20 Dec 2023 22:46:09 -0700 Subject: [PATCH 17/18] Update main.rs --- crates/db_perf/src/main.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/crates/db_perf/src/main.rs b/crates/db_perf/src/main.rs index 1143c99c9..4976f9e01 100644 --- a/crates/db_perf/src/main.rs +++ b/crates/db_perf/src/main.rs @@ -43,7 +43,7 @@ struct CmdArgs { #[tokio::main] async fn main() -> LemmyResult<()> { if let Err(err) = try_main().await { - println!("Error: {err:?}"); + println!("😂 Error: {err:?}"); } if let Ok(path) = std::env::var("PGDATA") { println!("🪵 query plans and error details written in {path}/log"); @@ -80,7 +80,7 @@ async fn try_main() -> LemmyResult<()> { person_ids.push(Person::create(&mut conn.into(), &form).await?.id); } - println!("🏠 creating {} communities", args.communities); + println!("🌍 creating {} communities", args.communities); let mut community_ids = vec![]; for i in 0..args.communities.get() { let form = CommunityInsertForm::builder() @@ -95,7 +95,7 @@ async fn try_main() -> LemmyResult<()> { let posts_per_batch = args.posts.get() / post_batches; let num_posts = post_batches * posts_per_batch; println!( - "📢 creating {} posts ({} featured in community)", + "📜 creating {} posts ({} featured in community)", num_posts, post_batches ); let mut num_inserted_posts = 0; @@ -139,6 +139,7 @@ async fn try_main() -> LemmyResult<()> { ) .await?; + // TODO: show execution duration stats let mut page_after = None; for page_num in 1..=args.read_post_pages { println!( From 029cb25e7710dd7f4f9ac8fdcbf734fb80999140 Mon Sep 17 00:00:00 2001 From: dullbananas Date: Wed, 20 Dec 2023 22:52:01 -0700 Subject: [PATCH 18/18] use anyhow macro --- crates/db_schema/src/utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/db_schema/src/utils.rs b/crates/db_schema/src/utils.rs index 100906216..18b846fc6 100644 --- a/crates/db_schema/src/utils.rs +++ b/crates/db_schema/src/utils.rs @@ -350,7 +350,7 @@ fn run_migrations(db_url: &str) -> Result<(), LemmyError> { info!("Running Database migrations (This may take a long time)..."); conn .run_pending_migrations(MIGRATIONS) - .map_err(|e| anyhow::Error::msg(format!("Couldn't run DB Migrations: {e}")))?; + .map_err(|e| anyhow::anyhow!("Couldn't run DB Migrations: {e}"))?; info!("Database migrations complete."); Ok(())
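
The statement-level triggers added in the 2023-12-19-210053_tolerable-batch-insert-speed migration above all share one shape: an AFTER INSERT trigger declared FOR EACH STATEMENT with a REFERENCING NEW TABLE transition table, so a batch insert updates the aggregate rows with a single grouped UPDATE instead of firing a per-row trigger for every inserted row. The following is a minimal sketch of that pattern only; the table and column names (thing, thing_aggregates, owner_id, new_thing) are placeholders for illustration, not part of Lemmy's schema.

CREATE OR REPLACE FUNCTION thing_aggregates_insert ()
    RETURNS TRIGGER
    LANGUAGE plpgsql
    AS $$
BEGIN
    -- One UPDATE per INSERT statement: group the freshly inserted rows
    -- from the transition table and bump each owner's count once.
    UPDATE
        thing_aggregates
    SET
        things = things + grouped.count
    FROM (
        SELECT
            owner_id,
            count(*)
        FROM
            new_thing
        GROUP BY
            owner_id) grouped
    WHERE
        thing_aggregates.owner_id = grouped.owner_id;
    RETURN NULL;
END
$$;

CREATE OR REPLACE TRIGGER thing_aggregates_insert
    AFTER INSERT ON thing REFERENCING NEW TABLE AS new_thing
    FOR EACH STATEMENT
    EXECUTE PROCEDURE thing_aggregates_insert ();

With the old FOR EACH ROW triggers, inserting a batch of N posts ran the trigger body N times; with the transition-table form it runs once per statement, which is the effect the migration in this series is after.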