Move SQL triggers from migrations into reusable sql file (#4333)

* stuff

* stuff including batch_upsert function

* stuff

* do things

* stuff

* different timestamps

* stuff

* Revert changes to comment.rs

* Update comment.rs

* Update comment.rs

* Update post_view.rs

* Update utils.rs

* Update up.sql

* Update up.sql

* Update down.sql

* Update up.sql

* Update main.rs

* use anyhow macro

* Create down.sql

* Create up.sql

* Create replaceable_schema.sql

* Update replaceable_schema.sql

* Update replaceable_schema.sql

* Update replaceable_schema.sql

* Update utils.rs

* Update .woodpecker.yml

* Update sql_format_check.sh

* Update replaceable_schema.sql

* Update replaceable_schema.sql

* Update replaceable_schema.sql

* Update replaceable_schema.sql

* Update replaceable_schema.sql

* Update replaceable_schema.sql

* Update replaceable_schema.sql

* Create dump_schema.sh

* Update start_dev_db.sh

* Update replaceable_schema.sql

* Update replaceable_schema.sql

* Update replaceable_schema.sql

* Update replaceable_schema.sql

* Update replaceable_schema.sql

* Update replaceable_schema.sql

* Update replaceable_schema.sql

* Update replaceable_schema.sql

* Update replaceable_schema.sql

* Update replaceable_schema.sql

* Update replaceable_schema.sql

* stuff

* Update replaceable_schema.sql

* Update .pg_format

* fmt

* stuff

* stuff (#21)

* Update replaceable_schema.sql

* Update up.sql

* Update replaceable_schema.sql

* fmt

* update cargo.lock

* stuff

* Update replaceable_schema.sql

* Remove truncate trigger because truncate is already restricted by foreign keys

* Update replaceable_schema.sql

* fix some things

* Update replaceable_schema.sql

* Update replaceable_schema.sql

* Update .woodpecker.yml

* stuff

* fix TG_OP

* Psql env vars

* try to fix combine_transition_tables parse error

* Revert "try to fix combine_transition_tables parse error"

This reverts commit 75d00a46266fbb49b7fab1b149c79fa1c31ee84a.

* refactor combine_transition_tables

* try to fix create_triggers

* fix some things

* try to fix combined_transition_tables

* fix sql errors

* update comment count in post trigger

* fmt

* Revert "fmt"

This reverts commit a5bcd0834bb91a63b66bf63a848caa078f193940.

* Revert "update comment count in post trigger"

This reverts commit 0066a4b42b3472c088eed945605a2cf0bfcc1362.

* fix everything

* Update replaceable_schema.sql

* actually fix everything

* refactor create_triggers

* fix

* add semicolons

* add is_counted function and fix incorrect bool operator in update_comment_count_from_post

* refactor comment trigger

* refactor post trigger

* fix

* Delete crates/db_schema/src/utils/series.rs

* subscribers_local

* edit migrations

* move migrations

* remove utils::series module declaration

* fix everything

* stuff

* Move sql to schema_setup dir

* utils.sql

* delete .pg_format

* Update .woodpecker.yml

* Update sql_format_check.sh

* Update .woodpecker.yml

* Merge remote-tracking branch 'upstream/main' into bliss

* fmt

* Create main.rs

* Update lib.rs

* Update main.rs

* Update .woodpecker.yml

* Update main.rs

* Update Cargo.toml

* Update .woodpecker.yml

* Update .woodpecker.yml

* Update triggers.sql

* YAY

* Update mod.rs

* Update Cargo.toml

* a

* Update Cargo.toml

* Update Cargo.toml

* Delete crates/db_schema/src/main.rs

* Update Cargo.toml

* Update .woodpecker.yml

* Update .woodpecker.yml

* Update .woodpecker.yml

* Update .woodpecker.yml

* Update .woodpecker.yml

* Update .woodpecker.yml

* Update .woodpecker.yml

* Update utils.sql

* Update utils.sql

* Update .woodpecker.yml

* Update .woodpecker.yml

* Update .woodpecker.yml

* Update .woodpecker.yml

* Update .woodpecker.yml

* Update down.sql

* Update up.sql

* Update triggers.sql

* Update .woodpecker.yml

* Update .woodpecker.yml

* Update triggers.sql

* Update down.sql

* Update .woodpecker.yml

* Update Cargo.toml

* Update .woodpecker.yml

* Update Cargo.toml

* Update .woodpecker.yml

* Update .woodpecker.yml

* Update .woodpecker.yml

* Update .woodpecker.yml

* Update .woodpecker.yml

* Update .woodpecker.yml

* Update .woodpecker.yml

* Update .woodpecker.yml

* Update .woodpecker.yml

* Update .woodpecker.yml

* Update .woodpecker.yml

* Update .woodpecker.yml

* Update .woodpecker.yml

* Update .woodpecker.yml

* Update .woodpecker.yml

* Update mod.rs

* Update Cargo.toml

* Update mod.rs

* make dump_schema.sh executable

* fix dump_schema.sh

* defer

* diff dumps

* fmt

* Update utils.sql

* Update .woodpecker.yml

* use correct version for pg_dump

* Update .woodpecker.yml

* Update .woodpecker.yml

* change migration date

* atomic site_aggregates insert

* temporarily repeat tests in CI

* drop r schema in CI migration check

* show ReceivedActivity::create error

* move check_diesel_migration CI step

* Update .woodpecker.yml

* Update scheduled_tasks.rs

* Update scheduled_tasks.rs

* update cargo.lock

* move sql files

* move rank functions

* filter post_aggregates update

* fmt

* cargo fmt

* replace post_id with id

* update cargo.lock

* avoid locking rows that need no change in up.sql

* only run replaceable_schema if migrations were run

* debug ci test failure

* make replaceable_schema work in CI

* Update .woodpecker.yml

* remove println

* Use migration revert and git checkout

* Update schema_setup.rs

* Fix

* Update schema_setup.rs

* Update schema_setup.rs

* Update .woodpecker.yml

---------

Co-authored-by: Nutomic <me@nutomic.com>
Co-authored-by: Dessalines <dessalines@users.noreply.github.com>
This commit is contained in:
dullbananas 2024-04-17 17:58:44 -07:00 committed by GitHub
parent 31829b6c05
commit 4ba6221e04
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 1919 additions and 61 deletions

View file

@ -143,16 +143,6 @@ steps:
- diff tmp.schema crates/db_schema/src/schema.rs - diff tmp.schema crates/db_schema/src/schema.rs
when: *slow_check_paths when: *slow_check_paths
check_diesel_migration_revertable:
image: willsquire/diesel-cli
environment:
CARGO_HOME: .cargo_home
DATABASE_URL: postgres://lemmy:password@database:5432/lemmy
commands:
- diesel migration run
- diesel migration redo
when: *slow_check_paths
check_db_perf_tool: check_db_perf_tool:
image: *rust_image image: *rust_image
environment: environment:
@ -194,6 +184,44 @@ steps:
- cargo test --workspace --no-fail-fast - cargo test --workspace --no-fail-fast
when: *slow_check_paths when: *slow_check_paths
check_diesel_migration:
# TODO: use willsquire/diesel-cli image when shared libraries become optional in lemmy_server
image: *rust_image
environment:
LEMMY_DATABASE_URL: postgres://lemmy:password@database:5432/lemmy
RUST_BACKTRACE: "1"
CARGO_HOME: .cargo_home
DATABASE_URL: postgres://lemmy:password@database:5432/lemmy
PGUSER: lemmy
PGPASSWORD: password
PGHOST: database
PGDATABASE: lemmy
commands:
- cargo install diesel_cli
- export PATH="$CARGO_HOME/bin:$PATH"
# Run all migrations
- diesel migration run
# Dump schema to before.sqldump (PostgreSQL apt repo is used to prevent pg_dump version mismatch error)
- apt update && apt install -y lsb-release
- sh -c 'echo "deb https://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" > /etc/apt/sources.list.d/pgdg.list'
- wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | apt-key add -
- apt update && apt install -y postgresql-client-16
- psql -c "DROP SCHEMA IF EXISTS r CASCADE;"
- pg_dump --no-owner --no-privileges --no-table-access-method --schema-only --no-sync -f before.sqldump
# Make sure that the newest migration is revertable without the `r` schema
- diesel migration redo
# Run schema setup twice, which fails on the 2nd time if `DROP SCHEMA IF EXISTS r CASCADE` drops the wrong things
- alias lemmy_schema_setup="target/lemmy_server --disable-scheduled-tasks --disable-http-server --disable-activity-sending"
- lemmy_schema_setup
- lemmy_schema_setup
# Make sure that the newest migration is revertable with the `r` schema
- diesel migration redo
# Check for changes in the schema, which would be caused by an incorrect migration
- psql -c "DROP SCHEMA IF EXISTS r CASCADE;"
- pg_dump --no-owner --no-privileges --no-table-access-method --schema-only --no-sync -f after.sqldump
- diff before.sqldump after.sqldump
when: *slow_check_paths
run_federation_tests: run_federation_tests:
image: node:20-bookworm-slim image: node:20-bookworm-slim
environment: environment:

View file

@ -0,0 +1,476 @@
-- A trigger is associated with a table instead of a schema, so they can't be in the `r` schema. This is
-- okay if the function specified after `EXECUTE FUNCTION` is in `r`, since dropping the function drops the trigger.
--
-- Tables that are updated by triggers should not have foreign keys that aren't set to `INITIALLY DEFERRED`
-- (even if only other columns are updated) because triggers can run after the deletion of referenced rows and
-- before the automatic deletion of the row that references it. This is not a problem for insert or delete.
--
--
--
-- Create triggers for both post and comments
CREATE FUNCTION r.creator_id_from_post_aggregates (agg post_aggregates)
RETURNS int IMMUTABLE PARALLEL SAFE RETURN agg.creator_id;
CREATE FUNCTION r.creator_id_from_comment_aggregates (agg comment_aggregates)
RETURNS int IMMUTABLE PARALLEL SAFE RETURN (
SELECT
creator_id
FROM
comment
WHERE
comment.id = agg.comment_id LIMIT 1
);
CREATE PROCEDURE r.post_or_comment (table_name text)
LANGUAGE plpgsql
AS $a$
BEGIN
EXECUTE replace($b$
-- When a thing gets a vote, update its aggregates and its creator's aggregates
CALL r.create_triggers ('thing_like', $$
BEGIN
WITH thing_diff AS ( UPDATE
thing_aggregates AS a
SET
score = a.score + diff.upvotes - diff.downvotes, upvotes = a.upvotes + diff.upvotes, downvotes = a.downvotes + diff.downvotes, controversy_rank = r.controversy_rank ((a.upvotes + diff.upvotes)::numeric, (a.downvotes + diff.downvotes)::numeric)
FROM (
SELECT
(thing_like).thing_id, coalesce(sum(count_diff) FILTER (WHERE (thing_like).score = 1), 0) AS upvotes, coalesce(sum(count_diff) FILTER (WHERE (thing_like).score != 1), 0) AS downvotes FROM select_old_and_new_rows AS old_and_new_rows GROUP BY (thing_like).thing_id) AS diff
WHERE
a.thing_id = diff.thing_id
RETURNING
r.creator_id_from_thing_aggregates (a.*) AS creator_id, diff.upvotes - diff.downvotes AS score)
UPDATE
person_aggregates AS a
SET
thing_score = a.thing_score + diff.score FROM (
SELECT
creator_id, sum(score) AS score FROM thing_diff GROUP BY creator_id) AS diff
WHERE
a.person_id = diff.creator_id;
RETURN NULL;
END;
$$);
$b$,
'thing',
table_name);
END;
$a$;
CALL r.post_or_comment ('post');
CALL r.post_or_comment ('comment');
-- Create triggers that update counts in parent aggregates
CALL r.create_triggers ('comment', $$
BEGIN
UPDATE
person_aggregates AS a
SET
comment_count = a.comment_count + diff.comment_count
FROM (
SELECT
(comment).creator_id, coalesce(sum(count_diff), 0) AS comment_count
FROM select_old_and_new_rows AS old_and_new_rows
WHERE
r.is_counted (comment)
GROUP BY (comment).creator_id) AS diff
WHERE
a.person_id = diff.creator_id;
UPDATE
site_aggregates AS a
SET
comments = a.comments + diff.comments
FROM (
SELECT
coalesce(sum(count_diff), 0) AS comments
FROM
select_old_and_new_rows AS old_and_new_rows
WHERE
r.is_counted (comment)
AND (comment).local) AS diff;
WITH post_diff AS (
UPDATE
post_aggregates AS a
SET
comments = a.comments + diff.comments,
newest_comment_time = GREATEST (a.newest_comment_time, (
SELECT
published
FROM select_new_rows AS new_comment
WHERE
a.post_id = new_comment.post_id ORDER BY published DESC LIMIT 1)),
newest_comment_time_necro = GREATEST (a.newest_comment_time_necro, (
SELECT
published
FROM select_new_rows AS new_comment
WHERE
a.post_id = new_comment.post_id
-- Ignore comments from the post's creator
AND a.creator_id != new_comment.creator_id
-- Ignore comments on old posts
AND a.published > (new_comment.published - '2 days'::interval)
ORDER BY published DESC LIMIT 1))
FROM (
SELECT
(comment).post_id,
coalesce(sum(count_diff), 0) AS comments
FROM
select_old_and_new_rows AS old_and_new_rows
WHERE
r.is_counted (comment)
GROUP BY
(comment).post_id) AS diff
LEFT JOIN post ON post.id = diff.post_id
WHERE
a.post_id = diff.post_id
RETURNING
a.community_id,
diff.comments,
r.is_counted (post.*) AS include_in_community_aggregates)
UPDATE
community_aggregates AS a
SET
comments = a.comments + diff.comments
FROM (
SELECT
community_id,
sum(comments) AS comments
FROM
post_diff
WHERE
post_diff.include_in_community_aggregates
GROUP BY
community_id) AS diff
WHERE
a.community_id = diff.community_id;
RETURN NULL;
END;
$$);
CALL r.create_triggers ('post', $$
BEGIN
UPDATE
person_aggregates AS a
SET
post_count = a.post_count + diff.post_count
FROM (
SELECT
(post).creator_id, coalesce(sum(count_diff), 0) AS post_count
FROM select_old_and_new_rows AS old_and_new_rows
WHERE
r.is_counted (post)
GROUP BY (post).creator_id) AS diff
WHERE
a.person_id = diff.creator_id;
UPDATE
site_aggregates AS a
SET
posts = a.posts + diff.posts
FROM (
SELECT
coalesce(sum(count_diff), 0) AS posts
FROM
select_old_and_new_rows AS old_and_new_rows
WHERE
r.is_counted (post)
AND (post).local) AS diff;
UPDATE
community_aggregates AS a
SET
posts = a.posts + diff.posts
FROM (
SELECT
(post).community_id,
coalesce(sum(count_diff), 0) AS posts
FROM
select_old_and_new_rows AS old_and_new_rows
WHERE
r.is_counted (post)
GROUP BY
(post).community_id) AS diff
WHERE
a.community_id = diff.community_id;
RETURN NULL;
END;
$$);
CALL r.create_triggers ('community', $$
BEGIN
UPDATE
site_aggregates AS a
SET
communities = a.communities + diff.communities
FROM (
SELECT
coalesce(sum(count_diff), 0) AS communities
FROM select_old_and_new_rows AS old_and_new_rows
WHERE
r.is_counted (community)
AND (community).local) AS diff;
RETURN NULL;
END;
$$);
CALL r.create_triggers ('person', $$
BEGIN
UPDATE
site_aggregates AS a
SET
users = a.users + diff.users
FROM (
SELECT
coalesce(sum(count_diff), 0) AS users
FROM select_old_and_new_rows AS old_and_new_rows
WHERE (person).local) AS diff;
RETURN NULL;
END;
$$);
-- For community_aggregates.comments, don't include comments of deleted or removed posts
CREATE FUNCTION r.update_comment_count_from_post ()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
UPDATE
community_aggregates AS a
SET
comments = a.comments + diff.comments
FROM (
SELECT
old_post.community_id,
sum((
CASE WHEN r.is_counted (new_post.*) THEN
1
ELSE
-1
END) * post_aggregates.comments) AS comments
FROM
new_post
INNER JOIN old_post ON new_post.id = old_post.id
AND (r.is_counted (new_post.*) != r.is_counted (old_post.*))
INNER JOIN post_aggregates ON post_aggregates.post_id = new_post.id
GROUP BY
old_post.community_id) AS diff
WHERE
a.community_id = diff.community_id;
RETURN NULL;
END;
$$;
CREATE TRIGGER comment_count
AFTER UPDATE ON post REFERENCING OLD TABLE AS old_post NEW TABLE AS new_post
FOR EACH STATEMENT
EXECUTE FUNCTION r.update_comment_count_from_post ();
-- Count subscribers for communities.
-- subscribers should be updated only when a local community is followed by a local or remote person.
-- subscribers_local should be updated only when a local person follows a local or remote community.
CALL r.create_triggers ('community_follower', $$
BEGIN
UPDATE
community_aggregates AS a
SET
subscribers = a.subscribers + diff.subscribers, subscribers_local = a.subscribers_local + diff.subscribers_local
FROM (
SELECT
(community_follower).community_id, coalesce(sum(count_diff) FILTER (WHERE community.local), 0) AS subscribers, coalesce(sum(count_diff) FILTER (WHERE person.local), 0) AS subscribers_local
FROM select_old_and_new_rows AS old_and_new_rows
LEFT JOIN community ON community.id = (community_follower).community_id
LEFT JOIN person ON person.id = (community_follower).person_id GROUP BY (community_follower).community_id) AS diff
WHERE
a.community_id = diff.community_id;
RETURN NULL;
END;
$$);
-- These triggers create and update rows in each aggregates table to match its associated table's rows.
-- Deleting rows and updating IDs are already handled by `CASCADE` in foreign key constraints.
CREATE FUNCTION r.comment_aggregates_from_comment ()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
INSERT INTO comment_aggregates (comment_id, published)
SELECT
id,
published
FROM
new_comment;
RETURN NULL;
END;
$$;
CREATE TRIGGER aggregates
AFTER INSERT ON comment REFERENCING NEW TABLE AS new_comment
FOR EACH STATEMENT
EXECUTE FUNCTION r.comment_aggregates_from_comment ();
CREATE FUNCTION r.community_aggregates_from_community ()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
INSERT INTO community_aggregates (community_id, published)
SELECT
id,
published
FROM
new_community;
RETURN NULL;
END;
$$;
CREATE TRIGGER aggregates
AFTER INSERT ON community REFERENCING NEW TABLE AS new_community
FOR EACH STATEMENT
EXECUTE FUNCTION r.community_aggregates_from_community ();
CREATE FUNCTION r.person_aggregates_from_person ()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
INSERT INTO person_aggregates (person_id)
SELECT
id
FROM
new_person;
RETURN NULL;
END;
$$;
CREATE TRIGGER aggregates
AFTER INSERT ON person REFERENCING NEW TABLE AS new_person
FOR EACH STATEMENT
EXECUTE FUNCTION r.person_aggregates_from_person ();
CREATE FUNCTION r.post_aggregates_from_post ()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
INSERT INTO post_aggregates (post_id, published, newest_comment_time, newest_comment_time_necro, community_id, creator_id, instance_id, featured_community, featured_local)
SELECT
new_post.id,
new_post.published,
new_post.published,
new_post.published,
new_post.community_id,
new_post.creator_id,
community.instance_id,
new_post.featured_community,
new_post.featured_local
FROM
new_post
INNER JOIN community ON community.id = new_post.community_id;
RETURN NULL;
END;
$$;
CREATE TRIGGER aggregates
AFTER INSERT ON post REFERENCING NEW TABLE AS new_post
FOR EACH STATEMENT
EXECUTE FUNCTION r.post_aggregates_from_post ();
CREATE FUNCTION r.post_aggregates_from_post_update ()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
UPDATE
post_aggregates
SET
featured_community = new_post.featured_community,
featured_local = new_post.featured_local
FROM
new_post
INNER JOIN old_post ON old_post.id = new_post.id
AND (old_post.featured_community,
old_post.featured_local) != (new_post.featured_community,
old_post.featured_local)
WHERE
post_aggregates.post_id = new_post.id;
RETURN NULL;
END;
$$;
CREATE TRIGGER aggregates_update
AFTER UPDATE ON post REFERENCING OLD TABLE AS old_post NEW TABLE AS new_post
FOR EACH STATEMENT
EXECUTE FUNCTION r.post_aggregates_from_post_update ();
CREATE FUNCTION r.site_aggregates_from_site ()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
-- only 1 row can be in site_aggregates because of the index idx_site_aggregates_1_row_only.
-- we only ever want to have a single value in site_aggregate because the site_aggregate triggers update all rows in that table.
-- a cleaner check would be to insert it for the local_site but that would break assumptions at least in the tests
INSERT INTO site_aggregates (site_id)
VALUES (NEW.id)
ON CONFLICT ((TRUE))
DO NOTHING;
RETURN NULL;
END;
$$;
CREATE TRIGGER aggregates
AFTER INSERT ON site
FOR EACH ROW
EXECUTE FUNCTION r.site_aggregates_from_site ();
-- Change the order of some cascading deletions to make deletion triggers run before the deletion of rows that the triggers need to read
CREATE FUNCTION r.delete_comments_before_post ()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
DELETE FROM comment AS c
WHERE c.post_id = OLD.id;
RETURN OLD;
END;
$$;
CREATE TRIGGER delete_comments
BEFORE DELETE ON post
FOR EACH ROW
EXECUTE FUNCTION r.delete_comments_before_post ();
CREATE FUNCTION r.delete_follow_before_person ()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
DELETE FROM community_follower AS c
WHERE c.person_id = OLD.id;
RETURN OLD;
END;
$$;
CREATE TRIGGER delete_follow
BEFORE DELETE ON person
FOR EACH ROW
EXECUTE FUNCTION r.delete_follow_before_person ();

View file

@ -0,0 +1,146 @@
-- Each calculation used in triggers should be a single SQL language
-- expression so it can be inlined in migrations.
CREATE FUNCTION r.controversy_rank (upvotes numeric, downvotes numeric)
RETURNS float
LANGUAGE sql
IMMUTABLE PARALLEL SAFE RETURN CASE WHEN downvotes <= 0
OR upvotes <= 0 THEN
0
ELSE
(
upvotes + downvotes) * CASE WHEN upvotes > downvotes THEN
downvotes::float / upvotes::float
ELSE
upvotes::float / downvotes::float
END
END;
CREATE FUNCTION r.hot_rank (score numeric, published timestamp with time zone)
RETURNS double precision
LANGUAGE sql
IMMUTABLE PARALLEL SAFE RETURN
-- after a week, it will default to 0.
CASE WHEN (
now() - published) > '0 days'
AND (
now() - published) < '7 days' THEN
-- Use greatest(2,score), so that the hot_rank will be positive and not ignored.
log (
greatest (2, score + 2)) / power (((EXTRACT(EPOCH FROM (now() - published)) / 3600) + 2), 1.8)
ELSE
-- if the post is from the future, set hot score to 0. otherwise you can game the post to
-- always be on top even with only 1 vote by setting it to the future
0.0
END;
CREATE FUNCTION r.scaled_rank (score numeric, published timestamp with time zone, users_active_month numeric)
RETURNS double precision
LANGUAGE sql
IMMUTABLE PARALLEL SAFE
-- Add 2 to avoid divide by zero errors
-- Default for score = 1, active users = 1, and now, is (0.1728 / log(2 + 1)) = 0.3621
-- There may need to be a scale factor multiplied to users_active_month, to make
-- the log curve less pronounced. This can be tuned in the future.
RETURN (
r.hot_rank (score, published) / log(2 + users_active_month)
);
-- For tables with `deleted` and `removed` columns, this function determines which rows to include in a count.
CREATE FUNCTION r.is_counted (item record)
RETURNS bool
LANGUAGE plpgsql
IMMUTABLE PARALLEL SAFE
AS $$
BEGIN
RETURN COALESCE(NOT (item.deleted
OR item.removed), FALSE);
END;
$$;
-- This function creates statement-level triggers for all operation types. It's designed this way
-- because of these limitations:
-- * A trigger that uses transition tables can only handle 1 operation type.
-- * Transition tables must be relevant for the operation type (for example, `NEW TABLE` is
-- not allowed for a `DELETE` trigger)
-- * Transition tables are only provided to the trigger function, not to functions that it calls.
--
-- This function can only be called once per table. The trigger function body given as the 2nd argument
-- and can contain these names, which are replaced with a `SELECT` statement in parenthesis if needed:
-- * `select_old_rows`
-- * `select_new_rows`
-- * `select_old_and_new_rows` with 2 columns:
-- 1. `count_diff`: `-1` for old rows and `1` for new rows, which can be used with `sum` to get the number
-- to add to a count
-- 2. (same name as the trigger's table): the old or new row as a composite value
CREATE PROCEDURE r.create_triggers (table_name text, function_body text)
LANGUAGE plpgsql
AS $a$
DECLARE
defs text := $$
-- Delete
CREATE FUNCTION r.thing_delete_statement ()
RETURNS TRIGGER
LANGUAGE plpgsql
AS function_body_delete;
CREATE TRIGGER delete_statement
AFTER DELETE ON thing REFERENCING OLD TABLE AS select_old_rows
FOR EACH STATEMENT
EXECUTE FUNCTION r.thing_delete_statement ( );
-- Insert
CREATE FUNCTION r.thing_insert_statement ( )
RETURNS TRIGGER
LANGUAGE plpgsql
AS function_body_insert;
CREATE TRIGGER insert_statement
AFTER INSERT ON thing REFERENCING NEW TABLE AS select_new_rows
FOR EACH STATEMENT
EXECUTE FUNCTION r.thing_insert_statement ( );
-- Update
CREATE FUNCTION r.thing_update_statement ( )
RETURNS TRIGGER
LANGUAGE plpgsql
AS function_body_update;
CREATE TRIGGER update_statement
AFTER UPDATE ON thing REFERENCING OLD TABLE AS select_old_rows NEW TABLE AS select_new_rows
FOR EACH STATEMENT
EXECUTE FUNCTION r.thing_update_statement ( );
$$;
select_old_and_new_rows text := $$ (
SELECT
-1 AS count_diff,
old_table::thing AS thing
FROM
select_old_rows AS old_table
UNION ALL
SELECT
1 AS count_diff,
new_table::thing AS thing
FROM
select_new_rows AS new_table) $$;
empty_select_new_rows text := $$ (
SELECT
*
FROM
-- Real transition table
select_old_rows
WHERE
FALSE) $$;
empty_select_old_rows text := $$ (
SELECT
*
FROM
-- Real transition table
select_new_rows
WHERE
FALSE) $$;
BEGIN
function_body := replace(function_body, 'select_old_and_new_rows', select_old_and_new_rows);
-- `select_old_rows` and `select_new_rows` are made available as empty tables if they don't already exist
defs := replace(defs, 'function_body_delete', quote_literal(replace(function_body, 'select_new_rows', empty_select_new_rows)));
defs := replace(defs, 'function_body_insert', quote_literal(replace(function_body, 'select_old_rows', empty_select_old_rows)));
defs := replace(defs, 'function_body_update', quote_literal(function_body));
defs := replace(defs, 'thing', table_name);
EXECUTE defs;
END;
$a$;

View file

@ -85,12 +85,9 @@ mod tests {
.unwrap() .unwrap()
.into(); .into();
// inserting activity for first time // inserting activity should only work once
let res = ReceivedActivity::create(pool, &ap_id).await; ReceivedActivity::create(pool, &ap_id).await.unwrap();
assert!(res.is_ok()); ReceivedActivity::create(pool, &ap_id).await.unwrap_err();
let res = ReceivedActivity::create(pool, &ap_id).await;
assert!(res.is_err());
} }
#[tokio::test] #[tokio::test]

View file

@ -43,6 +43,9 @@ pub mod traits;
#[cfg(feature = "full")] #[cfg(feature = "full")]
pub mod utils; pub mod utils;
#[cfg(feature = "full")]
mod schema_setup;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use strum_macros::{Display, EnumString}; use strum_macros::{Display, EnumString};
#[cfg(feature = "full")] #[cfg(feature = "full")]

View file

@ -0,0 +1,64 @@
use anyhow::Context;
use diesel::{connection::SimpleConnection, Connection, PgConnection};
use diesel_migrations::{EmbeddedMigrations, MigrationHarness};
use lemmy_utils::error::LemmyError;
use tracing::info;
const MIGRATIONS: EmbeddedMigrations = embed_migrations!();
/// This SQL code sets up the `r` schema, which contains things that can be safely dropped and replaced
/// instead of being changed using migrations. It may not create or modify things outside of the `r` schema
/// (indicated by `r.` before the name), unless a comment says otherwise.
///
/// Currently, this code is only run after the server starts and there's at least 1 pending migration
/// to run. This means every time you change something here, you must also create a migration (a blank
/// up.sql file works fine). This behavior will be removed when we implement a better way to avoid
/// useless schema updates and locks.
///
/// If you add something that depends on something (such as a table) created in a new migration, then down.sql
/// must use `CASCADE` when dropping it. This doesn't need to be fixed in old migrations because the
/// "replaceable-schema" migration runs `DROP SCHEMA IF EXISTS r CASCADE` in down.sql.
const REPLACEABLE_SCHEMA: &[&str] = &[
"DROP SCHEMA IF EXISTS r CASCADE;",
"CREATE SCHEMA r;",
include_str!("../replaceable_schema/utils.sql"),
include_str!("../replaceable_schema/triggers.sql"),
];
pub fn run(db_url: &str) -> Result<(), LemmyError> {
// Migrations don't support async connection
let mut conn = PgConnection::establish(db_url).with_context(|| "Error connecting to database")?;
// Run all pending migrations except for the newest one, then run the newest one in the same transaction
// as `REPLACEABLE_SCHEMA`. This code will be becone less hacky when the conditional setup of things in
// `REPLACEABLE_SCHEMA` is done without using the number of pending migrations.
info!("Running Database migrations (This may take a long time)...");
let migrations = conn
.pending_migrations(MIGRATIONS)
.map_err(|e| anyhow::anyhow!("Couldn't determine pending migrations: {e}"))?;
for migration in migrations.iter().rev().skip(1).rev() {
conn
.run_migration(migration)
.map_err(|e| anyhow::anyhow!("Couldn't run DB Migrations: {e}"))?;
}
conn.transaction::<_, LemmyError, _>(|conn| {
if let Some(migration) = migrations.last() {
// Migration is run with a savepoint since there's already a transaction
conn
.run_migration(migration)
.map_err(|e| anyhow::anyhow!("Couldn't run DB Migrations: {e}"))?;
} else if !cfg!(debug_assertions) {
// In production, skip running `REPLACEABLE_SCHEMA` to avoid locking things in the schema. In
// CI, always run it because `diesel migration` commands would otherwise prevent it.
return Ok(());
}
conn
.batch_execute(&REPLACEABLE_SCHEMA.join("\n"))
.context("Couldn't run SQL files in crates/db_schema/replaceable_schema")?;
Ok(())
})?;
info!("Database migrations complete.");
Ok(())
}

View file

@ -1,11 +1,4 @@
use crate::{ use crate::{newtypes::DbUrl, CommentSortType, SortType};
diesel::Connection,
diesel_migrations::MigrationHarness,
newtypes::DbUrl,
CommentSortType,
SortType,
};
use anyhow::Context;
use chrono::{DateTime, TimeDelta, Utc}; use chrono::{DateTime, TimeDelta, Utc};
use deadpool::Runtime; use deadpool::Runtime;
use diesel::{ use diesel::{
@ -21,7 +14,6 @@ use diesel::{
sql_types::{self, Timestamptz}, sql_types::{self, Timestamptz},
IntoSql, IntoSql,
OptionalExtension, OptionalExtension,
PgConnection,
}; };
use diesel_async::{ use diesel_async::{
pg::AsyncPgConnection, pg::AsyncPgConnection,
@ -32,7 +24,6 @@ use diesel_async::{
}, },
SimpleAsyncConnection, SimpleAsyncConnection,
}; };
use diesel_migrations::EmbeddedMigrations;
use futures_util::{future::BoxFuture, Future, FutureExt}; use futures_util::{future::BoxFuture, Future, FutureExt};
use i_love_jesus::CursorKey; use i_love_jesus::CursorKey;
use lemmy_utils::{ use lemmy_utils::{
@ -50,7 +41,7 @@ use std::{
sync::Arc, sync::Arc,
time::{Duration, SystemTime}, time::{Duration, SystemTime},
}; };
use tracing::{error, info}; use tracing::error;
use url::Url; use url::Url;
const FETCH_LIMIT_DEFAULT: i64 = 10; const FETCH_LIMIT_DEFAULT: i64 = 10;
@ -364,21 +355,6 @@ impl ServerCertVerifier for NoCertVerifier {
} }
} }
pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!();
fn run_migrations(db_url: &str) -> LemmyResult<()> {
// Needs to be a sync connection
let mut conn = PgConnection::establish(db_url).with_context(|| "Error connecting to database")?;
info!("Running Database migrations (This may take a long time)...");
conn
.run_pending_migrations(MIGRATIONS)
.map_err(|e| anyhow::anyhow!("Couldn't run DB Migrations: {e}"))?;
info!("Database migrations complete.");
Ok(())
}
pub async fn build_db_pool() -> LemmyResult<ActualDbPool> { pub async fn build_db_pool() -> LemmyResult<ActualDbPool> {
let db_url = SETTINGS.get_database_url(); let db_url = SETTINGS.get_database_url();
// We only support TLS with sslmode=require currently // We only support TLS with sslmode=require currently
@ -407,7 +383,7 @@ pub async fn build_db_pool() -> LemmyResult<ActualDbPool> {
})) }))
.build()?; .build()?;
run_migrations(&db_url)?; crate::schema_setup::run(&db_url)?;
Ok(pool) Ok(pool)
} }
@ -449,14 +425,17 @@ pub mod functions {
use diesel::sql_types::{BigInt, Text, Timestamptz}; use diesel::sql_types::{BigInt, Text, Timestamptz};
sql_function! { sql_function! {
#[sql_name = "r.hot_rank"]
fn hot_rank(score: BigInt, time: Timestamptz) -> Double; fn hot_rank(score: BigInt, time: Timestamptz) -> Double;
} }
sql_function! { sql_function! {
#[sql_name = "r.scaled_rank"]
fn scaled_rank(score: BigInt, time: Timestamptz, users_active_month: BigInt) -> Double; fn scaled_rank(score: BigInt, time: Timestamptz, users_active_month: BigInt) -> Double;
} }
sql_function! { sql_function! {
#[sql_name = "r.controversy_rank"]
fn controversy_rank(upvotes: BigInt, downvotes: BigInt, score: BigInt) -> Double; fn controversy_rank(upvotes: BigInt, downvotes: BigInt, score: BigInt) -> Double;
} }

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,81 @@
CREATE UNIQUE INDEX idx_site_aggregates_1_row_only ON site_aggregates ((TRUE));
-- Drop functions and use `CASCADE` to drop the triggers that use them
DROP FUNCTION comment_aggregates_comment, comment_aggregates_score, community_aggregates_comment_count, community_aggregates_community, community_aggregates_post_count, community_aggregates_post_count_insert, community_aggregates_subscriber_count, delete_follow_before_person, person_aggregates_comment_count, person_aggregates_comment_score, person_aggregates_person, person_aggregates_post_count, person_aggregates_post_insert, person_aggregates_post_score, post_aggregates_comment_count, post_aggregates_featured_community, post_aggregates_featured_local, post_aggregates_post, post_aggregates_score, site_aggregates_comment_delete, site_aggregates_comment_insert, site_aggregates_community_delete, site_aggregates_community_insert, site_aggregates_person_delete, site_aggregates_person_insert, site_aggregates_post_delete, site_aggregates_post_insert, site_aggregates_post_update, site_aggregates_site, was_removed_or_deleted, was_restored_or_created CASCADE;
-- Drop rank functions
DROP FUNCTION controversy_rank, scaled_rank, hot_rank;
-- Defer constraints
ALTER TABLE comment_aggregates
ALTER CONSTRAINT comment_aggregates_comment_id_fkey INITIALLY DEFERRED;
ALTER TABLE community_aggregates
ALTER CONSTRAINT community_aggregates_community_id_fkey INITIALLY DEFERRED;
ALTER TABLE person_aggregates
ALTER CONSTRAINT person_aggregates_person_id_fkey INITIALLY DEFERRED;
ALTER TABLE post_aggregates
ALTER CONSTRAINT post_aggregates_community_id_fkey INITIALLY DEFERRED,
ALTER CONSTRAINT post_aggregates_creator_id_fkey INITIALLY DEFERRED,
ALTER CONSTRAINT post_aggregates_instance_id_fkey INITIALLY DEFERRED,
ALTER CONSTRAINT post_aggregates_post_id_fkey INITIALLY DEFERRED;
ALTER TABLE site_aggregates
ALTER CONSTRAINT site_aggregates_site_id_fkey INITIALLY DEFERRED;
-- Fix values that might be incorrect because of the old triggers
UPDATE
post_aggregates
SET
featured_local = post.featured_local,
featured_community = post.featured_community
FROM
post
WHERE
post_aggregates.post_id = post.id
AND (post_aggregates.featured_local,
post_aggregates.featured_community) != (post.featured_local,
post.featured_community);
UPDATE
community_aggregates
SET
comments = counted.comments
FROM (
SELECT
community_id,
count(*) AS comments
FROM
comment,
LATERAL (
SELECT
*
FROM
post
WHERE
post.id = comment.post_id
LIMIT 1) AS post
WHERE
NOT (comment.deleted
OR comment.removed
OR post.deleted
OR post.removed)
GROUP BY
community_id) AS counted
WHERE
community_aggregates.community_id = counted.community_id
AND community_aggregates.comments != counted.comments;
UPDATE
site_aggregates
SET
communities = (
SELECT
count(*)
FROM
community
WHERE
local);

16
scripts/dump_schema.sh Executable file
View file

@ -0,0 +1,16 @@
#!/usr/bin/env bash
set -e
# Dumps database schema, not including things that are added outside of migrations
CWD="$(cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")" && pwd -P)"
cd $CWD/../
source scripts/start_dev_db.sh
diesel migration run
pg_dump --no-owner --no-privileges --no-table-access-method --schema-only --no-sync -f schema.sqldump
pg_ctl stop
rm -rf $PGDATA

