mirror of
https://github.com/LemmyNet/lemmy.git
synced 2024-11-27 14:51:18 +00:00
stuff
This commit is contained in:
parent
6eb8d046fa
commit
cf5afc9fb3
2 changed files with 367 additions and 231 deletions
|
@ -5,10 +5,14 @@ use crate::schema::previously_run_sql;
|
||||||
use anyhow::{anyhow, Context};
|
use anyhow::{anyhow, Context};
|
||||||
use diesel::{
|
use diesel::{
|
||||||
connection::SimpleConnection,
|
connection::SimpleConnection,
|
||||||
|
dsl::exists,
|
||||||
|
expression::IntoSql,
|
||||||
migration::{Migration, MigrationSource, MigrationVersion},
|
migration::{Migration, MigrationSource, MigrationVersion},
|
||||||
pg::Pg,
|
pg::Pg,
|
||||||
select,
|
select,
|
||||||
|
sql_types,
|
||||||
update,
|
update,
|
||||||
|
BoolExpressionMethods,
|
||||||
Connection,
|
Connection,
|
||||||
ExpressionMethods,
|
ExpressionMethods,
|
||||||
NullableExpressionMethods,
|
NullableExpressionMethods,
|
||||||
|
@ -21,9 +25,15 @@ use lemmy_utils::{error::LemmyResult, settings::SETTINGS};
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
use tracing::info;
|
use tracing::info;
|
||||||
|
|
||||||
|
diesel::table! {
|
||||||
|
pg_namespace (nspname) {
|
||||||
|
nspname -> Text,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// In production, include migrations in the binary
|
// In production, include migrations in the binary
|
||||||
#[cfg(not(debug_assertions))]
|
#[cfg(not(debug_assertions))]
|
||||||
fn get_migration_source() -> diesel_migrations::EmbeddedMigrations {
|
fn migrations() -> diesel_migrations::EmbeddedMigrations {
|
||||||
// Using `const` here is required by the borrow checker
|
// Using `const` here is required by the borrow checker
|
||||||
const MIGRATIONS: diesel_migrations::EmbeddedMigrations = diesel_migrations::embed_migrations!();
|
const MIGRATIONS: diesel_migrations::EmbeddedMigrations = diesel_migrations::embed_migrations!();
|
||||||
MIGRATIONS
|
MIGRATIONS
|
||||||
|
@ -31,52 +41,75 @@ fn get_migration_source() -> diesel_migrations::EmbeddedMigrations {
|
||||||
|
|
||||||
// Avoid recompiling when migrations are changed
|
// Avoid recompiling when migrations are changed
|
||||||
#[cfg(debug_assertions)]
|
#[cfg(debug_assertions)]
|
||||||
fn get_migration_source() -> diesel_migrations::FileBasedMigrations {
|
fn migrations() -> diesel_migrations::FileBasedMigrations {
|
||||||
diesel_migrations::FileBasedMigrations::find_migrations_directory()
|
diesel_migrations::FileBasedMigrations::find_migrations_directory()
|
||||||
.expect("failed to find migrations dir")
|
.expect("failed to get migration source")
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This SQL code sets up the `r` schema, which contains things that can be safely dropped and replaced
|
/// This SQL code sets up the `r` schema, which contains things that can be safely dropped and replaced
|
||||||
/// instead of being changed using migrations. It may not create or modify things outside of the `r` schema
|
/// instead of being changed using migrations. It may not create or modify things outside of the `r` schema
|
||||||
/// (indicated by `r.` before the name), unless a comment says otherwise.
|
/// (indicated by `r.` before the name), unless a comment says otherwise.
|
||||||
const REPLACEABLE_SCHEMA: &[&str] = &[
|
fn replaceable_schema() -> String {
|
||||||
|
[
|
||||||
"CREATE SCHEMA r;",
|
"CREATE SCHEMA r;",
|
||||||
include_str!("../replaceable_schema/utils.sql"),
|
include_str!("../replaceable_schema/utils.sql"),
|
||||||
include_str!("../replaceable_schema/triggers.sql"),
|
include_str!("../replaceable_schema/triggers.sql"),
|
||||||
];
|
]
|
||||||
|
.join("\n")
|
||||||
struct MigrationHarnessWrapper<'a, 'b> {
|
|
||||||
conn: &'a mut PgConnection,
|
|
||||||
options: &'b Options,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, 'b> MigrationHarness<Pg> for MigrationHarnessWrapper<'a, 'b> {
|
const REPLACEABLE_SCHEMA_PATH: &str = "crates/db_schema/replaceable_schema";
|
||||||
|
|
||||||
|
struct MigrationHarnessWrapper<'a, 'b, 'c> {
|
||||||
|
conn: &'a mut PgConnection,
|
||||||
|
lock_conn: &'b mut PgConnection,
|
||||||
|
options: &'c Options,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, 'b, 'c> MigrationHarnessWrapper<'a, 'b, 'c> {
|
||||||
|
fn run_migration_inner(
|
||||||
|
&mut self,
|
||||||
|
migration: &dyn Migration<Pg>,
|
||||||
|
) -> diesel::migration::Result<MigrationVersion<'static>> {
|
||||||
|
let start_time = Instant::now();
|
||||||
|
|
||||||
|
let result = rollback_if_lock_conn_broke(&mut self.conn, &mut self.lock_conn, |conn| {
|
||||||
|
conn.run_migration(migration)
|
||||||
|
});
|
||||||
|
|
||||||
|
let duration = start_time.elapsed().as_millis();
|
||||||
|
let name = migration.name();
|
||||||
|
info!("{duration}ms run {name}");
|
||||||
|
|
||||||
|
result
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, 'b, 'c> MigrationHarness<Pg> for MigrationHarnessWrapper<'a, 'b, 'c> {
|
||||||
fn run_migration(
|
fn run_migration(
|
||||||
&mut self,
|
&mut self,
|
||||||
migration: &dyn Migration<Pg>,
|
migration: &dyn Migration<Pg>,
|
||||||
) -> diesel::migration::Result<MigrationVersion<'static>> {
|
) -> diesel::migration::Result<MigrationVersion<'static>> {
|
||||||
let name = migration.name();
|
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
if self.options.enable_diff_check {
|
if self.options.enable_diff_check {
|
||||||
let before = diff_check::get_dump(&mut self.conn);
|
let before = diff_check::get_dump();
|
||||||
self.conn.run_migration(migration)?;
|
|
||||||
self.conn.revert_migration(migration)?;
|
self.run_migration_inner(migration)?;
|
||||||
|
self.revert_migration(migration)?;
|
||||||
|
|
||||||
|
let after = diff_check::get_dump();
|
||||||
|
|
||||||
diff_check::check_dump_diff(
|
diff_check::check_dump_diff(
|
||||||
&mut self.conn,
|
after,
|
||||||
before,
|
before,
|
||||||
&format!("migrations/{name}/down.sql"),
|
&format!(
|
||||||
|
"These changes need to be applied in migrations/{}/down.sql:",
|
||||||
|
migration.name()
|
||||||
|
),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
let start_time = Instant::now();
|
self.run_migration_inner(migration)
|
||||||
|
|
||||||
let result = self.conn.run_migration(migration);
|
|
||||||
|
|
||||||
let duration = start_time.elapsed().as_millis();
|
|
||||||
info!("{duration}ms run {name}");
|
|
||||||
|
|
||||||
result
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn revert_migration(
|
fn revert_migration(
|
||||||
|
@ -84,12 +117,14 @@ impl<'a, 'b> MigrationHarness<Pg> for MigrationHarnessWrapper<'a, 'b> {
|
||||||
migration: &dyn Migration<Pg>,
|
migration: &dyn Migration<Pg>,
|
||||||
) -> diesel::migration::Result<MigrationVersion<'static>> {
|
) -> diesel::migration::Result<MigrationVersion<'static>> {
|
||||||
if self.options.enable_diff_check {
|
if self.options.enable_diff_check {
|
||||||
unimplemented!("diff check when reverting migrations");
|
//unimplemented!("diff check when reverting migrations");
|
||||||
}
|
}
|
||||||
|
|
||||||
let start_time = Instant::now();
|
let start_time = Instant::now();
|
||||||
|
|
||||||
let result = self.conn.revert_migration(migration);
|
let result = rollback_if_lock_conn_broke(&mut self.conn, &mut self.lock_conn, |conn| {
|
||||||
|
conn.revert_migration(migration)
|
||||||
|
});
|
||||||
|
|
||||||
let duration = start_time.elapsed().as_millis();
|
let duration = start_time.elapsed().as_millis();
|
||||||
let name = migration.name();
|
let name = migration.name();
|
||||||
|
@ -103,26 +138,24 @@ impl<'a, 'b> MigrationHarness<Pg> for MigrationHarnessWrapper<'a, 'b> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: remove when diesel either adds MigrationSource impl for references or changes functions to take reference
|
|
||||||
#[derive(Clone, Copy)]
|
|
||||||
struct MigrationSourceRef<T>(
|
|
||||||
// If this was `&T`, then the derive macros would add `Clone` and `Copy` bounds for `T`
|
|
||||||
T,
|
|
||||||
);
|
|
||||||
|
|
||||||
impl<'a, T: MigrationSource<Pg>> MigrationSource<Pg> for MigrationSourceRef<&'a T> {
|
|
||||||
fn migrations(&self) -> diesel::migration::Result<Vec<Box<dyn Migration<Pg>>>> {
|
|
||||||
self.0.migrations()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Default)]
|
|
||||||
pub struct Options {
|
pub struct Options {
|
||||||
enable_forbid_diesel_cli_trigger: bool,
|
enable_forbid_diesel_cli_trigger: bool,
|
||||||
enable_diff_check: bool,
|
enable_diff_check: bool,
|
||||||
revert: bool,
|
revert: bool,
|
||||||
revert_amount: Option<u64>,
|
run: bool,
|
||||||
redo_after_revert: bool,
|
amount: Option<u64>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for Options {
|
||||||
|
fn default() -> Self {
|
||||||
|
Options {
|
||||||
|
enable_forbid_diesel_cli_trigger: false,
|
||||||
|
enable_diff_check: false,
|
||||||
|
revert: false,
|
||||||
|
run: true,
|
||||||
|
amount: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Options {
|
impl Options {
|
||||||
|
@ -140,13 +173,16 @@ impl Options {
|
||||||
|
|
||||||
pub fn revert(mut self, amount: Option<u64>) -> Self {
|
pub fn revert(mut self, amount: Option<u64>) -> Self {
|
||||||
self.revert = true;
|
self.revert = true;
|
||||||
self.revert_amount = amount;
|
self.run = false;
|
||||||
|
self.amount = amount;
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn redo(mut self, amount: Option<u64>) -> Self {
|
pub fn redo(mut self, amount: Option<u64>) -> Self {
|
||||||
self.redo_after_revert = true;
|
self.revert = true;
|
||||||
self.revert(amount)
|
self.run = true;
|
||||||
|
self.amount = amount;
|
||||||
|
self
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -155,32 +191,27 @@ pub fn run(options: Options) -> LemmyResult<()> {
|
||||||
let db_url = SETTINGS.get_database_url();
|
let db_url = SETTINGS.get_database_url();
|
||||||
|
|
||||||
// Migrations don't support async connection, and this function doesn't need to be async
|
// Migrations don't support async connection, and this function doesn't need to be async
|
||||||
let mut conn =
|
//let mut conn = PgConnection::establish(&db_url).context("Error connecting to database")?;
|
||||||
PgConnection::establish(&db_url).with_context(|| "Error connecting to database")?;
|
let mut conn = PgConnection::establish(&db_url)?;
|
||||||
|
|
||||||
let new_sql = REPLACEABLE_SCHEMA.join("\n");
|
|
||||||
|
|
||||||
let migration_source = get_migration_source();
|
|
||||||
let migration_source_ref = MigrationSourceRef(&migration_source);
|
|
||||||
|
|
||||||
// If possible, skip locking the migrations table and recreating the "r" schema, so
|
// If possible, skip locking the migrations table and recreating the "r" schema, so
|
||||||
// lemmy_server processes in a horizontally scaled setup can start without causing locks
|
// lemmy_server processes in a horizontally scaled setup can start without causing locks
|
||||||
if !(options.revert
|
if !options.revert
|
||||||
|| conn
|
&& options.run
|
||||||
.has_pending_migration(migration_source_ref)
|
&& options.amount.is_none()
|
||||||
.map_err(|e| anyhow!("Couldn't check pending migrations: {e}"))?)
|
&& !conn
|
||||||
|
.has_pending_migration(migrations())
|
||||||
|
.map_err(convert_err)?
|
||||||
|
//.map_err(|e| anyhow!("Couldn't check pending migrations: {e}"))?)
|
||||||
{
|
{
|
||||||
// The condition above implies that the migration that creates the previously_run_sql table was already run
|
// The condition above implies that the migration that creates the previously_run_sql table was already run
|
||||||
let sql_unchanged: bool = select(
|
let sql_unchanged = exists(
|
||||||
previously_run_sql::table
|
previously_run_sql::table.filter(previously_run_sql::content.eq(replaceable_schema())),
|
||||||
.select(previously_run_sql::content)
|
);
|
||||||
.single_value()
|
|
||||||
.assume_not_null()
|
|
||||||
.eq(&new_sql),
|
|
||||||
)
|
|
||||||
.get_result(&mut conn)?;
|
|
||||||
|
|
||||||
if sql_unchanged {
|
let schema_exists = exists(pg_namespace::table.find("r"));
|
||||||
|
|
||||||
|
if select(sql_unchanged.and(schema_exists)).get_result(&mut conn)? {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -190,103 +221,191 @@ pub fn run(options: Options) -> LemmyResult<()> {
|
||||||
conn.batch_execute("SET lemmy.enable_migrations TO 'on';")?;
|
conn.batch_execute("SET lemmy.enable_migrations TO 'on';")?;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Running without transaction allows pg_dump to see results of migrations
|
// Repurpose the table created by `has_pending_migration` for locking, which
|
||||||
// TODO never use 1 transaction
|
// blocks concurrent attempts to run migrations, but not normal use of the table
|
||||||
let run_in_transaction = !options.enable_diff_check;
|
|
||||||
|
|
||||||
let transaction = |conn: &mut PgConnection| -> LemmyResult<()> {
|
// Using the same connection for both the lock and the migrations would require
|
||||||
let mut wrapper = MigrationHarnessWrapper {
|
// running all migrations in the same transaction, which would prevent:
|
||||||
conn,
|
// * Diff checker using pg_dump to see the effects of each migration
|
||||||
options: &options,
|
// * Migrations using an enum value added in a previous migration in the same transaction
|
||||||
};
|
// * Inspection of the database schema after only some migrations succeed
|
||||||
|
PgConnection::establish(&db_url)?.transaction(|lock_conn| -> LemmyResult<()> {
|
||||||
// * Prevent other lemmy_server processes from running this transaction simultaneously by repurposing
|
|
||||||
// the table created by `MigrationHarness::pending_migrations` as a lock target (this doesn't block
|
|
||||||
// normal use of the table)
|
|
||||||
// * Drop `r` schema, so migrations don't need to be made to work both with and without things in
|
|
||||||
// it existing
|
|
||||||
info!("Waiting for lock...");
|
info!("Waiting for lock...");
|
||||||
|
lock_conn.batch_execute("LOCK __diesel_schema_migrations IN SHARE UPDATE EXCLUSIVE MODE;")?;
|
||||||
let lock = if run_in_transaction {
|
|
||||||
"LOCK __diesel_schema_migrations IN SHARE UPDATE EXCLUSIVE MODE;"
|
|
||||||
} else {
|
|
||||||
""
|
|
||||||
};
|
|
||||||
|
|
||||||
wrapper
|
|
||||||
.conn
|
|
||||||
.batch_execute(&format!("{lock}DROP SCHEMA IF EXISTS r CASCADE;"))?;
|
|
||||||
|
|
||||||
info!("Running Database migrations (This may take a long time)...");
|
info!("Running Database migrations (This may take a long time)...");
|
||||||
|
|
||||||
(|| {
|
// Drop `r` schema, so migrations don't need to be made to work both with and without things in
|
||||||
if options.revert {
|
// it existing
|
||||||
if let Some(amount) = options.revert_amount {
|
revert_replaceable_schema(&mut conn, lock_conn)?;
|
||||||
for _ in 0..amount {
|
|
||||||
wrapper.revert_last_migration(migration_source_ref)?;
|
|
||||||
}
|
|
||||||
if options.redo_after_revert {
|
|
||||||
for _ in 0..amount {
|
|
||||||
wrapper.run_next_migration(migration_source_ref)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
wrapper.revert_all_migrations(migration_source_ref)?;
|
|
||||||
if options.redo_after_revert {
|
|
||||||
wrapper.run_pending_migrations(migration_source_ref)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
wrapper.run_pending_migrations(migration_source_ref)?;
|
|
||||||
}
|
|
||||||
diesel::migration::Result::Ok(())
|
|
||||||
})()
|
|
||||||
.map_err(|e| anyhow!("Couldn't run DB Migrations: {e}"))?;
|
|
||||||
|
|
||||||
// Run replaceable_schema if newest migration was applied
|
run_selected_migrations(&mut conn, lock_conn, &options).map_err(convert_err)?;
|
||||||
if !(options.revert && !options.redo_after_revert) {
|
|
||||||
|
// Only run replaceable_schema if newest migration was applied
|
||||||
|
if (options.run && options.amount.is_none()) || !conn.has_pending_migration(migrations()).map_err(convert_err)? {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
if options.enable_diff_check {
|
if options.enable_diff_check {
|
||||||
let before = diff_check::get_dump(&mut wrapper.conn);
|
let before = diff_check::get_dump();
|
||||||
// todo move replaceable_schema dir path to let/const?
|
|
||||||
wrapper
|
run_replaceable_schema(&mut conn, lock_conn)?;
|
||||||
.conn
|
revert_replaceable_schema(&mut conn, lock_conn)?;
|
||||||
.batch_execute(&new_sql)
|
|
||||||
.context("Couldn't run SQL files in crates/db_schema/replaceable_schema")?;
|
let after = diff_check::get_dump();
|
||||||
// todo move statement to const
|
|
||||||
wrapper
|
diff_check::check_dump_diff(before, after, "The code in crates/db_schema/replaceable_schema incorrectly created or modified things outside of the `r` schema, causing these changes to be left behind after dropping the schema:");
|
||||||
.conn
|
|
||||||
.batch_execute("DROP SCHEMA IF EXISTS r CASCADE;")?;
|
|
||||||
// todo use different first output line in this case
|
|
||||||
diff_check::check_dump_diff(&mut wrapper.conn, before, "replaceable_schema");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
wrapper
|
run_replaceable_schema(&mut conn, lock_conn)?;
|
||||||
.conn
|
|
||||||
.batch_execute(&new_sql)
|
|
||||||
.context("Couldn't run SQL files in crates/db_schema/replaceable_schema")?;
|
|
||||||
|
|
||||||
let num_rows_updated = update(previously_run_sql::table)
|
|
||||||
.set(previously_run_sql::content.eq(new_sql))
|
|
||||||
.execute(wrapper.conn)?;
|
|
||||||
|
|
||||||
debug_assert_eq!(num_rows_updated, 1);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
};
|
})?;
|
||||||
|
|
||||||
if run_in_transaction {
|
|
||||||
conn.transaction(transaction)?;
|
|
||||||
} else {
|
|
||||||
transaction(&mut conn)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
info!("Database migrations complete.");
|
info!("Database migrations complete.");
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn run_replaceable_schema(
|
||||||
|
conn: &mut PgConnection,
|
||||||
|
lock_conn: &mut PgConnection,
|
||||||
|
) -> LemmyResult<()> {
|
||||||
|
rollback_if_lock_conn_broke(conn, lock_conn, |conn| {
|
||||||
|
conn
|
||||||
|
.batch_execute(&replaceable_schema())
|
||||||
|
.with_context(|| format!("Failed to run SQL files in {REPLACEABLE_SCHEMA_PATH}"))?;
|
||||||
|
|
||||||
|
let num_rows_updated = update(previously_run_sql::table)
|
||||||
|
.set(previously_run_sql::content.eq(replaceable_schema()))
|
||||||
|
.execute(conn)?;
|
||||||
|
|
||||||
|
debug_assert_eq!(num_rows_updated, 1);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn revert_replaceable_schema(
|
||||||
|
conn: &mut PgConnection,
|
||||||
|
lock_conn: &mut PgConnection,
|
||||||
|
) -> LemmyResult<()> {
|
||||||
|
rollback_if_lock_conn_broke(conn, lock_conn, |conn| {
|
||||||
|
conn
|
||||||
|
.batch_execute("DROP SCHEMA IF EXISTS r CASCADE;")
|
||||||
|
.with_context(|| format!("Failed to revert SQL files in {REPLACEABLE_SCHEMA_PATH}"))?;
|
||||||
|
|
||||||
|
// Value in `previously_run_sql` table is not set here because the table might not exist
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn run_selected_migrations(
|
||||||
|
conn: &mut PgConnection,
|
||||||
|
lock_conn: &mut PgConnection,
|
||||||
|
options: &Options,
|
||||||
|
) -> diesel::migration::Result<()> {
|
||||||
|
let mut wrapper = MigrationHarnessWrapper {
|
||||||
|
conn,
|
||||||
|
lock_conn,
|
||||||
|
options,
|
||||||
|
};
|
||||||
|
|
||||||
|
/*let revert = (
|
||||||
|
options.revert,
|
||||||
|
MigrationHarnessWrapper::revert_last_migration as fn(_, _) -> _,
|
||||||
|
MigrationHarnessWrapper::revert_all_migrations as fn(_, _) -> _,
|
||||||
|
);
|
||||||
|
let run = (
|
||||||
|
options.run,
|
||||||
|
MigrationHarnessWrapper::run_next_migration,
|
||||||
|
MigrationHarnessWrapper::run_pending_migrations,
|
||||||
|
);
|
||||||
|
|
||||||
|
for (condition, run_one, run_all) in [revert, run] {
|
||||||
|
if condition {
|
||||||
|
if let Some(amount) = options.amount {
|
||||||
|
for _ in 0..amount {
|
||||||
|
run_one(&mut wrapper, migrations())?
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
run_all(&mut wrapper, migrations())?
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}*/
|
||||||
|
|
||||||
|
if options.revert {
|
||||||
|
if let Some(amount) = options.amount {
|
||||||
|
for _ in 0..amount {
|
||||||
|
wrapper.revert_last_migration(migrations())?;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
wrapper.revert_all_migrations(migrations())?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if options.run {
|
||||||
|
if let Some(amount) = options.amount {
|
||||||
|
for _ in 0..amount {
|
||||||
|
wrapper.run_next_migration(migrations())?;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
wrapper.run_pending_migrations(migrations())?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* } else {
|
||||||
|
wrapper.run_pending_migrations(migrations())?;
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(amount) = options.revert_amount {
|
||||||
|
for _ in 0..amount {
|
||||||
|
wrapper.revert_last_migration(migrations())?;
|
||||||
|
}
|
||||||
|
|
||||||
|
if options.redo_after_revert {
|
||||||
|
for _ in 0..amount {
|
||||||
|
wrapper.run_next_migration(migrations())?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
wrapper.revert_all_migrations(migrations())?;
|
||||||
|
|
||||||
|
if options.redo_after_revert {
|
||||||
|
wrapper.run_pending_migrations(migrations())?;
|
||||||
|
}
|
||||||
|
}*/
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Prevent changes from being committed after `lock_conn` unexpectedly closes
|
||||||
|
fn rollback_if_lock_conn_broke<T, E>(
|
||||||
|
conn: &mut PgConnection,
|
||||||
|
lock_conn: &mut PgConnection,
|
||||||
|
mut f: impl FnMut(&mut PgConnection) -> Result<T, E>,
|
||||||
|
) -> Result<T, E>
|
||||||
|
where
|
||||||
|
E: From<anyhow::Error> + From<diesel::result::Error>,
|
||||||
|
{
|
||||||
|
conn.transaction::<T, E, _>(|conn| {
|
||||||
|
let result = f(conn)?;
|
||||||
|
|
||||||
|
select(true.into_sql::<sql_types::Bool>())
|
||||||
|
.execute(lock_conn)
|
||||||
|
.context("Connection used for lock unexpectedly stopped working")?;
|
||||||
|
|
||||||
|
Ok(result)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Makes `diesel::migration::Result` work with `anyhow` and `LemmyError`
|
||||||
|
fn convert_err(
|
||||||
|
err: Box<dyn std::error::Error + Send + Sync>,
|
||||||
|
//) -> impl std::error::Error + Send + Sync + 'static {
|
||||||
|
) -> anyhow::Error {
|
||||||
|
anyhow::anyhow!(err)
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
use diesel::{PgConnection, RunQueryDsl};
|
|
||||||
use lemmy_utils::settings::SETTINGS;
|
use lemmy_utils::settings::SETTINGS;
|
||||||
use std::{
|
use std::{
|
||||||
borrow::Cow,
|
borrow::Cow,
|
||||||
|
@ -7,22 +6,22 @@ use std::{
|
||||||
process::{Command, Stdio},
|
process::{Command, Stdio},
|
||||||
};
|
};
|
||||||
|
|
||||||
diesel::sql_function! {
|
// It's not possible to call `export_snapshot()` for each dump and run the dumps in parallel with the
|
||||||
fn pg_export_snapshot() -> diesel::sql_types::Text;
|
// `--snapshot` flag. Don't waste your time!!!
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get_dump(conn: &mut PgConnection) -> String {
|
pub fn get_dump() -> String {
|
||||||
/*// Required for pg_dump to see uncommitted changes from a different database connection
|
|
||||||
|
|
||||||
// The pg_dump command runs independently from `conn`, which means it can't see changes from
|
|
||||||
// an uncommitted transaction. NASA made each migration run in a separate transaction. When
|
|
||||||
// it was discovered that
|
|
||||||
let snapshot = diesel::select(pg_export_snapshot())
|
|
||||||
.get_result::<String>(conn)
|
|
||||||
.expect("pg_export_snapshot failed");
|
|
||||||
let snapshot_arg = format!("--snapshot={snapshot}");*/
|
|
||||||
let output = Command::new("pg_dump")
|
let output = Command::new("pg_dump")
|
||||||
.args(["--schema-only", "--no-owner", "--no-privileges", "--no-comments", "--no-publications", "--no-security-labels", "--no-subscriptions", "--no-table-access-method", "--no-tablespaces"])
|
.args([
|
||||||
|
"--schema-only",
|
||||||
|
"--no-owner",
|
||||||
|
"--no-privileges",
|
||||||
|
"--no-comments",
|
||||||
|
"--no-publications",
|
||||||
|
"--no-security-labels",
|
||||||
|
"--no-subscriptions",
|
||||||
|
"--no-table-access-method",
|
||||||
|
"--no-tablespaces",
|
||||||
|
])
|
||||||
.env("DATABASE_URL", SETTINGS.get_database_url())
|
.env("DATABASE_URL", SETTINGS.get_database_url())
|
||||||
.stderr(Stdio::inherit())
|
.stderr(Stdio::inherit())
|
||||||
.output()
|
.output()
|
||||||
|
@ -36,17 +35,19 @@ pub fn get_dump(conn: &mut PgConnection) -> String {
|
||||||
|
|
||||||
const PATTERN_LEN: usize = 19;
|
const PATTERN_LEN: usize = 19;
|
||||||
|
|
||||||
// TODO add unit test for output
|
pub fn check_dump_diff(mut after: String, mut before: String, label: &str) {
|
||||||
pub fn check_dump_diff(conn: &mut PgConnection, mut before: String, name: &str) {
|
|
||||||
let mut after = get_dump(conn);
|
|
||||||
if after == before {
|
if after == before {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// Ignore timestamp differences by removing timestamps
|
// Ignore timestamp differences by removing timestamps
|
||||||
for dump in [&mut before, &mut after] {
|
for dump in [&mut before, &mut after] {
|
||||||
for index in 0.. {
|
for index in 0.. {
|
||||||
let Some(byte)=dump.as_bytes().get(index) else{break};
|
let Some(byte) = dump.as_bytes().get(index) else {
|
||||||
if !byte.is_ascii_digit() {continue;}
|
break;
|
||||||
|
};
|
||||||
|
if !byte.is_ascii_digit() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
// Check for this pattern: 0000-00-00 00:00:00
|
// Check for this pattern: 0000-00-00 00:00:00
|
||||||
let Some((
|
let Some((
|
||||||
&[a0, a1, a2, a3, b0, a4, a5, b1, a6, a7, b2, a8, a9, b3, a10, a11, b4, a12, a13],
|
&[a0, a1, a2, a3, b0, a4, a5, b1, a6, a7, b2, a8, a9, b3, a10, a11, b4, a12, a13],
|
||||||
|
@ -80,15 +81,20 @@ pub fn check_dump_diff(conn: &mut PgConnection, mut before: String, name: &str)
|
||||||
|
|
||||||
let [before_chunks, after_chunks] =
|
let [before_chunks, after_chunks] =
|
||||||
[&before, &after].map(|dump| chunks(dump).collect::<BTreeSet<_>>());
|
[&before, &after].map(|dump| chunks(dump).collect::<BTreeSet<_>>());
|
||||||
let only_b = before_chunks.difference(&after_chunks).copied().map(process_chunk).collect::<BTreeSet<_>>();
|
let only_b = before_chunks
|
||||||
let only_a = after_chunks.difference(&before_chunks).copied().map(process_chunk).collect::<BTreeSet<_>>();
|
.difference(&after_chunks)
|
||||||
|
.copied()
|
||||||
|
.map(process_chunk)
|
||||||
|
.collect::<BTreeSet<_>>();
|
||||||
|
let only_a = after_chunks
|
||||||
|
.difference(&before_chunks)
|
||||||
|
.copied()
|
||||||
|
.map(process_chunk)
|
||||||
|
.collect::<BTreeSet<_>>();
|
||||||
|
|
||||||
// todo dont collect only_in_before?
|
// todo dont collect only_in_before?
|
||||||
let [mut only_in_before, mut only_in_after] = [
|
let [mut only_in_before, mut only_in_after] =
|
||||||
only_b.difference(&only_a),
|
[only_b.difference(&only_a), only_a.difference(&only_b)].map(|chunks| {
|
||||||
only_a.difference(&only_b),
|
|
||||||
]
|
|
||||||
.map(|chunks| {
|
|
||||||
chunks
|
chunks
|
||||||
.map(|chunk| {
|
.map(|chunk| {
|
||||||
(
|
(
|
||||||
|
@ -103,45 +109,56 @@ pub fn check_dump_diff(conn: &mut PgConnection, mut before: String, name: &str)
|
||||||
if only_in_before.is_empty() && only_in_after.is_empty() {
|
if only_in_before.is_empty() && only_in_after.is_empty() {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
let after_has_more =
|
let after_has_more = only_in_before.len() < only_in_after.len();
|
||||||
only_in_before.len() < only_in_after.len();
|
|
||||||
// outer iterator in the loop below should not be the one with empty strings, otherwise the empty strings
|
// outer iterator in the loop below should not be the one with empty strings, otherwise the empty strings
|
||||||
// would be equally similar to any other chunk
|
// would be equally similar to any other chunk
|
||||||
let (chunks_gt, chunks_lt) = if after_has_more
|
let (chunks_gt, chunks_lt) = if after_has_more {
|
||||||
{
|
only_in_before.resize_with(only_in_after.len(), Default::default);
|
||||||
only_in_before.resize_with(only_in_after.len(),Default::default);
|
|
||||||
(&mut only_in_after, &only_in_before)
|
(&mut only_in_after, &only_in_before)
|
||||||
} else {
|
} else {
|
||||||
only_in_after.resize_with(only_in_before.len(),Default::default);
|
only_in_after.resize_with(only_in_before.len(), Default::default);
|
||||||
(&mut only_in_before, &only_in_after)
|
(&mut only_in_before, &only_in_after)
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut output = format!("These changes need to be applied in {name}:");
|
let mut output = label.to_owned();
|
||||||
// todo rename variables
|
// todo rename variables
|
||||||
for (before_chunk, before_chunk_filtered) in chunks_lt {
|
for (before_chunk, before_chunk_filtered) in chunks_lt {
|
||||||
let default = Default::default();
|
let default = Default::default();
|
||||||
//panic!("{:?}",(before_chunk.clone(),chunks_lt.clone()));
|
//panic!("{:?}",(before_chunk.clone(),chunks_lt.clone()));
|
||||||
let (most_similar_chunk_index, (most_similar_chunk, most_similar_chunk_filtered)) = chunks_gt
|
let (most_similar_chunk_index, (most_similar_chunk, _)) = chunks_gt
|
||||||
.iter()
|
.iter()
|
||||||
.enumerate()
|
.enumerate()
|
||||||
.max_by_key(|(_, (after_chunk, after_chunk_filtered))| {
|
.max_by_key(|(_, (after_chunk, after_chunk_filtered))| {
|
||||||
if
|
if after_chunk
|
||||||
after_chunk.split_once(|c:char|c.is_lowercase()).unwrap_or_default().0 !=
|
.split_once(|c: char| c.is_lowercase())
|
||||||
before_chunk.split_once(|c:char|c.is_lowercase()).unwrap_or_default().0 {0}else{
|
.unwrap_or_default()
|
||||||
|
.0
|
||||||
|
!= before_chunk
|
||||||
|
.split_once(|c: char| c.is_lowercase())
|
||||||
|
.unwrap_or_default()
|
||||||
|
.0
|
||||||
|
{
|
||||||
|
0
|
||||||
|
} else {
|
||||||
diff::chars(after_chunk_filtered, &before_chunk_filtered)
|
diff::chars(after_chunk_filtered, &before_chunk_filtered)
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.filter(|i| matches!(i, diff::Result::Both(c, _)
|
.filter(|i| {
|
||||||
|
matches!(i, diff::Result::Both(c, _)
|
||||||
// `is_lowercase` increases accuracy for some trigger function diffs
|
// `is_lowercase` increases accuracy for some trigger function diffs
|
||||||
if c.is_lowercase() || c.is_numeric()))
|
if c.is_lowercase() || c.is_numeric())
|
||||||
.count()}
|
|
||||||
})
|
})
|
||||||
.unwrap_or((0,&default));
|
.count()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.unwrap_or((0, &default));
|
||||||
|
|
||||||
output.push('\n');
|
output.push('\n');
|
||||||
let lines = if !after_has_more{diff::lines(&before_chunk,most_similar_chunk)}else{
|
let lines = if !after_has_more {
|
||||||
diff::lines(most_similar_chunk, &before_chunk)};
|
diff::lines(&before_chunk, most_similar_chunk)
|
||||||
for line in lines
|
} else {
|
||||||
{
|
diff::lines(most_similar_chunk, &before_chunk)
|
||||||
|
};
|
||||||
|
for line in lines {
|
||||||
match line {
|
match line {
|
||||||
diff::Result::Left(s) => write!(&mut output, "\n- {s}"),
|
diff::Result::Left(s) => write!(&mut output, "\n- {s}"),
|
||||||
diff::Result::Right(s) => write!(&mut output, "\n+ {s}"),
|
diff::Result::Right(s) => write!(&mut output, "\n+ {s}"),
|
||||||
|
@ -151,7 +168,8 @@ pub fn check_dump_diff(conn: &mut PgConnection, mut before: String, name: &str)
|
||||||
}
|
}
|
||||||
//write!(&mut output, "\n{most_similar_chunk_filtered}");
|
//write!(&mut output, "\n{most_similar_chunk_filtered}");
|
||||||
if !chunks_gt.is_empty() {
|
if !chunks_gt.is_empty() {
|
||||||
chunks_gt.swap_remove(most_similar_chunk_index);}
|
chunks_gt.swap_remove(most_similar_chunk_index);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// should have all been removed
|
// should have all been removed
|
||||||
assert_eq!(chunks_gt.len(), 0);
|
assert_eq!(chunks_gt.len(), 0);
|
||||||
|
@ -177,17 +195,14 @@ fn process_chunk<'a>(result: &'a str) -> Cow<'a, str> {
|
||||||
Cow::Owned(lines.join("\n"))
|
Cow::Owned(lines.join("\n"))
|
||||||
} else if result.starts_with("CREATE VIEW") || result.starts_with("CREATE OR REPLACE VIEW") {
|
} else if result.starts_with("CREATE VIEW") || result.starts_with("CREATE OR REPLACE VIEW") {
|
||||||
// Allow column order to change
|
// Allow column order to change
|
||||||
let is_simple_select_statement = result
|
let is_simple_select_statement = result.lines().enumerate().all(|(i, mut line)| {
|
||||||
.lines()
|
|
||||||
.enumerate()
|
|
||||||
.all(|(i, mut line)| {
|
|
||||||
line = line.trim_start();
|
line = line.trim_start();
|
||||||
match (i, line.chars().next()) {
|
match (i, line.chars().next()) {
|
||||||
(0, Some('C')) => true, // create
|
(0, Some('C')) => true, // create
|
||||||
(1, Some('S')) => true, // select
|
(1, Some('S')) => true, // select
|
||||||
(_, Some('F')) if line.ends_with(';') => true, // from
|
(_, Some('F')) if line.ends_with(';') => true, // from
|
||||||
(_, Some(c)) if c.is_lowercase() => true, // column name
|
(_, Some(c)) if c.is_lowercase() => true, // column name
|
||||||
_ => false
|
_ => false,
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
if is_simple_select_statement {
|
if is_simple_select_statement {
|
||||||
|
@ -205,7 +220,9 @@ fn process_chunk<'a>(result: &'a str) -> Cow<'a, str> {
|
||||||
(placement, line)
|
(placement, line)
|
||||||
});
|
});
|
||||||
Cow::Owned(lines.join("\n"))
|
Cow::Owned(lines.join("\n"))
|
||||||
}else{Cow::Borrowed(result)}
|
} else {
|
||||||
|
Cow::Borrowed(result)
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
Cow::Borrowed(result)
|
Cow::Borrowed(result)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue