Better query plan viewing experience (#4285)
* stuff
* stuff including batch_upsert function
* stuff
* do things
* stuff
* different timestamps
* stuff
* Revert changes to comment.rs
* Update comment.rs
* Update comment.rs
* Update post_view.rs
* Update utils.rs
* Update up.sql
* Update up.sql
* Update down.sql
* Update up.sql
* Update main.rs
* use anyhow macro
* replace get(0) with first()
* as_slice
* Update series.rs
* Update db_perf.sh
* Update and rename crates/db_schema/src/utils/series.rs to crates/db_perf/src/series.rs
* Update utils.rs
* Update main.rs
* Update main.rs
* Update .woodpecker.yml
* fmt main.rs
* Update .woodpecker.yml
* Instance::delete at end
* Update main.rs
* Update Cargo.toml

---------

Co-authored-by: Nutomic <me@nutomic.com>
Parent: 8670403a67
Commit: 759f6d8a9a

15 changed files with 745 additions and 36 deletions
.woodpecker.yml
@@ -135,6 +135,18 @@ steps:
     - diesel migration redo
     when: *slow_check_paths
 
+  check_db_perf_tool:
+    image: *rust_image
+    environment:
+      LEMMY_DATABASE_URL: postgres://lemmy:password@database:5432/lemmy
+      RUST_BACKTRACE: "1"
+      CARGO_HOME: .cargo_home
+    commands:
+      # same as scripts/db_perf.sh but without creating a new database server
+      - export LEMMY_CONFIG_LOCATION=config/config.hjson
+      - cargo run --package lemmy_db_perf -- --posts 10 --read-post-pages 1
+    when: *slow_check_paths
+
   cargo_clippy:
     image: *rust_image
     environment:
Cargo.lock (generated, 14 changes)
@@ -2653,11 +2653,25 @@ dependencies = [
  "uuid",
 ]
 
+[[package]]
+name = "lemmy_db_perf"
+version = "0.19.0"
+dependencies = [
+ "clap",
+ "diesel",
+ "diesel-async",
+ "lemmy_db_schema",
+ "lemmy_db_views",
+ "lemmy_utils",
+ "tokio",
+]
+
 [[package]]
 name = "lemmy_db_schema"
 version = "0.19.3"
 dependencies = [
  "activitypub_federation",
+ "anyhow",
  "async-trait",
  "bcrypt",
  "chrono",
Cargo.toml
@@ -54,6 +54,7 @@ members = [
   "crates/api_common",
   "crates/apub",
   "crates/utils",
+  "crates/db_perf",
   "crates/db_schema",
   "crates/db_views",
   "crates/db_views_actor",
@@ -156,6 +157,7 @@ tokio-postgres = "0.7.10"
 tokio-postgres-rustls = "0.10.0"
 enum-map = "2.7"
 moka = { version = "0.12.1", features = ["future"] }
+clap = { version = "4.4.11", features = ["derive"] }
 pretty_assertions = "1.4.0"
 
 [dependencies]
@@ -193,7 +195,7 @@ futures-util = { workspace = true }
 chrono = { workspace = true }
 prometheus = { version = "0.13.3", features = ["process"] }
 serial_test = { workspace = true }
-clap = { version = "4.4.11", features = ["derive"] }
+clap = { workspace = true }
 actix-web-prom = "0.7.0"
 
 [dev-dependencies]
crates/db_perf/Cargo.toml (new file, 23 lines)

[package]
name = "lemmy_db_perf"
version.workspace = true
edition.workspace = true
description.workspace = true
license.workspace = true
homepage.workspace = true
documentation.workspace = true
repository.workspace = true

[lints]
workspace = true

[dependencies]
anyhow = { workspace = true }
clap = { workspace = true }
diesel = { workspace = true }
diesel-async = { workspace = true }
lemmy_db_schema = { workspace = true }
lemmy_db_views = { workspace = true, features = ["full"] }
lemmy_utils = { workspace = true }
tokio = { workspace = true }
crates/db_perf/src/main.rs (new file, 179 lines)

mod series;

use crate::series::ValuesFromSeries;
use anyhow::Context;
use clap::Parser;
use diesel::{
  dsl::{self, sql},
  sql_types,
  ExpressionMethods,
  IntoSql,
};
use diesel_async::{RunQueryDsl, SimpleAsyncConnection};
use lemmy_db_schema::{
  schema::post,
  source::{
    community::{Community, CommunityInsertForm},
    instance::Instance,
    person::{Person, PersonInsertForm},
  },
  traits::Crud,
  utils::{build_db_pool, get_conn, now},
  SortType,
};
use lemmy_db_views::{post_view::PostQuery, structs::PaginationCursor};
use lemmy_utils::error::{LemmyErrorExt2, LemmyResult};
use std::num::NonZeroU32;

#[derive(Parser, Debug)]
struct CmdArgs {
  #[arg(long, default_value_t = 3.try_into().unwrap())]
  communities: NonZeroU32,
  #[arg(long, default_value_t = 3.try_into().unwrap())]
  people: NonZeroU32,
  #[arg(long, default_value_t = 100000.try_into().unwrap())]
  posts: NonZeroU32,
  #[arg(long, default_value_t = 0)]
  read_post_pages: u32,
  #[arg(long)]
  explain_insertions: bool,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
  let mut result = try_main().await.into_anyhow();
  if let Ok(path) = std::env::var("PGDATA") {
    result = result.with_context(|| {
      format!("Failed to run lemmy_db_perf (more details might be available in {path}/log)")
    });
  }
  result
}

async fn try_main() -> LemmyResult<()> {
  let args = CmdArgs::parse();
  let pool = &build_db_pool().await?;
  let pool = &mut pool.into();
  let conn = &mut get_conn(pool).await?;

  if args.explain_insertions {
    // log_nested_statements is enabled to log trigger execution
    conn
      .batch_execute(
        "SET auto_explain.log_min_duration = 0; SET auto_explain.log_nested_statements = on;",
      )
      .await?;
  }

  let instance = Instance::read_or_create(&mut conn.into(), "reddit.com".to_owned()).await?;

  println!("🫃 creating {} people", args.people);
  let mut person_ids = vec![];
  for i in 0..args.people.get() {
    let form = PersonInsertForm::builder()
      .name(format!("p{i}"))
      .public_key("pubkey".to_owned())
      .instance_id(instance.id)
      .build();
    person_ids.push(Person::create(&mut conn.into(), &form).await?.id);
  }

  println!("🌍 creating {} communities", args.communities);
  let mut community_ids = vec![];
  for i in 0..args.communities.get() {
    let form = CommunityInsertForm::builder()
      .name(format!("c{i}"))
      .title(i.to_string())
      .instance_id(instance.id)
      .build();
    community_ids.push(Community::create(&mut conn.into(), &form).await?.id);
  }

  let post_batches = args.people.get() * args.communities.get();
  let posts_per_batch = args.posts.get() / post_batches;
  let num_posts = post_batches * posts_per_batch;
  println!(
    "📜 creating {} posts ({} featured in community)",
    num_posts, post_batches
  );
  let mut num_inserted_posts = 0;
  // TODO: progress bar
  for person_id in &person_ids {
    for community_id in &community_ids {
      let n = dsl::insert_into(post::table)
        .values(ValuesFromSeries {
          start: 1,
          stop: posts_per_batch.into(),
          selection: (
            "AAAAAAAAAAA".into_sql::<sql_types::Text>(),
            person_id.into_sql::<sql_types::Integer>(),
            community_id.into_sql::<sql_types::Integer>(),
            series::current_value.eq(1),
            now()
              - sql::<sql_types::Interval>("make_interval(secs => ")
                .bind::<sql_types::BigInt, _>(series::current_value)
                .sql(")"),
          ),
        })
        .into_columns((
          post::name,
          post::creator_id,
          post::community_id,
          post::featured_community,
          post::published,
        ))
        .execute(conn)
        .await?;
      num_inserted_posts += n;
    }
  }
  // Make sure the println above shows the correct amount
  assert_eq!(num_inserted_posts, num_posts as usize);

  // Enable auto_explain
  conn
    .batch_execute(
      "SET auto_explain.log_min_duration = 0; SET auto_explain.log_nested_statements = off;",
    )
    .await?;

  // TODO: show execution duration stats
  let mut page_after = None;
  for page_num in 1..=args.read_post_pages {
    println!(
      "👀 getting page {page_num} of posts (pagination cursor used: {})",
      page_after.is_some()
    );

    // TODO: include local_user
    let post_views = PostQuery {
      community_id: community_ids.as_slice().first().cloned(),
      sort: Some(SortType::New),
      limit: Some(20),
      page_after,
      ..Default::default()
    }
    .list(&mut conn.into())
    .await?;

    if let Some(post_view) = post_views.into_iter().last() {
      println!("👀 getting pagination cursor data for next page");
      let cursor_data = PaginationCursor::after_post(&post_view)
        .read(&mut conn.into())
        .await?;
      page_after = Some(cursor_data);
    } else {
      println!("👀 reached empty page");
      break;
    }
  }

  // Delete everything, which might prevent problems if this is not run using scripts/db_perf.sh
  Instance::delete(&mut conn.into(), instance.id).await?;

  if let Ok(path) = std::env::var("PGDATA") {
    println!("🪵 query plans written in {path}/log");
  }

  Ok(())
}
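Note: a worked example of the batch arithmetic above (an illustration, not part of the diff): with the defaults --people 3 --communities 3 --posts 100000, post_batches = 3 * 3 = 9, posts_per_batch = 100000 / 9 = 11111 (integer division), and num_posts = 9 * 11111 = 99999; the assert_eq! therefore checks against this rounded-down total rather than against args.posts.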
crates/db_perf/src/series.rs (new file, 98 lines)

use diesel::{
  dsl,
  expression::{is_aggregate, ValidGrouping},
  pg::Pg,
  query_builder::{AsQuery, AstPass, QueryFragment},
  result::Error,
  sql_types,
  AppearsOnTable,
  Expression,
  Insertable,
  QueryId,
  SelectableExpression,
};

/// Generates a series of rows for insertion.
///
/// An inclusive range is created from `start` and `stop`. A row for each number is generated
/// using `selection`, which can be a tuple. [`current_value`] is an expression that gets the
/// current value.
///
/// For example, if there's a `numbers` table with a `number` column, this inserts all numbers
/// from 1 to 10 in a single statement:
///
/// ```
/// dsl::insert_into(numbers::table)
///   .values(ValuesFromSeries {
///     start: 1,
///     stop: 10,
///     selection: series::current_value,
///   })
///   .into_columns(numbers::number)
/// ```
#[derive(QueryId)]
pub struct ValuesFromSeries<S> {
  pub start: i64,
  pub stop: i64,
  pub selection: S,
}

impl<S: QueryFragment<Pg>> QueryFragment<Pg> for ValuesFromSeries<S> {
  fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> Result<(), Error> {
    self.selection.walk_ast(out.reborrow())?;
    out.push_sql(" FROM generate_series(");
    out.push_bind_param::<sql_types::BigInt, _>(&self.start)?;
    out.push_sql(", ");
    out.push_bind_param::<sql_types::BigInt, _>(&self.stop)?;
    out.push_sql(")");

    Ok(())
  }
}

impl<S: Expression> Expression for ValuesFromSeries<S> {
  type SqlType = S::SqlType;
}

impl<T, S: AppearsOnTable<current_value>> AppearsOnTable<T> for ValuesFromSeries<S> {}

impl<T, S: SelectableExpression<current_value>> SelectableExpression<T> for ValuesFromSeries<S> {}

impl<T, S: SelectableExpression<current_value>> Insertable<T> for ValuesFromSeries<S>
where
  dsl::BareSelect<Self>: AsQuery + Insertable<T>,
{
  type Values = <dsl::BareSelect<Self> as Insertable<T>>::Values;

  fn values(self) -> Self::Values {
    dsl::select(self).values()
  }
}

impl<S: ValidGrouping<(), IsAggregate = is_aggregate::No>> ValidGrouping<()>
  for ValuesFromSeries<S>
{
  type IsAggregate = is_aggregate::No;
}

#[allow(non_camel_case_types)]
#[derive(QueryId, Clone, Copy, Debug)]
pub struct current_value;

impl QueryFragment<Pg> for current_value {
  fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> Result<(), Error> {
    out.push_identifier("generate_series")?;

    Ok(())
  }
}

impl Expression for current_value {
  type SqlType = sql_types::BigInt;
}

impl AppearsOnTable<current_value> for current_value {}

impl SelectableExpression<current_value> for current_value {}

impl ValidGrouping<()> for current_value {
  type IsAggregate = is_aggregate::No;
}
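Note: a rough sketch (not part of the diff) of how one could inspect the SQL this compiles to, using diesel's debug_query. The `numbers` table is the hypothetical one from the doc comment, and the printed statement is an assumption inferred from walk_ast above:

use diesel::{debug_query, dsl, pg::Pg, table};

table! {
  numbers (number) {
    number -> BigInt,
  }
}

fn print_series_insert_sql() {
  let insert = dsl::insert_into(numbers::table)
    .values(ValuesFromSeries {
      start: 1,
      stop: 10,
      selection: current_value,
    })
    .into_columns(numbers::number);
  // Expected to print roughly (an assumption, per walk_ast):
  //   INSERT INTO "numbers" ("number") SELECT "generate_series" FROM generate_series($1, $2)
  println!("{}", debug_query::<Pg, _>(&insert));
}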
crates/db_schema/Cargo.toml
@@ -76,6 +76,7 @@ tokio-postgres = { workspace = true, optional = true }
 tokio-postgres-rustls = { workspace = true, optional = true }
 rustls = { workspace = true, optional = true }
 uuid = { workspace = true, features = ["v4"] }
+anyhow = { workspace = true }
 
 [dev-dependencies]
 serial_test = { workspace = true }
crates/db_schema/src/impls/actor_language.rs
@@ -96,16 +96,18 @@ impl LocalUserLanguage {
       .execute(conn)
       .await?;
 
-      for l in lang_ids {
-        let form = LocalUserLanguageForm {
+      let forms = lang_ids
+        .into_iter()
+        .map(|l| LocalUserLanguageForm {
           local_user_id: for_local_user_id,
           language_id: l,
-        };
-        insert_into(local_user_language)
-          .values(form)
-          .get_result::<Self>(conn)
-          .await?;
-      }
+        })
+        .collect::<Vec<_>>();
+
+      insert_into(local_user_language)
+        .values(forms)
+        .execute(conn)
+        .await?;
       Ok(())
     }) as _
   })
@@ -164,16 +166,18 @@ impl SiteLanguage {
       .execute(conn)
       .await?;
 
-      for l in lang_ids {
-        let form = SiteLanguageForm {
+      let forms = lang_ids
+        .into_iter()
+        .map(|l| SiteLanguageForm {
           site_id: for_site_id,
           language_id: l,
-        };
-        insert_into(site_language)
-          .values(form)
-          .get_result::<Self>(conn)
-          .await?;
-      }
+        })
+        .collect::<Vec<_>>();
+
+      insert_into(site_language)
+        .values(forms)
+        .get_result::<Self>(conn)
+        .await?;
 
       CommunityLanguage::limit_languages(conn, instance_id).await?;
 
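Note: the point of collecting the forms into a Vec is that diesel renders `.values(Vec<_>)` as one multi-row INSERT, so the statement (and any per-statement triggers) runs once instead of once per language. A minimal sketch under that assumption, with imports and newtypes (LocalUserId, LanguageId, diesel_async::RunQueryDsl) as already used in actor_language.rs:

async fn set_languages_batched(
  conn: &mut diesel_async::AsyncPgConnection,
  for_local_user_id: LocalUserId,
  lang_ids: Vec<LanguageId>,
) -> Result<usize, diesel::result::Error> {
  let forms = lang_ids
    .into_iter()
    .map(|language_id| LocalUserLanguageForm {
      local_user_id: for_local_user_id,
      language_id,
    })
    .collect::<Vec<_>>();
  // Renders roughly as (assumption):
  //   INSERT INTO "local_user_language" ("local_user_id", "language_id")
  //   VALUES ($1, $2), ($3, $4), ...
  insert_into(local_user_language)
    .values(forms)
    .execute(conn)
    .await
}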
crates/db_schema/src/utils.rs
@@ -6,6 +6,7 @@ use crate::{
   SortType,
 };
 use activitypub_federation::{fetch::object_id::ObjectId, traits::Object};
+use anyhow::Context;
 use chrono::{DateTime, Utc};
 use deadpool::Runtime;
 use diesel::{
@@ -13,6 +14,8 @@ use diesel::{
   deserialize::FromSql,
   helper_types::AsExprOf,
   pg::Pg,
+  query_builder::{Query, QueryFragment},
+  query_dsl::methods::LimitDsl,
   result::{ConnectionError, ConnectionResult, Error as DieselError, Error::QueryBuilderError},
   serialize::{Output, ToSql},
   sql_types::{Text, Timestamptz},
@@ -150,6 +153,67 @@ macro_rules! try_join_with_pool {
   }};
 }
 
+/// Includes an SQL comment before `T`, which can be used to label auto_explain output
+#[derive(QueryId)]
+pub struct Commented<T> {
+  comment: String,
+  inner: T,
+}
+
+impl<T> Commented<T> {
+  pub fn new(inner: T) -> Self {
+    Commented {
+      comment: String::new(),
+      inner,
+    }
+  }
+
+  /// Adds `text` to the comment if `condition` is true
+  pub fn text_if(mut self, text: &str, condition: bool) -> Self {
+    if condition {
+      if !self.comment.is_empty() {
+        self.comment.push_str(", ");
+      }
+      self.comment.push_str(text);
+    }
+    self
+  }
+
+  /// Adds `text` to the comment
+  pub fn text(self, text: &str) -> Self {
+    self.text_if(text, true)
+  }
+}
+
+impl<T: Query> Query for Commented<T> {
+  type SqlType = T::SqlType;
+}
+
+impl<T: QueryFragment<Pg>> QueryFragment<Pg> for Commented<T> {
+  fn walk_ast<'b>(
+    &'b self,
+    mut out: diesel::query_builder::AstPass<'_, 'b, Pg>,
+  ) -> Result<(), DieselError> {
+    for line in self.comment.lines() {
+      out.push_sql("\n-- ");
+      out.push_sql(line);
+    }
+    out.push_sql("\n");
+    self.inner.walk_ast(out.reborrow())
+  }
+}
+
+impl<T: LimitDsl> LimitDsl for Commented<T> {
+  type Output = Commented<T::Output>;
+
+  fn limit(self, limit: i64) -> Self::Output {
+    Commented {
+      comment: self.comment,
+      inner: self.inner.limit(limit),
+    }
+  }
+}
+
 pub fn fuzzy_search(q: &str) -> String {
   let replaced = q.replace('%', "\\%").replace('_', "\\_").replace(' ', "%");
   format!("%{replaced}%")
@@ -275,15 +339,18 @@ impl ServerCertVerifier for NoCertVerifier {
 
 pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!();
 
-fn run_migrations(db_url: &str) {
+fn run_migrations(db_url: &str) -> Result<(), LemmyError> {
   // Needs to be a sync connection
   let mut conn =
-    PgConnection::establish(db_url).unwrap_or_else(|e| panic!("Error connecting to {db_url}: {e}"));
+    PgConnection::establish(db_url).with_context(|| format!("Error connecting to {db_url}"))?;
+
   info!("Running Database migrations (This may take a long time)...");
-  let _ = &mut conn
+  conn
     .run_pending_migrations(MIGRATIONS)
-    .unwrap_or_else(|e| panic!("Couldn't run DB Migrations: {e}"));
+    .map_err(|e| anyhow::anyhow!("Couldn't run DB Migrations: {e}"))?;
   info!("Database migrations complete.");
+
+  Ok(())
 }
 
 pub async fn build_db_pool() -> Result<ActualDbPool, LemmyError> {
@@ -304,7 +371,7 @@ pub async fn build_db_pool() -> Result<ActualDbPool, LemmyError> {
     .runtime(Runtime::Tokio1)
     .build()?;
 
-  run_migrations(&db_url);
+  run_migrations(&db_url)?;
 
   Ok(pool)
 }
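Note: a small usage sketch (not part of the diff) of what Commented adds to a statement; the query and the printed SQL are assumptions:

use diesel::{debug_query, pg::Pg, QueryDsl};
use lemmy_db_schema::schema::post;

fn show_labeled_sql() {
  let q = Commented::new(post::table.select(post::id))
    .text("PostView::read")
    .text_if("getting upper bound for next query", true);
  // walk_ast prefixes each comment line with "\n-- ", so this prints roughly:
  //   -- PostView::read, getting upper bound for next query
  //   SELECT "post"."id" FROM "post"
  // which is the label that shows up next to the matching auto_explain entry
  // in the Postgres log.
  println!("{}", debug_query::<Pg, _>(&q));
}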
crates/db_views/src/post_view.rs
@@ -43,6 +43,7 @@ use lemmy_db_schema::{
     get_conn,
     limit_and_offset,
     now,
+    Commented,
     DbConn,
     DbPool,
     ListFn,
@@ -282,7 +283,10 @@ fn queries<'a>() -> Queries<
       );
     }
 
-    query.first::<PostView>(&mut conn).await
+    Commented::new(query)
+      .text("PostView::read")
+      .first::<PostView>(&mut conn)
+      .await
   };
 
   let list = move |mut conn: DbConn<'a>, options: PostQuery<'a>| async move {
@@ -564,7 +568,14 @@ fn queries<'a>() -> Queries<
 
     debug!("Post View Query: {:?}", debug_query::<Pg, _>(&query));
 
-    query.load::<PostView>(&mut conn).await
+    Commented::new(query)
+      .text("PostQuery::list")
+      .text_if(
+        "getting upper bound for next query",
+        options.community_id_just_for_prefetch,
+      )
+      .load::<PostView>(&mut conn)
+      .await
   };
 
   Queries::new(read, list)
migrations/2023-12-19-210053_tolerable-batch-insert-speed/down.sql (new file, 88 lines)

CREATE OR REPLACE FUNCTION post_aggregates_post ()
    RETURNS TRIGGER
    LANGUAGE plpgsql
    AS $$
BEGIN
    IF (TG_OP = 'INSERT') THEN
        INSERT INTO post_aggregates (post_id, published, newest_comment_time, newest_comment_time_necro, community_id, creator_id, instance_id)
        SELECT
            NEW.id,
            NEW.published,
            NEW.published,
            NEW.published,
            NEW.community_id,
            NEW.creator_id,
            community.instance_id
        FROM
            community
        WHERE
            NEW.community_id = community.id;
    ELSIF (TG_OP = 'DELETE') THEN
        DELETE FROM post_aggregates
        WHERE post_id = OLD.id;
    END IF;
    RETURN NULL;
END
$$;

CREATE OR REPLACE TRIGGER post_aggregates_post
    AFTER INSERT OR DELETE ON post
    FOR EACH ROW
    EXECUTE PROCEDURE post_aggregates_post ();

CREATE OR REPLACE TRIGGER community_aggregates_post_count
    AFTER INSERT OR DELETE OR UPDATE OF removed, deleted ON post
    FOR EACH ROW
    EXECUTE PROCEDURE community_aggregates_post_count ();

DROP FUNCTION IF EXISTS community_aggregates_post_count_insert CASCADE;

DROP FUNCTION IF EXISTS community_aggregates_post_update CASCADE;

DROP FUNCTION IF EXISTS site_aggregates_post_update CASCADE;

DROP FUNCTION IF EXISTS person_aggregates_post_insert CASCADE;

CREATE OR REPLACE FUNCTION site_aggregates_post_insert ()
    RETURNS TRIGGER
    LANGUAGE plpgsql
    AS $$
BEGIN
    IF (was_restored_or_created (TG_OP, OLD, NEW)) THEN
        UPDATE
            site_aggregates sa
        SET
            posts = posts + 1
        FROM
            site s
        WHERE
            sa.site_id = s.id;
    END IF;
    RETURN NULL;
END
$$;

CREATE OR REPLACE TRIGGER site_aggregates_post_insert
    AFTER INSERT OR UPDATE OF removed, deleted ON post
    FOR EACH ROW
    WHEN (NEW.local = TRUE)
    EXECUTE PROCEDURE site_aggregates_post_insert ();

CREATE OR REPLACE FUNCTION generate_unique_changeme ()
    RETURNS text
    LANGUAGE sql
    AS $$
    SELECT
        'http://changeme.invalid/' || substr(md5(random()::text), 0, 25);
$$;

CREATE OR REPLACE TRIGGER person_aggregates_post_count
    AFTER INSERT OR DELETE OR UPDATE OF removed, deleted ON post
    FOR EACH ROW
    EXECUTE PROCEDURE person_aggregates_post_count ();

DROP SEQUENCE IF EXISTS changeme_seq;
migrations/2023-12-19-210053_tolerable-batch-insert-speed/up.sql (new file, 166 lines)

-- Change triggers to run once per statement instead of once per row

-- post_aggregates_post trigger doesn't need to handle deletion because the post_id column has ON DELETE CASCADE
CREATE OR REPLACE FUNCTION post_aggregates_post ()
    RETURNS TRIGGER
    LANGUAGE plpgsql
    AS $$
BEGIN
    INSERT INTO post_aggregates (post_id, published, newest_comment_time, newest_comment_time_necro, community_id, creator_id, instance_id)
    SELECT
        id,
        published,
        published,
        published,
        community_id,
        creator_id,
        (
            SELECT
                community.instance_id
            FROM
                community
            WHERE
                community.id = community_id
            LIMIT 1)
    FROM
        new_post;
    RETURN NULL;
END
$$;

CREATE OR REPLACE FUNCTION community_aggregates_post_count_insert ()
    RETURNS TRIGGER
    LANGUAGE plpgsql
    AS $$
BEGIN
    UPDATE
        community_aggregates
    SET
        posts = posts + post_group.count
    FROM (
        SELECT
            community_id,
            count(*)
        FROM
            new_post
        GROUP BY
            community_id) post_group
    WHERE
        community_aggregates.community_id = post_group.community_id;
    RETURN NULL;
END
$$;

CREATE OR REPLACE FUNCTION person_aggregates_post_insert ()
    RETURNS TRIGGER
    LANGUAGE plpgsql
    AS $$
BEGIN
    UPDATE
        person_aggregates
    SET
        post_count = post_count + post_group.count
    FROM (
        SELECT
            creator_id,
            count(*)
        FROM
            new_post
        GROUP BY
            creator_id) post_group
    WHERE
        person_aggregates.person_id = post_group.creator_id;
    RETURN NULL;
END
$$;

CREATE OR REPLACE TRIGGER post_aggregates_post
    AFTER INSERT ON post REFERENCING NEW TABLE AS new_post
    FOR EACH STATEMENT
    EXECUTE PROCEDURE post_aggregates_post ();

-- Don't run old trigger for insert
CREATE OR REPLACE TRIGGER community_aggregates_post_count
    AFTER DELETE OR UPDATE OF removed, deleted ON post
    FOR EACH ROW
    EXECUTE PROCEDURE community_aggregates_post_count ();

CREATE OR REPLACE TRIGGER community_aggregates_post_count_insert
    AFTER INSERT ON post REFERENCING NEW TABLE AS new_post
    FOR EACH STATEMENT
    EXECUTE PROCEDURE community_aggregates_post_count_insert ();

CREATE OR REPLACE FUNCTION site_aggregates_post_update ()
    RETURNS TRIGGER
    LANGUAGE plpgsql
    AS $$
BEGIN
    IF (was_restored_or_created (TG_OP, OLD, NEW)) THEN
        UPDATE
            site_aggregates sa
        SET
            posts = posts + 1
        FROM
            site s
        WHERE
            sa.site_id = s.id;
    END IF;
    RETURN NULL;
END
$$;

CREATE OR REPLACE FUNCTION site_aggregates_post_insert ()
    RETURNS TRIGGER
    LANGUAGE plpgsql
    AS $$
BEGIN
    UPDATE
        site_aggregates sa
    SET
        posts = posts + (
            SELECT
                count(*)
            FROM
                new_post)
    FROM
        site s
    WHERE
        sa.site_id = s.id;
    RETURN NULL;
END
$$;

CREATE OR REPLACE TRIGGER site_aggregates_post_update
    AFTER UPDATE OF removed, deleted ON post
    FOR EACH ROW
    WHEN (NEW.local = TRUE)
    EXECUTE PROCEDURE site_aggregates_post_update ();

CREATE OR REPLACE TRIGGER site_aggregates_post_insert
    AFTER INSERT ON post REFERENCING NEW TABLE AS new_post
    FOR EACH STATEMENT
    EXECUTE PROCEDURE site_aggregates_post_insert ();

CREATE OR REPLACE TRIGGER person_aggregates_post_count
    AFTER DELETE OR UPDATE OF removed, deleted ON post
    FOR EACH ROW
    EXECUTE PROCEDURE person_aggregates_post_count ();

CREATE OR REPLACE TRIGGER person_aggregates_post_insert
    AFTER INSERT ON post REFERENCING NEW TABLE AS new_post
    FOR EACH STATEMENT
    EXECUTE PROCEDURE person_aggregates_post_insert ();

-- Avoid running hash function and random number generation for default ap_id
CREATE SEQUENCE IF NOT EXISTS changeme_seq AS bigint CYCLE;

CREATE OR REPLACE FUNCTION generate_unique_changeme ()
    RETURNS text
    LANGUAGE sql
    AS $$
    SELECT
        'http://changeme.invalid/seq/' || nextval('changeme_seq')::text;
$$;
scripts/db_perf.sh (new executable file, 20 lines)

#!/usr/bin/env bash

# This script runs crates/lemmy_db_perf/src/main.rs, which lets you see info related to database query performance, such as query plans.

set -e

CWD="$(cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")" && pwd -P)"

cd $CWD/../

source scripts/start_dev_db.sh

export LEMMY_CONFIG_LOCATION=config/config.hjson
export RUST_BACKTRACE=1

cargo run --package lemmy_db_perf -- "$@"

pg_ctl stop --silent

# $PGDATA directory is kept so log can be seen
scripts/start_dev_db.sh
@@ -2,23 +2,47 @@
 
 export PGDATA="$PWD/dev_pgdata"
 export PGHOST=$PWD
-export LEMMY_DATABASE_URL="postgresql://lemmy:password@/lemmy?host=$PWD"
+export DATABASE_URL="postgresql://lemmy:password@/lemmy?host=$PWD"
+export LEMMY_DATABASE_URL=$DATABASE_URL
 
 # If cluster exists, stop the server and delete the cluster
-if [ -d $PGDATA ]
+if [[ -d $PGDATA ]]
 then
-  # Prevent `stop` from failing if server already stopped
-  pg_ctl restart > /dev/null
-  pg_ctl stop
+  # Only stop server if it is running
+  pg_status_exit_code=0
+  (pg_ctl status > /dev/null) || pg_status_exit_code=$?
+  if [[ ${pg_status_exit_code} -ne 3 ]]
+  then
+    pg_ctl stop --silent
+  fi
+
   rm -rf $PGDATA
 fi
 
-# Create cluster
-initdb --username=postgres --auth=trust --no-instructions
+config_args=(
+  # Only listen to socket in current directory
+  -c listen_addresses=
+  -c unix_socket_directories=$PWD
 
-# Start server that only listens to socket in current directory
-pg_ctl start --options="-c listen_addresses= -c unix_socket_directories=$PWD" > /dev/null
+  # Write logs to a file in $PGDATA/log
+  -c logging_collector=on
+
+  # Allow auto_explain to be turned on
+  -c session_preload_libraries=auto_explain
+
+  # Include actual row amounts and run times for query plan nodes
+  -c auto_explain.log_analyze=on
+
+  # Don't log parameter values
+  -c auto_explain.log_parameter_max_length=0
+)
+
+# Create cluster
+pg_ctl init --silent --options="--username=postgres --auth=trust --no-instructions"
+
+# Start server
+pg_ctl start --silent --options="${config_args[*]}"
 
 # Setup database
-psql -c "CREATE USER lemmy WITH PASSWORD 'password' SUPERUSER;" -U postgres
+psql --quiet -c "CREATE USER lemmy WITH PASSWORD 'password' SUPERUSER;" -U postgres
-psql -c "CREATE DATABASE lemmy WITH OWNER lemmy;" -U postgres
+psql --quiet -c "CREATE DATABASE lemmy WITH OWNER lemmy;" -U postgres
scripts/test.sh
@@ -27,5 +27,5 @@ cargo test -p lemmy_utils --all-features --no-fail-fast
 
 # Add this to do printlns: -- --nocapture
 
-pg_ctl stop
+pg_ctl stop --silent
 rm -rf $PGDATA