use advisory lock

This commit is contained in:
Dull Bananas 2024-05-21 20:19:34 +00:00
parent cf5afc9fb3
commit 4d427b2f0c

View file

@ -60,22 +60,19 @@ fn replaceable_schema() -> String {
const REPLACEABLE_SCHEMA_PATH: &str = "crates/db_schema/replaceable_schema"; const REPLACEABLE_SCHEMA_PATH: &str = "crates/db_schema/replaceable_schema";
struct MigrationHarnessWrapper<'a, 'b, 'c> { struct MigrationHarnessWrapper<'a, 'b> {
conn: &'a mut PgConnection, conn: &'a mut PgConnection,
lock_conn: &'b mut PgConnection, options: &'b Options,
options: &'c Options,
} }
impl<'a, 'b, 'c> MigrationHarnessWrapper<'a, 'b, 'c> { impl<'a, 'b> MigrationHarnessWrapper<'a, 'b> {
fn run_migration_inner( fn run_migration_inner(
&mut self, &mut self,
migration: &dyn Migration<Pg>, migration: &dyn Migration<Pg>,
) -> diesel::migration::Result<MigrationVersion<'static>> { ) -> diesel::migration::Result<MigrationVersion<'static>> {
let start_time = Instant::now(); let start_time = Instant::now();
let result = rollback_if_lock_conn_broke(&mut self.conn, &mut self.lock_conn, |conn| { let result = self.conn.run_migration(migration);
conn.run_migration(migration)
});
let duration = start_time.elapsed().as_millis(); let duration = start_time.elapsed().as_millis();
let name = migration.name(); let name = migration.name();
@ -85,7 +82,7 @@ impl<'a, 'b, 'c> MigrationHarnessWrapper<'a, 'b, 'c> {
} }
} }
impl<'a, 'b, 'c> MigrationHarness<Pg> for MigrationHarnessWrapper<'a, 'b, 'c> { impl<'a, 'b> MigrationHarness<Pg> for MigrationHarnessWrapper<'a, 'b> {
fn run_migration( fn run_migration(
&mut self, &mut self,
migration: &dyn Migration<Pg>, migration: &dyn Migration<Pg>,
@ -116,15 +113,9 @@ impl<'a, 'b, 'c> MigrationHarness<Pg> for MigrationHarnessWrapper<'a, 'b, 'c> {
&mut self, &mut self,
migration: &dyn Migration<Pg>, migration: &dyn Migration<Pg>,
) -> diesel::migration::Result<MigrationVersion<'static>> { ) -> diesel::migration::Result<MigrationVersion<'static>> {
if self.options.enable_diff_check {
//unimplemented!("diff check when reverting migrations");
}
let start_time = Instant::now(); let start_time = Instant::now();
let result = rollback_if_lock_conn_broke(&mut self.conn, &mut self.lock_conn, |conn| { let result = self.conn.revert_migration(migration);
conn.revert_migration(migration)
});
let duration = start_time.elapsed().as_millis(); let duration = start_time.elapsed().as_millis();
let name = migration.name(); let name = migration.name();
@ -221,55 +212,46 @@ pub fn run(options: Options) -> LemmyResult<()> {
conn.batch_execute("SET lemmy.enable_migrations TO 'on';")?; conn.batch_execute("SET lemmy.enable_migrations TO 'on';")?;
} }
// Repurpose the table created by `has_pending_migration` for locking, which // Block concurrent attempts to run migrations (lock is automatically released at the end
// blocks concurrent attempts to run migrations, but not normal use of the table // because the connection is not reused)
// Using the same connection for both the lock and the migrations would require
// running all migrations in the same transaction, which would prevent:
// * Diff checker using pg_dump to see the effects of each migration
// * Migrations using an enum value added in a previous migration in the same transaction
// * Inspection of the database schema after only some migrations succeed
PgConnection::establish(&db_url)?.transaction(|lock_conn| -> LemmyResult<()> {
info!("Waiting for lock..."); info!("Waiting for lock...");
lock_conn.batch_execute("LOCK __diesel_schema_migrations IN SHARE UPDATE EXCLUSIVE MODE;")?; conn.batch_execute("SELECT pg_advisory_lock(0)")?;
info!("Running Database migrations (This may take a long time)..."); info!("Running Database migrations (This may take a long time)...");
// Drop `r` schema, so migrations don't need to be made to work both with and without things in // Drop `r` schema, so migrations don't need to be made to work both with and without things in
// it existing // it existing
revert_replaceable_schema(&mut conn, lock_conn)?; revert_replaceable_schema(&mut conn)?;
run_selected_migrations(&mut conn, lock_conn, &options).map_err(convert_err)?; run_selected_migrations(&mut conn, &options).map_err(convert_err)?;
// Only run replaceable_schema if newest migration was applied // Only run replaceable_schema if newest migration was applied
if (options.run && options.amount.is_none()) || !conn.has_pending_migration(migrations()).map_err(convert_err)? { if (options.run && options.amount.is_none())
|| !conn
.has_pending_migration(migrations())
.map_err(convert_err)?
{
#[cfg(test)] #[cfg(test)]
if options.enable_diff_check { if options.enable_diff_check {
let before = diff_check::get_dump(); let before = diff_check::get_dump();
run_replaceable_schema(&mut conn, lock_conn)?; run_replaceable_schema(&mut conn)?;
revert_replaceable_schema(&mut conn, lock_conn)?; revert_replaceable_schema(&mut conn)?;
let after = diff_check::get_dump(); let after = diff_check::get_dump();
diff_check::check_dump_diff(before, after, "The code in crates/db_schema/replaceable_schema incorrectly created or modified things outside of the `r` schema, causing these changes to be left behind after dropping the schema:"); diff_check::check_dump_diff(before, after, "The code in crates/db_schema/replaceable_schema incorrectly created or modified things outside of the `r` schema, causing these changes to be left behind after dropping the schema:");
} }
run_replaceable_schema(&mut conn, lock_conn)?; run_replaceable_schema(&mut conn)?;
} }
Ok(())
})?;
info!("Database migrations complete."); info!("Database migrations complete.");
Ok(()) Ok(())
} }
fn run_replaceable_schema( fn run_replaceable_schema(conn: &mut PgConnection) -> LemmyResult<()> {
conn: &mut PgConnection, conn.transaction(|conn| {
lock_conn: &mut PgConnection,
) -> LemmyResult<()> {
rollback_if_lock_conn_broke(conn, lock_conn, |conn| {
conn conn
.batch_execute(&replaceable_schema()) .batch_execute(&replaceable_schema())
.with_context(|| format!("Failed to run SQL files in {REPLACEABLE_SCHEMA_PATH}"))?; .with_context(|| format!("Failed to run SQL files in {REPLACEABLE_SCHEMA_PATH}"))?;
@ -284,54 +266,22 @@ fn run_replaceable_schema(
}) })
} }
fn revert_replaceable_schema( fn revert_replaceable_schema(conn: &mut PgConnection) -> LemmyResult<()> {
conn: &mut PgConnection,
lock_conn: &mut PgConnection,
) -> LemmyResult<()> {
rollback_if_lock_conn_broke(conn, lock_conn, |conn| {
conn conn
.batch_execute("DROP SCHEMA IF EXISTS r CASCADE;") .batch_execute("DROP SCHEMA IF EXISTS r CASCADE;")
.with_context(|| format!("Failed to revert SQL files in {REPLACEABLE_SCHEMA_PATH}"))?; .with_context(|| format!("Failed to revert SQL files in {REPLACEABLE_SCHEMA_PATH}"))?;
// Value in `previously_run_sql` table is not set here because the table might not exist // Value in `previously_run_sql` table is not set here because the table might not exist,
// and that's fine because the existence of the `r` schema is also checked
Ok(()) Ok(())
})
} }
fn run_selected_migrations( fn run_selected_migrations(
conn: &mut PgConnection, conn: &mut PgConnection,
lock_conn: &mut PgConnection,
options: &Options, options: &Options,
) -> diesel::migration::Result<()> { ) -> diesel::migration::Result<()> {
let mut wrapper = MigrationHarnessWrapper { let mut wrapper = MigrationHarnessWrapper { conn, options };
conn,
lock_conn,
options,
};
/*let revert = (
options.revert,
MigrationHarnessWrapper::revert_last_migration as fn(_, _) -> _,
MigrationHarnessWrapper::revert_all_migrations as fn(_, _) -> _,
);
let run = (
options.run,
MigrationHarnessWrapper::run_next_migration,
MigrationHarnessWrapper::run_pending_migrations,
);
for (condition, run_one, run_all) in [revert, run] {
if condition {
if let Some(amount) = options.amount {
for _ in 0..amount {
run_one(&mut wrapper, migrations())?
}
} else {
run_all(&mut wrapper, migrations())?
}
}
}*/
if options.revert { if options.revert {
if let Some(amount) = options.amount { if let Some(amount) = options.amount {
@ -353,51 +303,9 @@ fn run_selected_migrations(
} }
} }
/* } else {
wrapper.run_pending_migrations(migrations())?;
}
if let Some(amount) = options.revert_amount {
for _ in 0..amount {
wrapper.revert_last_migration(migrations())?;
}
if options.redo_after_revert {
for _ in 0..amount {
wrapper.run_next_migration(migrations())?;
}
}
} else {
wrapper.revert_all_migrations(migrations())?;
if options.redo_after_revert {
wrapper.run_pending_migrations(migrations())?;
}
}*/
Ok(()) Ok(())
} }
/// Prevent changes from being committed after `lock_conn` unexpectedly closes
fn rollback_if_lock_conn_broke<T, E>(
conn: &mut PgConnection,
lock_conn: &mut PgConnection,
mut f: impl FnMut(&mut PgConnection) -> Result<T, E>,
) -> Result<T, E>
where
E: From<anyhow::Error> + From<diesel::result::Error>,
{
conn.transaction::<T, E, _>(|conn| {
let result = f(conn)?;
select(true.into_sql::<sql_types::Bool>())
.execute(lock_conn)
.context("Connection used for lock unexpectedly stopped working")?;
Ok(result)
})
}
/// Makes `diesel::migration::Result` work with `anyhow` and `LemmyError` /// Makes `diesel::migration::Result` work with `anyhow` and `LemmyError`
fn convert_err( fn convert_err(
err: Box<dyn std::error::Error + Send + Sync>, err: Box<dyn std::error::Error + Send + Sync>,