View file

@ -9,10 +9,12 @@ cd $CWD/../
# Copy the files to a temp dir # Copy the files to a temp dir
TMP_DIR=$(mktemp -d) TMP_DIR=$(mktemp -d)
cp -a migrations/. $TMP_DIR cp -a migrations/. $TMP_DIR/migrations
cp -a crates/db_schema/replaceable_schema/. $TMP_DIR/replaceable_schema
# Format the new files # Format the new files
find $TMP_DIR -type f -name '*.sql' -exec pg_format -i {} + find $TMP_DIR -type f -name '*.sql' -exec pg_format -i {} +
# Diff the directories # Diff the directories
diff -r migrations $TMP_DIR diff -r migrations $TMP_DIR/migrations
diff -r crates/db_schema/replaceable_schema $TMP_DIR/replaceable_schema

View file

@ -2,8 +2,10 @@
export PGDATA="$PWD/dev_pgdata" export PGDATA="$PWD/dev_pgdata"
export PGHOST=$PWD export PGHOST=$PWD
export PGUSER=postgres
export DATABASE_URL="postgresql://lemmy:password@/lemmy?host=$PWD" export DATABASE_URL="postgresql://lemmy:password@/lemmy?host=$PWD"
export LEMMY_DATABASE_URL=$DATABASE_URL export LEMMY_DATABASE_URL=$DATABASE_URL
export PGDATABASE=lemmy
# If cluster exists, stop the server and delete the cluster # If cluster exists, stop the server and delete the cluster
if [[ -d $PGDATA ]] if [[ -d $PGDATA ]]
@ -44,5 +46,5 @@ pg_ctl init --silent --options="--username=postgres --auth=trust --no-instructio
pg_ctl start --silent --options="${config_args[*]}" pg_ctl start --silent --options="${config_args[*]}"
# Setup database # Setup database
psql --quiet -c "CREATE USER lemmy WITH PASSWORD 'password' SUPERUSER;" -U postgres PGDATABASE=postgres psql --quiet -c "CREATE USER lemmy WITH PASSWORD 'password' SUPERUSER;"
psql --quiet -c "CREATE DATABASE lemmy WITH OWNER lemmy;" -U postgres PGDATABASE=postgres psql --quiet -c "CREATE DATABASE lemmy WITH OWNER lemmy;"

