From 9b9314a1d6f8bd28064d1641f4fbc129d59bcdb9 Mon Sep 17 00:00:00 2001 From: dull b Date: Sat, 16 Dec 2023 20:52:19 +0000 Subject: [PATCH] stuff --- crates/apub/src/objects/comment.rs | 1 + crates/db_schema/src/impls/actor_language.rs | 18 ++-- crates/db_schema/src/impls/comment.rs | 93 ++++++++++++-------- crates/db_schema/src/source/comment.rs | 4 + crates/db_schema/src/utils.rs | 82 ++++++++++++++--- crates/db_views/src/post_view.rs | 20 ++++- scripts/start_dev_db.sh | 22 +++-- 7 files changed, 174 insertions(+), 66 deletions(-) diff --git a/crates/apub/src/objects/comment.rs b/crates/apub/src/objects/comment.rs index ab98fd7b1..0a791d842 100644 --- a/crates/apub/src/objects/comment.rs +++ b/crates/apub/src/objects/comment.rs @@ -172,6 +172,7 @@ impl Object for ApubComment { deleted: Some(false), ap_id: Some(note.id.into()), distinguished: note.distinguished, + path: None, local: Some(false), language_id, }; diff --git a/crates/db_schema/src/impls/actor_language.rs b/crates/db_schema/src/impls/actor_language.rs index 2e80b0680..36ac70b0b 100644 --- a/crates/db_schema/src/impls/actor_language.rs +++ b/crates/db_schema/src/impls/actor_language.rs @@ -166,16 +166,18 @@ impl SiteLanguage { .execute(conn) .await?; - for l in lang_ids { - let form = SiteLanguageForm { + let forms = lang_ids + .into_iter() + .map(|l| SiteLanguageForm { site_id: for_site_id, language_id: l, - }; - insert_into(site_language) - .values(form) - .get_result::(conn) - .await?; - } + }) + .collect::>(); + + insert_into(site_language) + .values(forms) + .get_result::(conn) + .await?; CommunityLanguage::limit_languages(conn, instance_id).await?; diff --git a/crates/db_schema/src/impls/comment.rs b/crates/db_schema/src/impls/comment.rs index f59afb6aa..a2e95fa1c 100644 --- a/crates/db_schema/src/impls/comment.rs +++ b/crates/db_schema/src/impls/comment.rs @@ -1,6 +1,16 @@ use crate::{ newtypes::{CommentId, DbUrl, PersonId}, - schema::comment::dsl::{ap_id, comment, content, creator_id, deleted, path, removed, updated}, + schema::comment::dsl::{ + ap_id, + comment, + content, + creator_id, + deleted, + id, + path, + removed, + updated, + }, source::comment::{ Comment, CommentInsertForm, @@ -11,16 +21,24 @@ use crate::{ CommentUpdateForm, }, traits::{Crud, Likeable, Saveable}, - utils::{get_conn, naive_now, DbPool, DELETED_REPLACEMENT_TEXT}, + utils::{functions::AsText, get_conn, naive_now, DbPool, DELETED_REPLACEMENT_TEXT}, }; use diesel::{ dsl::{insert_into, sql_query}, result::Error, ExpressionMethods, QueryDsl, + TextExpressionMethods, }; use diesel_async::RunQueryDsl; -use diesel_ltree::Ltree; +use diesel_ltree::{ + functions::{ltree2text, text2ltree}, + Ltree, +}; +use futures_util::{ + future::TryFutureExt, + stream::{self, StreamExt, TryStreamExt}, +}; use url::Url; impl Comment { @@ -57,12 +75,16 @@ impl Comment { comment_form: &CommentInsertForm, parent_path: Option<&Ltree>, ) -> Result { - Comment::create_batch(pool, &[(comment_form, parent_path)]).await?.into_iter().next().ok_or(Error::NotFound) + Comment::create_batch(pool, &[(comment_form.clone(), parent_path.cloned())]) + .await? + .into_iter() + .next() + .ok_or(Error::NotFound) } - + pub async fn create_batch( pool: &mut DbPool<'_>, - items: &[(&CommentInsertForm, Option<&Ltree>)], + items: &[(CommentInsertForm, Option)], ) -> Result, Error> { let conn = &mut get_conn(pool).await?; @@ -72,43 +94,44 @@ impl Comment { Box::pin(async move { let forms = items .iter() - .map(|&(form, _)| form) - .collect::>(); + .map(|(comment_form, parent_path)| CommentInsertForm { + path: Some(parent_path.clone().unwrap_or(Ltree("0".to_owned()))), + ..comment_form.clone() + }); // Insert, to get the ids let inserted_comments = insert_into(comment) - .values(&forms) - /*.on_conflict(ap_id) - .do_update() - .set()*/ - .get_result::(conn) + .values(forms.clone().collect::>()) + .load::(conn) + .or_else(|_| { + // `ap_id` unique constraint violation is handled individually for each row + // because batched upsert requires having the same `set` argument for all rows + stream::iter(forms) + .then(|form| { + insert_into(comment) + .values(form.clone()) + .on_conflict(ap_id) + .do_update() + .set(form) + .get_result::(conn) + }) + .try_collect::>() + }) .await?; - // `ap_id` unique constraint violation is handled individually for each row - // because batch upsert requires having the same `set` argument for all rows - - - let comment_id = inserted_comment.id; - - // You need to update the ltree column - let ltree = Ltree(if let Some(parent_path) = parent_path { - // The previous parent will already have 0 in it - // Append this comment id - format!("{}.{}", parent_path.0, comment_id) - } else { - // '0' is always the first path, append to that - format!("{}.{}", 0, comment_id) - }); - - let updated_comment = diesel::update(comment.find(comment_id)) - .set(path.eq(ltree)) - .get_result::(conn) - .await; + // For each comment, append its id to its path + let updated_comments = diesel::update(comment) + .filter(id.eq_any(inserted_comments.into_iter().map(|c| c.id))) + .set(path.eq(text2ltree( + ltree2text(path).concat(".").concat(AsText::new(id)), + ))) + .load::(conn) + .await?; // Update the child count for the parent comment_aggregates // You could do this with a trigger, but since you have to do this manually anyway, // you can just have it here - if let Some(parent_path) = parent_path { + for parent_path in items.iter().filter_map(|(_, p)| p.as_ref()) { // You have to update counts for all parents, not just the immediate one // TODO if the performance of this is terrible, it might be better to do this as part of a // scheduled query... although the counts would often be wrong. @@ -137,7 +160,7 @@ where ca.comment_id = c.id" sql_query(update_child_count_stmt).execute(conn).await?; } } - updated_comment + Ok(updated_comments) }) as _ }) .await diff --git a/crates/db_schema/src/source/comment.rs b/crates/db_schema/src/source/comment.rs index 3ebea42c1..63ed04f0d 100644 --- a/crates/db_schema/src/source/comment.rs +++ b/crates/db_schema/src/source/comment.rs @@ -67,6 +67,10 @@ pub struct CommentInsertForm { pub deleted: Option, pub ap_id: Option, pub local: Option, + #[cfg(feature = "full")] + pub path: Option, + #[cfg(not(feature = "full"))] + pub path: Option, pub distinguished: Option, pub language_id: Option, } diff --git a/crates/db_schema/src/utils.rs b/crates/db_schema/src/utils.rs index 7adf009a4..55667f5da 100644 --- a/crates/db_schema/src/utils.rs +++ b/crates/db_schema/src/utils.rs @@ -13,13 +13,15 @@ use diesel::{ deserialize::FromSql, helper_types::AsExprOf, pg::Pg, + query_builder::{Query, QueryFragment}, + query_dsl::methods::LimitDsl, result::{ConnectionError, ConnectionResult, Error as DieselError, Error::QueryBuilderError}, serialize::{Output, ToSql}, sql_query, sql_types::{Text, Timestamptz}, IntoSql, PgConnection, - RunQueryDsl, Insertable, Table, Column, AsChangeset, Expression, SelectableExpression, expression::NonAggregate, query_builder::QueryFragment, + RunQueryDsl, }; use diesel_async::{ pg::AsyncPgConnection, @@ -153,17 +155,65 @@ macro_rules! try_join_with_pool { }}; } -pub async fn batch_upsert(conn: &mut AsyncPgConnection, target: T, records: U, conflict_target: Target) -> Result, DieselError> -where - T: Table, - T::AllColumns: Expression + SelectableExpression + NonAggregate + QueryFragment, - U: IntoIterator + Clone, - Vec: Insertable, - U::Item: Insertable + AsChangeset, - Target: Column, -{ - let result = diesel::insert_into(target).values(records.clone().into_iter().collect::>()).load::(conn).await; - +/// Includes an SQL comment before `T`, which can be used to label auto_explain output +#[derive(QueryId)] +pub struct Commented { + comment: String, + inner: T, +} + +impl Commented { + pub fn new(inner: T) -> Self { + Commented { + comment: String::new(), + inner, + } + } + + /// Adds `text` to the comment if `condition` is true + pub fn text_if(mut self, text: &str, condition: bool) -> Self { + if condition { + if !self.comment.is_empty() { + self.comment.push_str(", "); + } + self.comment.push_str(text); + } + self + } + + /// Adds `text` to the comment + pub fn text(self, text: &str) -> Self { + self.text_if(text, true) + } +} + +impl Query for Commented { + type SqlType = T::SqlType; +} + +impl> QueryFragment for Commented { + fn walk_ast<'b>( + &'b self, + mut out: diesel::query_builder::AstPass<'_, 'b, Pg>, + ) -> Result<(), DieselError> { + for line in self.comment.lines() { + out.push_sql("\n-- "); + out.push_sql(line); + } + out.push_sql("\n"); + self.inner.walk_ast(out.reborrow()) + } +} + +impl LimitDsl for Commented { + type Output = Commented; + + fn limit(self, limit: i64) -> Self::Output { + Commented { + comment: self.comment, + inner: self.inner.limit(limit), + } + } } pub fn fuzzy_search(q: &str) -> String { @@ -368,7 +418,10 @@ static EMAIL_REGEX: Lazy = Lazy::new(|| { }); pub mod functions { - use diesel::sql_types::{BigInt, Text, Timestamptz}; + use diesel::{ + pg::Pg, + sql_types::{BigInt, Text, Timestamptz}, + }; sql_function! { fn hot_rank(score: BigInt, time: Timestamptz) -> Double; @@ -386,6 +439,9 @@ pub mod functions { // really this function is variadic, this just adds the two-argument version sql_function!(fn coalesce(x: diesel::sql_types::Nullable, y: T) -> T); + + // Use `AsText::new` + postfix_operator!(AsText, "::text", Text, backend: Pg); } pub const DELETED_REPLACEMENT_TEXT: &str = "*Permanently Deleted*"; diff --git a/crates/db_views/src/post_view.rs b/crates/db_views/src/post_view.rs index 087bc1ec0..f382f3920 100644 --- a/crates/db_views/src/post_view.rs +++ b/crates/db_views/src/post_view.rs @@ -43,6 +43,7 @@ use lemmy_db_schema::{ get_conn, limit_and_offset, now, + Commented, DbConn, DbPool, ListFn, @@ -282,7 +283,10 @@ fn queries<'a>() -> Queries< ); } - query.first::(&mut conn).await + Commented::new(query) + .text("PostView::read") + .first::(&mut conn) + .await }; let list = move |mut conn: DbConn<'a>, options: PostQuery<'a>| async move { @@ -557,7 +561,14 @@ fn queries<'a>() -> Queries< debug!("Post View Query: {:?}", debug_query::(&query)); - query.load::(&mut conn).await + Commented::new(query) + .text("PostQuery::list") + .text_if( + "getting upper bound for next query", + options.community_id_just_for_prefetch, + ) + .load::(&mut conn) + .await }; Queries::new(read, list) @@ -1457,12 +1468,13 @@ mod tests { .post_id(inserted_post.id) .content("yes".to_owned()) .build(); - let inserted_comment = Comment::create(pool, &comment_form, None).await.unwrap(); - inserted_comment_ids.push(inserted_comment.id); + comment_forms.push((comment_form, None)); } } } + Comment::create_batch(pool, &comment_forms).await.unwrap(); + let mut listed_post_ids = vec![]; let mut page_after = None; loop { diff --git a/scripts/start_dev_db.sh b/scripts/start_dev_db.sh index d25b15e75..31d3e0d3f 100644 --- a/scripts/start_dev_db.sh +++ b/scripts/start_dev_db.sh @@ -17,24 +17,34 @@ then rm -rf $PGDATA fi -# Create cluster -initdb --username=postgres --auth=trust --no-instructions \ +config_args=( # Only listen to socket in current directory - -c listen_addresses= -c unix_socket_directories=$PWD \ + -c listen_addresses= + -c unix_socket_directories=$PWD + # Write logs to a file in $PGDATA/log - -c logging_collector=on \ + -c logging_collector=on + # Log all query plans by default - -c session_preload_libraries=auto_explain -c auto_explain.log_min_duration=0 + -c session_preload_libraries=auto_explain + -c auto_explain.log_min_duration=0 + # Include actual row amounts and run times for query plan nodes -c auto_explain.log_analyze=on + # Avoid sequential scans so query plans show what index scans can be done # (index scan is normally avoided in some cases, such as the table being small enough) -c enable_seqscan=off + # Don't log parameter values -c auto_explain.log_parameter_max_length=0 +) + +# Create cluster +initdb --username=postgres --auth=trust --no-instructions # Start server that only listens to socket in current directory -pg_ctl start +pg_ctl start --options="${config_args[*]}" # Setup database psql -c "CREATE USER lemmy WITH PASSWORD 'password' SUPERUSER;" -U postgres