do things

Author: dull b
Date: 2023-12-20 06:28:27 +00:00
Parent: 9b9314a1d6
Commit: 869a2466b7
12 changed files with 394 additions and 14 deletions

Cargo.lock (generated): 13 lines changed

@@ -2644,6 +2644,19 @@ dependencies = [
  "uuid",
 ]
 
+[[package]]
+name = "lemmy_db_perf"
+version = "0.19.0"
+dependencies = [
+ "clap",
+ "diesel",
+ "diesel-async",
+ "lemmy_db_schema",
+ "lemmy_db_views",
+ "lemmy_utils",
+ "tokio",
+]
+
 [[package]]
 name = "lemmy_db_schema"
 version = "0.19.0"


@@ -53,6 +53,7 @@ members = [
   "crates/api_common",
   "crates/apub",
   "crates/utils",
+  "crates/db_perf",
   "crates/db_schema",
   "crates/db_views",
   "crates/db_views_actor",
@@ -155,6 +156,7 @@ tokio-postgres = "0.7.10"
 tokio-postgres-rustls = "0.10.0"
 enum-map = "2.7"
 moka = { version = "0.12.1", features = ["future"] }
+clap = { version = "4.4.11", features = ["derive"] }
 
 [dependencies]
 lemmy_api = { workspace = true }
@@ -191,5 +193,5 @@ futures-util = { workspace = true }
 chrono = { workspace = true }
 prometheus = { version = "0.13.3", features = ["process"] }
 serial_test = { workspace = true }
-clap = { version = "4.4.11", features = ["derive"] }
+clap = { workspace = true }
 actix-web-prom = "0.7.0"

crates/db_perf/Cargo.toml (new file): 22 lines

@@ -0,0 +1,22 @@
[package]
name = "lemmy_db_perf"
version.workspace = true
edition.workspace = true
description.workspace = true
license.workspace = true
homepage.workspace = true
documentation.workspace = true
repository.workspace = true

[lints]
workspace = true

[dependencies]
clap = { workspace = true }
diesel = { workspace = true }
diesel-async = { workspace = true }
lemmy_db_schema = { workspace = true }
lemmy_db_views = { workspace = true, features = ["full"] }
lemmy_utils = { workspace = true }
tokio = { workspace = true }

crates/db_perf/src/main.rs (new file): 156 lines

@@ -0,0 +1,156 @@
use clap::{Parser, Subcommand};
use diesel::{dsl, sql_query, sql_types, ExpressionMethods, IntoSql};
use diesel_async::RunQueryDsl;
use lemmy_db_schema::{
schema::post,
source::{
community::{Community, CommunityInsertForm},
instance::Instance,
person::{Person, PersonInsertForm},
},
traits::Crud,
utils::{
build_db_pool,
get_conn,
series::{self, ValuesFromSeries},
},
SortType,
};
use lemmy_db_views::{
post_view::{PaginationCursorData, PostQuery},
structs::PaginationCursor,
};
use lemmy_utils::error::LemmyResult;
use std::num::NonZeroU32;
#[derive(Parser, Debug)]
struct CmdArgs {
#[arg(long, default_value_t = 3.try_into().unwrap())]
communities: NonZeroU32,
#[arg(long, default_value_t = 3.try_into().unwrap())]
people: NonZeroU32,
#[arg(long, default_value_t = 100000.try_into().unwrap())]
posts: NonZeroU32,
#[arg(long)]
read_posts: bool,
}
#[tokio::main]
async fn main() -> LemmyResult<()> {
let args = CmdArgs::parse();
let pool = &build_db_pool().await?;
let pool = &mut pool.into();
let instance = Instance::read_or_create(pool, "reddit.com".to_owned()).await?;
println!("🫃 creating {} people", args.people);
let mut person_ids = vec![];
for i in 0..args.people.get() {
person_ids.push(
Person::create(
pool,
&PersonInsertForm::builder()
.name(format!("p{i}"))
.public_key("pubkey".to_owned())
.instance_id(instance.id)
.build(),
)
.await?
.id,
);
}
println!("🏠 creating {} communities", args.communities);
let mut community_ids = vec![];
for i in 0..args.communities.get() {
community_ids.push(
Community::create(
pool,
&CommunityInsertForm::builder()
.name(format!("c{i}"))
.title(i.to_string())
.instance_id(instance.id)
.build(),
)
.await?
.id,
);
}
let post_batches = args.people.get() * args.communities.get();
let posts_per_batch = args.posts.get() / post_batches;
let num_posts = post_batches * posts_per_batch;
println!(
"📢 creating {} posts ({} featured in community)",
num_posts, post_batches
);
let mut num_inserted_posts = 0;
// TODO: progress bar
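// Each loop iteration inserts posts_per_batch rows with a single
// INSERT ... SELECT over generate_series (see ValuesFromSeries below);
// only the first generated row of each batch gets featured_community = true.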
for person_id in &person_ids {
for community_id in &community_ids {
let n = dsl::insert_into(post::table)
.values(ValuesFromSeries {
start: 1,
stop: posts_per_batch.into(),
selection: (
"AAAAAAAAAAA".into_sql::<sql_types::Text>(),
person_id.into_sql::<sql_types::Integer>(),
community_id.into_sql::<sql_types::Integer>(),
series::current_value.eq(1),
),
})
.into_columns((
post::name,
post::creator_id,
post::community_id,
post::featured_community,
))
.execute(&mut get_conn(pool).await?)
.await?;
num_inserted_posts += n;
}
}
// Lie detector for the println above
assert_eq!(num_inserted_posts, num_posts as usize);
// Enable auto_explain
let conn = &mut get_conn(pool).await?;
sql_query("SET auto_explain.log_min_duration = 0")
.execute(conn)
.await?;
let pool = &mut conn.into();
if args.read_posts {
let mut page_after = None;
for page_num in 1..=2 {
println!(
"👀 getting page {page_num} of posts (pagination cursor used: {})",
page_after.is_some()
);
// TODO: include local_user
let post_views = PostQuery {
community_id: community_ids.get(0).cloned(),
sort: Some(SortType::New),
limit: Some(20),
page_after,
..Default::default()
}
.list(pool)
.await?;
if let Some(post_view) = post_views.into_iter().last() {
println!("👀 getting pagination cursor data for next page ");
let cursor_data = PaginationCursor::after_post(&post_view).read(pool).await?;
page_after = Some(cursor_data);
} else {
break;
}
}
}
if let Ok(path) = std::env::var("PGDATA") {
println!("🪵 query plans written in {path}/log");
}
Ok(())
}
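
For context: the `SET auto_explain.log_min_duration = 0` statement above only takes effect because `scripts/start_dev_db.sh` preloads the `auto_explain` module; a minimal sketch of the session-level settings involved (all standard PostgreSQL auto_explain parameters, shown here purely for illustration):

-- assumes auto_explain is loaded via session_preload_libraries (see start_dev_db.sh)
SET auto_explain.log_min_duration = 0;          -- log the plan of every statement
SET auto_explain.log_analyze = on;              -- include actual row counts and timings
SET auto_explain.log_parameter_max_length = 0;  -- don't log bound parameter values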


@@ -41,7 +41,14 @@ use crate::{
 };
 use ::url::Url;
 use chrono::{Duration, Utc};
-use diesel::{dsl::insert_into, result::Error, ExpressionMethods, QueryDsl, TextExpressionMethods};
+use diesel::{
+  dsl::insert_into,
+  result::Error,
+  ExpressionMethods,
+  Insertable,
+  QueryDsl,
+  TextExpressionMethods,
+};
 use diesel_async::RunQueryDsl;
 use std::collections::HashSet;


@@ -1,3 +1,5 @@
+pub mod series;
+
 use crate::{
   diesel::Connection,
   diesel_migrations::MigrationHarness,


@@ -0,0 +1,81 @@
use diesel::{
dsl,
expression::{is_aggregate, ValidGrouping},
pg::Pg,
query_builder::{AsQuery, AstPass, QueryFragment},
result::Error,
sql_types,
AppearsOnTable,
Expression,
  Insertable,
  QueryId,
SelectableExpression,
};
#[derive(QueryId)]
pub struct ValuesFromSeries<S> {
pub start: i64,
pub stop: i64,
pub selection: S,
}
impl<S: QueryFragment<Pg>> QueryFragment<Pg> for ValuesFromSeries<S> {
fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> Result<(), Error> {
self.selection.walk_ast(out.reborrow())?;
out.push_sql(" FROM generate_series(");
out.push_bind_param::<sql_types::BigInt, _>(&self.start)?;
out.push_sql(", ");
out.push_bind_param::<sql_types::BigInt, _>(&self.stop)?;
out.push_sql(")");
Ok(())
}
}
impl<S: Expression> Expression for ValuesFromSeries<S> {
type SqlType = S::SqlType;
}
impl<T, S: AppearsOnTable<current_value>> AppearsOnTable<T> for ValuesFromSeries<S> {}
impl<T, S: SelectableExpression<current_value>> SelectableExpression<T> for ValuesFromSeries<S> {}
impl<T, S: SelectableExpression<current_value>> Insertable<T> for ValuesFromSeries<S>
where
dsl::BareSelect<Self>: AsQuery + Insertable<T>,
{
type Values = <dsl::BareSelect<Self> as Insertable<T>>::Values;
fn values(self) -> Self::Values {
dsl::select(self).values()
}
}
impl<S: ValidGrouping<(), IsAggregate = is_aggregate::No>> ValidGrouping<()>
for ValuesFromSeries<S>
{
type IsAggregate = is_aggregate::No;
}
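/// Refers to the value produced by `generate_series` (PostgreSQL names the
/// output column `generate_series`), so it can be used inside `selection`.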
#[allow(non_camel_case_types)]
#[derive(QueryId, Clone, Copy, Debug)]
pub struct current_value;
impl QueryFragment<Pg> for current_value {
fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> Result<(), Error> {
out.push_identifier("generate_series")?;
Ok(())
}
}
impl Expression for current_value {
type SqlType = sql_types::BigInt;
}
impl AppearsOnTable<current_value> for current_value {}
impl SelectableExpression<current_value> for current_value {}
impl ValidGrouping<()> for current_value {
type IsAggregate = is_aggregate::No;
}
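
Roughly, when `ValuesFromSeries` is used as the insertable values in db_perf's batch insert, the SQL it renders looks like the following (a sketch inferred from `walk_ast` above and the `into_columns` call in `main.rs`; exact quoting and parameter numbering may differ):

INSERT INTO "post" ("name", "creator_id", "community_id", "featured_community")
SELECT $1, $2, $3, ("generate_series" = $4)
FROM generate_series($5, $6);
-- $1 = 'AAAAAAAAAAA', $2 = person id, $3 = community id,
-- $4 = 1 (only the first generated row is featured), $5 = 1, $6 = posts_per_batch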


@@ -0,0 +1,42 @@
CREATE OR REPLACE FUNCTION post_aggregates_post ()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
IF (TG_OP = 'INSERT') THEN
INSERT INTO post_aggregates (post_id, published, newest_comment_time, newest_comment_time_necro, community_id, creator_id, instance_id)
SELECT
NEW.id,
NEW.published,
NEW.published,
NEW.published,
NEW.community_id,
NEW.creator_id,
community.instance_id
FROM
community
WHERE
NEW.community_id = community.id;
ELSIF (TG_OP = 'DELETE') THEN
DELETE FROM post_aggregates
WHERE post_id = OLD.id;
END IF;
RETURN NULL;
END
$$;
CREATE OR REPLACE TRIGGER post_aggregates_post
AFTER INSERT OR DELETE ON post
FOR EACH ROW
EXECUTE PROCEDURE post_aggregates_post ();
CREATE OR REPLACE FUNCTION generate_unique_changeme ()
RETURNS text
LANGUAGE sql
AS $$
SELECT
'http://changeme.invalid/' || substr(md5(random()::text), 0, 25);
$$;
DROP SEQUENCE IF EXISTS changeme_seq;


@@ -0,0 +1,41 @@
-- Change post_aggregates trigger to run once per statement instead of once per row.
-- The trigger doesn't need to handle deletion because the post_id column has ON DELETE CASCADE.
CREATE OR REPLACE FUNCTION post_aggregates_post ()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
INSERT INTO post_aggregates (post_id, published, newest_comment_time, newest_comment_time_necro, community_id, creator_id, instance_id)
SELECT
new_post.id,
new_post.published,
new_post.published,
new_post.published,
new_post.community_id,
new_post.creator_id,
(SELECT community.instance_id FROM community WHERE community.id = new_post.community_id LIMIT 1)
FROM
new_post;
RETURN NULL;
END
$$;
CREATE OR REPLACE TRIGGER post_aggregates_post
AFTER INSERT ON post
REFERENCING NEW TABLE AS new_post
FOR EACH STATEMENT
EXECUTE PROCEDURE post_aggregates_post ();
-- Avoid running hash function and random number generation for default ap_id
CREATE SEQUENCE IF NOT EXISTS changeme_seq AS bigint CYCLE;
CREATE OR REPLACE FUNCTION generate_unique_changeme ()
RETURNS text
LANGUAGE sql
AS $$
SELECT
'http://changeme.invalid/seq/' || nextval('changeme_seq')::text;
$$;
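
Because the trigger is now statement-level, one multi-row insert produces a single call of `post_aggregates_post()`. A hypothetical check (the literal `creator_id`/`community_id` values are placeholders, not part of the migration):

-- hypothetical: insert many rows in one statement; the trigger body runs once,
-- inserting all matching post_aggregates rows from the new_post transition table
INSERT INTO post (name, creator_id, community_id)
SELECT 'perf test ' || n, 1, 1
FROM generate_series(1, 1000) AS n;

-- the sequence-based default ap_id is now cheap to generate, e.g.:
SELECT generate_unique_changeme();  -- 'http://changeme.invalid/seq/1'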

scripts/db_perf.sh (new executable file): 17 lines

@@ -0,0 +1,17 @@
#!/usr/bin/env bash
set -e
CWD="$(cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")" && pwd -P)"
cd $CWD/../
source scripts/start_dev_db.sh
export LEMMY_CONFIG_LOCATION=config/config.hjson
export RUST_BACKTRACE=1
cargo run --package lemmy_db_perf -- "$@"
pg_ctl stop --silent
# The $PGDATA directory is kept so the log can be seen


@@ -11,7 +11,7 @@ then
   (pg_ctl status > /dev/null) || pg_status_exit_code=$?
   if [[ ${pg_status_exit_code} -ne 3 ]]
   then
-    pg_ctl stop
+    pg_ctl stop --silent
   fi
   rm -rf $PGDATA
@@ -25,27 +25,23 @@ config_args=(
   # Write logs to a file in $PGDATA/log
   -c logging_collector=on
 
-  # Log all query plans by default
+  # Allow auto_explain to be turned on
   -c session_preload_libraries=auto_explain
-  -c auto_explain.log_min_duration=0
+  #-c auto_explain.log_min_duration=0
 
   # Include actual row amounts and run times for query plan nodes
   -c auto_explain.log_analyze=on
 
-  # Avoid sequential scans so query plans show what index scans can be done
-  # (index scan is normally avoided in some cases, such as the table being small enough)
-  -c enable_seqscan=off
-
   # Don't log parameter values
   -c auto_explain.log_parameter_max_length=0
 )
 
 # Create cluster
-initdb --username=postgres --auth=trust --no-instructions
+pg_ctl init --silent --options="--username=postgres --auth=trust --no-instructions"
 
 # Start server that only listens to socket in current directory
-pg_ctl start --options="${config_args[*]}"
+pg_ctl start --silent --options="${config_args[*]}"
 
 # Setup database
-psql -c "CREATE USER lemmy WITH PASSWORD 'password' SUPERUSER;" -U postgres
+psql --quiet -c "CREATE USER lemmy WITH PASSWORD 'password' SUPERUSER;" -U postgres
-psql -c "CREATE DATABASE lemmy WITH OWNER lemmy;" -U postgres
+psql --quiet -c "CREATE DATABASE lemmy WITH OWNER lemmy;" -U postgres


@@ -27,4 +27,5 @@ cargo test -p lemmy_utils --all-features --no-fail-fast
 # Add this to do printlns: -- --nocapture
-pg_ctl stop
+pg_ctl stop --silent
+rm -rf $PGDATA