diff --git a/crates/db_schema/src/schema_setup.rs b/crates/db_schema/src/schema_setup.rs index 573c9195b..12bd7b4eb 100644 --- a/crates/db_schema/src/schema_setup.rs +++ b/crates/db_schema/src/schema_setup.rs @@ -5,10 +5,14 @@ use crate::schema::previously_run_sql; use anyhow::{anyhow, Context}; use diesel::{ connection::SimpleConnection, + dsl::exists, + expression::IntoSql, migration::{Migration, MigrationSource, MigrationVersion}, pg::Pg, select, + sql_types, update, + BoolExpressionMethods, Connection, ExpressionMethods, NullableExpressionMethods, @@ -21,9 +25,15 @@ use lemmy_utils::{error::LemmyResult, settings::SETTINGS}; use std::time::Instant; use tracing::info; +diesel::table! { + pg_namespace (nspname) { + nspname -> Text, + } +} + // In production, include migrations in the binary #[cfg(not(debug_assertions))] -fn get_migration_source() -> diesel_migrations::EmbeddedMigrations { +fn migrations() -> diesel_migrations::EmbeddedMigrations { // Using `const` here is required by the borrow checker const MIGRATIONS: diesel_migrations::EmbeddedMigrations = diesel_migrations::embed_migrations!(); MIGRATIONS @@ -31,52 +41,75 @@ fn get_migration_source() -> diesel_migrations::EmbeddedMigrations { // Avoid recompiling when migrations are changed #[cfg(debug_assertions)] -fn get_migration_source() -> diesel_migrations::FileBasedMigrations { +fn migrations() -> diesel_migrations::FileBasedMigrations { diesel_migrations::FileBasedMigrations::find_migrations_directory() - .expect("failed to find migrations dir") + .expect("failed to get migration source") } /// This SQL code sets up the `r` schema, which contains things that can be safely dropped and replaced /// instead of being changed using migrations. It may not create or modify things outside of the `r` schema /// (indicated by `r.` before the name), unless a comment says otherwise. -const REPLACEABLE_SCHEMA: &[&str] = &[ - "CREATE SCHEMA r;", - include_str!("../replaceable_schema/utils.sql"), - include_str!("../replaceable_schema/triggers.sql"), -]; - -struct MigrationHarnessWrapper<'a, 'b> { - conn: &'a mut PgConnection, - options: &'b Options, +fn replaceable_schema() -> String { + [ + "CREATE SCHEMA r;", + include_str!("../replaceable_schema/utils.sql"), + include_str!("../replaceable_schema/triggers.sql"), + ] + .join("\n") } -impl<'a, 'b> MigrationHarness for MigrationHarnessWrapper<'a, 'b> { +const REPLACEABLE_SCHEMA_PATH: &str = "crates/db_schema/replaceable_schema"; + +struct MigrationHarnessWrapper<'a, 'b, 'c> { + conn: &'a mut PgConnection, + lock_conn: &'b mut PgConnection, + options: &'c Options, +} + +impl<'a, 'b, 'c> MigrationHarnessWrapper<'a, 'b, 'c> { + fn run_migration_inner( + &mut self, + migration: &dyn Migration, + ) -> diesel::migration::Result> { + let start_time = Instant::now(); + + let result = rollback_if_lock_conn_broke(&mut self.conn, &mut self.lock_conn, |conn| { + conn.run_migration(migration) + }); + + let duration = start_time.elapsed().as_millis(); + let name = migration.name(); + info!("{duration}ms run {name}"); + + result + } +} + +impl<'a, 'b, 'c> MigrationHarness for MigrationHarnessWrapper<'a, 'b, 'c> { fn run_migration( &mut self, migration: &dyn Migration, ) -> diesel::migration::Result> { - let name = migration.name(); - #[cfg(test)] if self.options.enable_diff_check { - let before = diff_check::get_dump(&mut self.conn); - self.conn.run_migration(migration)?; - self.conn.revert_migration(migration)?; + let before = diff_check::get_dump(); + + self.run_migration_inner(migration)?; + self.revert_migration(migration)?; + + let after = diff_check::get_dump(); + diff_check::check_dump_diff( - &mut self.conn, + after, before, - &format!("migrations/{name}/down.sql"), + &format!( + "These changes need to be applied in migrations/{}/down.sql:", + migration.name() + ), ); } - let start_time = Instant::now(); - - let result = self.conn.run_migration(migration); - - let duration = start_time.elapsed().as_millis(); - info!("{duration}ms run {name}"); - - result + self.run_migration_inner(migration) } fn revert_migration( @@ -84,12 +117,14 @@ impl<'a, 'b> MigrationHarness for MigrationHarnessWrapper<'a, 'b> { migration: &dyn Migration, ) -> diesel::migration::Result> { if self.options.enable_diff_check { - unimplemented!("diff check when reverting migrations"); + //unimplemented!("diff check when reverting migrations"); } let start_time = Instant::now(); - let result = self.conn.revert_migration(migration); + let result = rollback_if_lock_conn_broke(&mut self.conn, &mut self.lock_conn, |conn| { + conn.revert_migration(migration) + }); let duration = start_time.elapsed().as_millis(); let name = migration.name(); @@ -103,26 +138,24 @@ impl<'a, 'b> MigrationHarness for MigrationHarnessWrapper<'a, 'b> { } } -// TODO: remove when diesel either adds MigrationSource impl for references or changes functions to take reference -#[derive(Clone, Copy)] -struct MigrationSourceRef( - // If this was `&T`, then the derive macros would add `Clone` and `Copy` bounds for `T` - T, -); - -impl<'a, T: MigrationSource> MigrationSource for MigrationSourceRef<&'a T> { - fn migrations(&self) -> diesel::migration::Result>>> { - self.0.migrations() - } -} - -#[derive(Default)] pub struct Options { enable_forbid_diesel_cli_trigger: bool, enable_diff_check: bool, revert: bool, - revert_amount: Option, - redo_after_revert: bool, + run: bool, + amount: Option, +} + +impl Default for Options { + fn default() -> Self { + Options { + enable_forbid_diesel_cli_trigger: false, + enable_diff_check: false, + revert: false, + run: true, + amount: None, + } + } } impl Options { @@ -140,13 +173,16 @@ impl Options { pub fn revert(mut self, amount: Option) -> Self { self.revert = true; - self.revert_amount = amount; + self.run = false; + self.amount = amount; self } pub fn redo(mut self, amount: Option) -> Self { - self.redo_after_revert = true; - self.revert(amount) + self.revert = true; + self.run = true; + self.amount = amount; + self } } @@ -155,32 +191,27 @@ pub fn run(options: Options) -> LemmyResult<()> { let db_url = SETTINGS.get_database_url(); // Migrations don't support async connection, and this function doesn't need to be async - let mut conn = - PgConnection::establish(&db_url).with_context(|| "Error connecting to database")?; - - let new_sql = REPLACEABLE_SCHEMA.join("\n"); - - let migration_source = get_migration_source(); - let migration_source_ref = MigrationSourceRef(&migration_source); + //let mut conn = PgConnection::establish(&db_url).context("Error connecting to database")?; + let mut conn = PgConnection::establish(&db_url)?; // If possible, skip locking the migrations table and recreating the "r" schema, so // lemmy_server processes in a horizontally scaled setup can start without causing locks - if !(options.revert - || conn - .has_pending_migration(migration_source_ref) - .map_err(|e| anyhow!("Couldn't check pending migrations: {e}"))?) + if !options.revert + && options.run + && options.amount.is_none() + && !conn + .has_pending_migration(migrations()) + .map_err(convert_err)? + //.map_err(|e| anyhow!("Couldn't check pending migrations: {e}"))?) { // The condition above implies that the migration that creates the previously_run_sql table was already run - let sql_unchanged: bool = select( - previously_run_sql::table - .select(previously_run_sql::content) - .single_value() - .assume_not_null() - .eq(&new_sql), - ) - .get_result(&mut conn)?; + let sql_unchanged = exists( + previously_run_sql::table.filter(previously_run_sql::content.eq(replaceable_schema())), + ); - if sql_unchanged { + let schema_exists = exists(pg_namespace::table.find("r")); + + if select(sql_unchanged.and(schema_exists)).get_result(&mut conn)? { return Ok(()); } } @@ -190,103 +221,191 @@ pub fn run(options: Options) -> LemmyResult<()> { conn.batch_execute("SET lemmy.enable_migrations TO 'on';")?; } - // Running without transaction allows pg_dump to see results of migrations - // TODO never use 1 transaction - let run_in_transaction = !options.enable_diff_check; + // Repurpose the table created by `has_pending_migration` for locking, which + // blocks concurrent attempts to run migrations, but not normal use of the table - let transaction = |conn: &mut PgConnection| -> LemmyResult<()> { - let mut wrapper = MigrationHarnessWrapper { - conn, - options: &options, - }; - - // * Prevent other lemmy_server processes from running this transaction simultaneously by repurposing - // the table created by `MigrationHarness::pending_migrations` as a lock target (this doesn't block - // normal use of the table) - // * Drop `r` schema, so migrations don't need to be made to work both with and without things in - // it existing + // Using the same connection for both the lock and the migrations would require + // running all migrations in the same transaction, which would prevent: + // * Diff checker using pg_dump to see the effects of each migration + // * Migrations using an enum value added in a previous migration in the same transaction + // * Inspection of the database schema after only some migrations succeed + PgConnection::establish(&db_url)?.transaction(|lock_conn| -> LemmyResult<()> { info!("Waiting for lock..."); - - let lock = if run_in_transaction { - "LOCK __diesel_schema_migrations IN SHARE UPDATE EXCLUSIVE MODE;" - } else { - "" - }; - - wrapper - .conn - .batch_execute(&format!("{lock}DROP SCHEMA IF EXISTS r CASCADE;"))?; - + lock_conn.batch_execute("LOCK __diesel_schema_migrations IN SHARE UPDATE EXCLUSIVE MODE;")?; info!("Running Database migrations (This may take a long time)..."); - (|| { - if options.revert { - if let Some(amount) = options.revert_amount { - for _ in 0..amount { - wrapper.revert_last_migration(migration_source_ref)?; - } - if options.redo_after_revert { - for _ in 0..amount { - wrapper.run_next_migration(migration_source_ref)?; - } - } - } else { - wrapper.revert_all_migrations(migration_source_ref)?; - if options.redo_after_revert { - wrapper.run_pending_migrations(migration_source_ref)?; - } - } - } else { - wrapper.run_pending_migrations(migration_source_ref)?; - } - diesel::migration::Result::Ok(()) - })() - .map_err(|e| anyhow!("Couldn't run DB Migrations: {e}"))?; + // Drop `r` schema, so migrations don't need to be made to work both with and without things in + // it existing + revert_replaceable_schema(&mut conn, lock_conn)?; - // Run replaceable_schema if newest migration was applied - if !(options.revert && !options.redo_after_revert) { + run_selected_migrations(&mut conn, lock_conn, &options).map_err(convert_err)?; + + // Only run replaceable_schema if newest migration was applied + if (options.run && options.amount.is_none()) || !conn.has_pending_migration(migrations()).map_err(convert_err)? { #[cfg(test)] if options.enable_diff_check { - let before = diff_check::get_dump(&mut wrapper.conn); - // todo move replaceable_schema dir path to let/const? - wrapper - .conn - .batch_execute(&new_sql) - .context("Couldn't run SQL files in crates/db_schema/replaceable_schema")?; - // todo move statement to const - wrapper - .conn - .batch_execute("DROP SCHEMA IF EXISTS r CASCADE;")?; - // todo use different first output line in this case - diff_check::check_dump_diff(&mut wrapper.conn, before, "replaceable_schema"); + let before = diff_check::get_dump(); + + run_replaceable_schema(&mut conn, lock_conn)?; + revert_replaceable_schema(&mut conn, lock_conn)?; + + let after = diff_check::get_dump(); + + diff_check::check_dump_diff(before, after, "The code in crates/db_schema/replaceable_schema incorrectly created or modified things outside of the `r` schema, causing these changes to be left behind after dropping the schema:"); } - wrapper - .conn - .batch_execute(&new_sql) - .context("Couldn't run SQL files in crates/db_schema/replaceable_schema")?; - - let num_rows_updated = update(previously_run_sql::table) - .set(previously_run_sql::content.eq(new_sql)) - .execute(wrapper.conn)?; - - debug_assert_eq!(num_rows_updated, 1); + run_replaceable_schema(&mut conn, lock_conn)?; } Ok(()) - }; - - if run_in_transaction { - conn.transaction(transaction)?; - } else { - transaction(&mut conn)?; - } + })?; info!("Database migrations complete."); Ok(()) } +fn run_replaceable_schema( + conn: &mut PgConnection, + lock_conn: &mut PgConnection, +) -> LemmyResult<()> { + rollback_if_lock_conn_broke(conn, lock_conn, |conn| { + conn + .batch_execute(&replaceable_schema()) + .with_context(|| format!("Failed to run SQL files in {REPLACEABLE_SCHEMA_PATH}"))?; + + let num_rows_updated = update(previously_run_sql::table) + .set(previously_run_sql::content.eq(replaceable_schema())) + .execute(conn)?; + + debug_assert_eq!(num_rows_updated, 1); + + Ok(()) + }) +} + +fn revert_replaceable_schema( + conn: &mut PgConnection, + lock_conn: &mut PgConnection, +) -> LemmyResult<()> { + rollback_if_lock_conn_broke(conn, lock_conn, |conn| { + conn + .batch_execute("DROP SCHEMA IF EXISTS r CASCADE;") + .with_context(|| format!("Failed to revert SQL files in {REPLACEABLE_SCHEMA_PATH}"))?; + + // Value in `previously_run_sql` table is not set here because the table might not exist + + Ok(()) + }) +} + +fn run_selected_migrations( + conn: &mut PgConnection, + lock_conn: &mut PgConnection, + options: &Options, +) -> diesel::migration::Result<()> { + let mut wrapper = MigrationHarnessWrapper { + conn, + lock_conn, + options, + }; + + /*let revert = ( + options.revert, + MigrationHarnessWrapper::revert_last_migration as fn(_, _) -> _, + MigrationHarnessWrapper::revert_all_migrations as fn(_, _) -> _, + ); + let run = ( + options.run, + MigrationHarnessWrapper::run_next_migration, + MigrationHarnessWrapper::run_pending_migrations, + ); + + for (condition, run_one, run_all) in [revert, run] { + if condition { + if let Some(amount) = options.amount { + for _ in 0..amount { + run_one(&mut wrapper, migrations())? + } + } else { + run_all(&mut wrapper, migrations())? + } + } + }*/ + + if options.revert { + if let Some(amount) = options.amount { + for _ in 0..amount { + wrapper.revert_last_migration(migrations())?; + } + } else { + wrapper.revert_all_migrations(migrations())?; + } + } + + if options.run { + if let Some(amount) = options.amount { + for _ in 0..amount { + wrapper.run_next_migration(migrations())?; + } + } else { + wrapper.run_pending_migrations(migrations())?; + } + } + + /* } else { + wrapper.run_pending_migrations(migrations())?; + } + + if let Some(amount) = options.revert_amount { + for _ in 0..amount { + wrapper.revert_last_migration(migrations())?; + } + + if options.redo_after_revert { + for _ in 0..amount { + wrapper.run_next_migration(migrations())?; + } + } + } else { + wrapper.revert_all_migrations(migrations())?; + + if options.redo_after_revert { + wrapper.run_pending_migrations(migrations())?; + } + }*/ + + Ok(()) +} + +/// Prevent changes from being committed after `lock_conn` unexpectedly closes +fn rollback_if_lock_conn_broke( + conn: &mut PgConnection, + lock_conn: &mut PgConnection, + mut f: impl FnMut(&mut PgConnection) -> Result, +) -> Result +where + E: From + From, +{ + conn.transaction::(|conn| { + let result = f(conn)?; + + select(true.into_sql::()) + .execute(lock_conn) + .context("Connection used for lock unexpectedly stopped working")?; + + Ok(result) + }) +} + +/// Makes `diesel::migration::Result` work with `anyhow` and `LemmyError` +fn convert_err( + err: Box, + //) -> impl std::error::Error + Send + Sync + 'static { +) -> anyhow::Error { + anyhow::anyhow!(err) +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/db_schema/src/schema_setup/diff_check.rs b/crates/db_schema/src/schema_setup/diff_check.rs index 67de872dd..0528d465b 100644 --- a/crates/db_schema/src/schema_setup/diff_check.rs +++ b/crates/db_schema/src/schema_setup/diff_check.rs @@ -1,4 +1,3 @@ -use diesel::{PgConnection, RunQueryDsl}; use lemmy_utils::settings::SETTINGS; use std::{ borrow::Cow, @@ -7,22 +6,22 @@ use std::{ process::{Command, Stdio}, }; -diesel::sql_function! { - fn pg_export_snapshot() -> diesel::sql_types::Text; -} +// It's not possible to call `export_snapshot()` for each dump and run the dumps in parallel with the +// `--snapshot` flag. Don't waste your time!!! -pub fn get_dump(conn: &mut PgConnection) -> String { - /*// Required for pg_dump to see uncommitted changes from a different database connection - - // The pg_dump command runs independently from `conn`, which means it can't see changes from - // an uncommitted transaction. NASA made each migration run in a separate transaction. When - // it was discovered that - let snapshot = diesel::select(pg_export_snapshot()) - .get_result::(conn) - .expect("pg_export_snapshot failed"); - let snapshot_arg = format!("--snapshot={snapshot}");*/ +pub fn get_dump() -> String { let output = Command::new("pg_dump") - .args(["--schema-only", "--no-owner", "--no-privileges", "--no-comments", "--no-publications", "--no-security-labels", "--no-subscriptions", "--no-table-access-method", "--no-tablespaces"]) + .args([ + "--schema-only", + "--no-owner", + "--no-privileges", + "--no-comments", + "--no-publications", + "--no-security-labels", + "--no-subscriptions", + "--no-table-access-method", + "--no-tablespaces", + ]) .env("DATABASE_URL", SETTINGS.get_database_url()) .stderr(Stdio::inherit()) .output() @@ -36,17 +35,19 @@ pub fn get_dump(conn: &mut PgConnection) -> String { const PATTERN_LEN: usize = 19; -// TODO add unit test for output -pub fn check_dump_diff(conn: &mut PgConnection, mut before: String, name: &str) { - let mut after = get_dump(conn); +pub fn check_dump_diff(mut after: String, mut before: String, label: &str) { if after == before { return; } // Ignore timestamp differences by removing timestamps for dump in [&mut before, &mut after] { for index in 0.. { - let Some(byte)=dump.as_bytes().get(index) else{break}; - if !byte.is_ascii_digit() {continue;} + let Some(byte) = dump.as_bytes().get(index) else { + break; + }; + if !byte.is_ascii_digit() { + continue; + } // Check for this pattern: 0000-00-00 00:00:00 let Some(( &[a0, a1, a2, a3, b0, a4, a5, b1, a6, a7, b2, a8, a9, b3, a10, a11, b4, a12, a13], @@ -54,16 +55,16 @@ pub fn check_dump_diff(conn: &mut PgConnection, mut before: String, name: &str) )) = dump .get(index..) .and_then(|s| s.as_bytes().split_first_chunk::()) - else { - break; + else { + break; }; if [a0, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13] - .into_iter() - .all(|byte| byte.is_ascii_digit()) + .into_iter() + .all(|byte| byte.is_ascii_digit()) && [b0, b1, b2, b3, b4] == *b"-- ::" - { - // Replace the part of the string that has the checked pattern and an optional fractional part - let len_after = if let Some((b'.', s)) = remaining.split_first() { + { + // Replace the part of the string that has the checked pattern and an optional fractional part + let len_after = if let Some((b'.', s)) = remaining.split_first() { 1 + s.iter().position(|c| !c.is_ascii_digit()).unwrap_or(0) } else { 0 @@ -79,69 +80,85 @@ pub fn check_dump_diff(conn: &mut PgConnection, mut before: String, name: &str) } let [before_chunks, after_chunks] = - [&before, &after].map(|dump| chunks(dump).collect::>()); - let only_b = before_chunks.difference(&after_chunks).copied().map(process_chunk).collect::>(); - let only_a = after_chunks.difference(&before_chunks).copied().map(process_chunk).collect::>(); - + [&before, &after].map(|dump| chunks(dump).collect::>()); + let only_b = before_chunks + .difference(&after_chunks) + .copied() + .map(process_chunk) + .collect::>(); + let only_a = after_chunks + .difference(&before_chunks) + .copied() + .map(process_chunk) + .collect::>(); + // todo dont collect only_in_before? - let [mut only_in_before, mut only_in_after] = [ - only_b.difference(&only_a), - only_a.difference(&only_b), - ] - .map(|chunks| { - chunks - .map(|chunk| { - ( - &**chunk, - // Used for ignoring formatting differences, especially indentation level, when - // determining which item in `only_in_before` corresponds to which item in `only_in_after` - chunk.replace([' ', '\t', '\r', '\n'], ""), - ) - }) - .collect::>() - }); + let [mut only_in_before, mut only_in_after] = + [only_b.difference(&only_a), only_a.difference(&only_b)].map(|chunks| { + chunks + .map(|chunk| { + ( + &**chunk, + // Used for ignoring formatting differences, especially indentation level, when + // determining which item in `only_in_before` corresponds to which item in `only_in_after` + chunk.replace([' ', '\t', '\r', '\n'], ""), + ) + }) + .collect::>() + }); if only_in_before.is_empty() && only_in_after.is_empty() { return; } - let after_has_more = - only_in_before.len() < only_in_after.len(); + let after_has_more = only_in_before.len() < only_in_after.len(); // outer iterator in the loop below should not be the one with empty strings, otherwise the empty strings // would be equally similar to any other chunk - let (chunks_gt, chunks_lt) = if after_has_more - { - only_in_before.resize_with(only_in_after.len(),Default::default); + let (chunks_gt, chunks_lt) = if after_has_more { + only_in_before.resize_with(only_in_after.len(), Default::default); (&mut only_in_after, &only_in_before) } else { - only_in_after.resize_with(only_in_before.len(),Default::default); + only_in_after.resize_with(only_in_before.len(), Default::default); (&mut only_in_before, &only_in_after) }; - let mut output = format!("These changes need to be applied in {name}:"); + let mut output = label.to_owned(); // todo rename variables for (before_chunk, before_chunk_filtered) in chunks_lt { let default = Default::default(); //panic!("{:?}",(before_chunk.clone(),chunks_lt.clone())); - let (most_similar_chunk_index, (most_similar_chunk, most_similar_chunk_filtered)) = chunks_gt + let (most_similar_chunk_index, (most_similar_chunk, _)) = chunks_gt .iter() .enumerate() .max_by_key(|(_, (after_chunk, after_chunk_filtered))| { - if - after_chunk.split_once(|c:char|c.is_lowercase()).unwrap_or_default().0 != - before_chunk.split_once(|c:char|c.is_lowercase()).unwrap_or_default().0 {0}else{ - diff::chars(after_chunk_filtered, &before_chunk_filtered) - .into_iter() - .filter(|i| matches!(i, diff::Result::Both(c, _) + if after_chunk + .split_once(|c: char| c.is_lowercase()) + .unwrap_or_default() + .0 + != before_chunk + .split_once(|c: char| c.is_lowercase()) + .unwrap_or_default() + .0 + { + 0 + } else { + diff::chars(after_chunk_filtered, &before_chunk_filtered) + .into_iter() + .filter(|i| { + matches!(i, diff::Result::Both(c, _) // `is_lowercase` increases accuracy for some trigger function diffs - if c.is_lowercase() || c.is_numeric())) - .count()} + if c.is_lowercase() || c.is_numeric()) + }) + .count() + } }) - .unwrap_or((0,&default)); + .unwrap_or((0, &default)); output.push('\n'); - let lines = if !after_has_more{diff::lines(&before_chunk,most_similar_chunk)}else{ - diff::lines(most_similar_chunk, &before_chunk)}; - for line in lines - { + let lines = if !after_has_more { + diff::lines(&before_chunk, most_similar_chunk) + } else { + diff::lines(most_similar_chunk, &before_chunk) + }; + for line in lines { match line { diff::Result::Left(s) => write!(&mut output, "\n- {s}"), diff::Result::Right(s) => write!(&mut output, "\n+ {s}"), @@ -151,7 +168,8 @@ pub fn check_dump_diff(conn: &mut PgConnection, mut before: String, name: &str) } //write!(&mut output, "\n{most_similar_chunk_filtered}"); if !chunks_gt.is_empty() { - chunks_gt.swap_remove(most_similar_chunk_index);} + chunks_gt.swap_remove(most_similar_chunk_index); + } } // should have all been removed assert_eq!(chunks_gt.len(), 0); @@ -177,19 +195,16 @@ fn process_chunk<'a>(result: &'a str) -> Cow<'a, str> { Cow::Owned(lines.join("\n")) } else if result.starts_with("CREATE VIEW") || result.starts_with("CREATE OR REPLACE VIEW") { // Allow column order to change - let is_simple_select_statement = result - .lines() - .enumerate() - .all(|(i, mut line)| { - line = line.trim_start(); - match (i, line.chars().next()) { - (0, Some('C')) => true, // create - (1, Some('S')) => true, // select - (_, Some('F')) if line.ends_with(';') => true, // from - (_, Some(c)) if c.is_lowercase() => true, // column name - _ => false - } - }); + let is_simple_select_statement = result.lines().enumerate().all(|(i, mut line)| { + line = line.trim_start(); + match (i, line.chars().next()) { + (0, Some('C')) => true, // create + (1, Some('S')) => true, // select + (_, Some('F')) if line.ends_with(';') => true, // from + (_, Some(c)) if c.is_lowercase() => true, // column name + _ => false, + } + }); if is_simple_select_statement { let mut lines = result .lines() @@ -205,7 +220,9 @@ fn process_chunk<'a>(result: &'a str) -> Cow<'a, str> { (placement, line) }); Cow::Owned(lines.join("\n")) - }else{Cow::Borrowed(result)} + } else { + Cow::Borrowed(result) + } } else { Cow::Borrowed(result) }