View file

@ -218,6 +218,7 @@ pub async fn start_lemmy_server(args: CmdArgs) -> LemmyResult<()> {
let mut interrupt = tokio::signal::unix::signal(SignalKind::interrupt())?; let mut interrupt = tokio::signal::unix::signal(SignalKind::interrupt())?;
let mut terminate = tokio::signal::unix::signal(SignalKind::terminate())?; let mut terminate = tokio::signal::unix::signal(SignalKind::terminate())?;
if server.is_some() || federate.is_some() {
tokio::select! { tokio::select! {
_ = tokio::signal::ctrl_c() => { _ = tokio::signal::ctrl_c() => {
tracing::warn!("Received ctrl-c, shutting down gracefully..."); tracing::warn!("Received ctrl-c, shutting down gracefully...");
@ -229,6 +230,7 @@ pub async fn start_lemmy_server(args: CmdArgs) -> LemmyResult<()> {
tracing::warn!("Received terminate, shutting down gracefully..."); tracing::warn!("Received terminate, shutting down gracefully...");
} }
} }
}
if let Some(server) = server { if let Some(server) = server {
server.stop(true).await; server.stop(true).await;
} }

View file

@ -131,7 +131,7 @@ async fn update_hot_ranks(pool: &mut DbPool<'_>) {
&mut conn, &mut conn,
"comment", "comment",
"a.hot_rank != 0", "a.hot_rank != 0",
"SET hot_rank = hot_rank(a.score, a.published)", "SET hot_rank = r.hot_rank(a.score, a.published)",
) )
.await; .await;
@ -139,7 +139,7 @@ async fn update_hot_ranks(pool: &mut DbPool<'_>) {
&mut conn, &mut conn,
"community", "community",
"a.hot_rank != 0", "a.hot_rank != 0",
"SET hot_rank = hot_rank(a.subscribers, a.published)", "SET hot_rank = r.hot_rank(a.subscribers, a.published)",
) )
.await; .await;
@ -236,9 +236,9 @@ async fn process_post_aggregates_ranks_in_batches(conn: &mut AsyncPgConnection)
LIMIT $2 LIMIT $2
FOR UPDATE SKIP LOCKED) FOR UPDATE SKIP LOCKED)
UPDATE post_aggregates pa UPDATE post_aggregates pa
SET hot_rank = hot_rank(pa.score, pa.published), SET hot_rank = r.hot_rank(pa.score, pa.published),
hot_rank_active = hot_rank(pa.score, pa.newest_comment_time_necro), hot_rank_active = r.hot_rank(pa.score, pa.newest_comment_time_necro),
scaled_rank = scaled_rank(pa.score, pa.published, ca.users_active_month) scaled_rank = r.scaled_rank(pa.score, pa.published, ca.users_active_month)
FROM batch, community_aggregates ca FROM batch, community_aggregates ca
WHERE pa.post_id = batch.post_id and pa.community_id = ca.community_id RETURNING pa.published; WHERE pa.post_id = batch.post_id and pa.community_id = ca.community_id RETURNING pa.published;
"#, "#,