diff --git a/Cargo.lock b/Cargo.lock index 1bcbb611b..2a1097d19 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1255,9 +1255,9 @@ dependencies = [ [[package]] name = "crossbeam-channel" -version = "0.5.13" +version = "0.5.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2" +checksum = "ab3db02a9c5b5121e1e42fbdb1aeb65f5e02624cc58c43f2884c6ccac0b82f95" dependencies = [ "crossbeam-utils", ] @@ -2814,7 +2814,6 @@ dependencies = [ "async-trait", "bcrypt", "chrono", - "crossbeam-channel", "deadpool 0.10.0", "diesel", "diesel-async", diff --git a/crates/db_schema/Cargo.toml b/crates/db_schema/Cargo.toml index c268c7904..072fa41ae 100644 --- a/crates/db_schema/Cargo.toml +++ b/crates/db_schema/Cargo.toml @@ -86,7 +86,6 @@ moka.workspace = true serial_test = { workspace = true } pretty_assertions = { workspace = true } diff = "0.1.13" -crossbeam-channel = "0.5.13" [package.metadata.cargo-machete] ignored = ["strum"] diff --git a/crates/db_schema/src/schema_setup.rs b/crates/db_schema/src/schema_setup.rs index 6032db3c4..573c9195b 100644 --- a/crates/db_schema/src/schema_setup.rs +++ b/crates/db_schema/src/schema_setup.rs @@ -48,8 +48,6 @@ const REPLACEABLE_SCHEMA: &[&str] = &[ struct MigrationHarnessWrapper<'a, 'b> { conn: &'a mut PgConnection, options: &'b Options, - #[cfg(test)] - diff_checker:diff_check::DiffChecker, } impl<'a, 'b> MigrationHarness for MigrationHarnessWrapper<'a, 'b> { @@ -61,12 +59,13 @@ impl<'a, 'b> MigrationHarness for MigrationHarnessWrapper<'a, 'b> { #[cfg(test)] if self.options.enable_diff_check { - let before = self.diff_checker.get_dump(); + let before = diff_check::get_dump(&mut self.conn); self.conn.run_migration(migration)?; self.conn.revert_migration(migration)?; - self.diff_checker.check_dump_diff( + diff_check::check_dump_diff( + &mut self.conn, before, - format!("migrations/{name}/down.sql"), + &format!("migrations/{name}/down.sql"), ); } @@ -76,10 +75,6 @@ impl<'a, 'b> MigrationHarness for MigrationHarnessWrapper<'a, 'b> { let duration = start_time.elapsed().as_millis(); info!("{duration}ms run {name}"); - #[cfg(test)] - if result.is_err() { - self.diff_checker.finish(); - } result } @@ -158,8 +153,6 @@ impl Options { // TODO return struct with field `ran_replaceable_schema` pub fn run(options: Options) -> LemmyResult<()> { let db_url = SETTINGS.get_database_url(); - #[cfg(test)] - let mut diff_checker = diff_check::DiffChecker::new(&db_url)?; // Migrations don't support async connection, and this function doesn't need to be async let mut conn = @@ -205,8 +198,6 @@ pub fn run(options: Options) -> LemmyResult<()> { let mut wrapper = MigrationHarnessWrapper { conn, options: &options, - #[cfg(test)] - diff_checker, }; // * Prevent other lemmy_server processes from running this transaction simultaneously by repurposing @@ -256,7 +247,7 @@ pub fn run(options: Options) -> LemmyResult<()> { if !(options.revert && !options.redo_after_revert) { #[cfg(test)] if options.enable_diff_check { - let before = wrapper.diff_checker.get_dump(); + let before = diff_check::get_dump(&mut wrapper.conn); // todo move replaceable_schema dir path to let/const? wrapper .conn @@ -267,7 +258,7 @@ pub fn run(options: Options) -> LemmyResult<()> { .conn .batch_execute("DROP SCHEMA IF EXISTS r CASCADE;")?; // todo use different first output line in this case - wrapper.diff_checker.check_dump_diff(before, "replaceable_schema".to_owned()); + diff_check::check_dump_diff(&mut wrapper.conn, before, "replaceable_schema"); } wrapper @@ -281,8 +272,6 @@ pub fn run(options: Options) -> LemmyResult<()> { debug_assert_eq!(num_rows_updated, 1); } - #[cfg(test)] - wrapper.diff_checker.finish(); Ok(()) }; diff --git a/crates/db_schema/src/schema_setup/diff_check.rs b/crates/db_schema/src/schema_setup/diff_check.rs index 4e8c63faa..dd731c329 100644 --- a/crates/db_schema/src/schema_setup/diff_check.rs +++ b/crates/db_schema/src/schema_setup/diff_check.rs @@ -1,122 +1,46 @@ -use diesel::{PgConnection, RunQueryDsl,connection::SimpleConnection,Connection}; +use diesel::{PgConnection, RunQueryDsl}; use lemmy_utils::settings::SETTINGS; use std::{ borrow::Cow, collections::BTreeSet, fmt::Write, - process::{Command, Stdio},thread,cell::OnceCell,sync::{Arc,Mutex},collections::HashMap,any::Any + process::{Command, Stdio}, }; -use crossbeam_channel::{Sender, Receiver}; - -enum DumpAction { - Send(Sender), - Compare(Receiver, String), -} - -pub struct DiffChecker { - snapshot_conn: PgConnection, - handles: Vec>, - snapshot_sender: Option>, - error: Receiver>, - // todo rename to channels - //dump_receivers: Arc>>>, -} diesel::sql_function! { fn pg_export_snapshot() -> diesel::sql_types::Text; } -impl DiffChecker { - pub fn new(db_url: &str) -> diesel::result::QueryResult { - // todo use settings - let mut snapshot_conn = PgConnection::establish(db_url).expect("conn"); - snapshot_conn.batch_execute("BEGIN;")?; - let (tx, rx) = crossbeam_channel::unbounded(); - let (error_t, error_r) = crossbeam_channel::unbounded(); - //let dump_receivers = Arc::new(Mutex::new(HashMap::new())); +pub fn get_dump(conn: &mut PgConnection) -> String { + /*// Required for pg_dump to see uncommitted changes from a different database connection - let mut handles = Vec::new(); - let n = usize::from(thread::available_parallelism().expect("parallelism")); - // todo remove - assert_eq!(16,n); - for _ in 0..(n){ - let rx2 = rx.clone(); - let error_t = error_t.clone(); - handles.push(thread::spawn(move || if let Err(e) = std::panic::catch_unwind(move || { - while let Ok((snapshot, action)) = rx2.recv() { - let snapshot_arg = format!("--snapshot={snapshot}"); - let output = Command::new("pg_dump") - .args(["--schema-only", &snapshot_arg]) - .env("DATABASE_URL", SETTINGS.get_database_url()) - .output() - .expect("failed to start pg_dump process"); - - if !output.status.success() { - panic!("{}", String::from_utf8(output.stderr).expect("")); - } - - let output_string = String::from_utf8(output.stdout).expect("pg_dump output is not valid UTF-8 text"); - match action { - DumpAction::Send(x) => {x.send(output_string).ok();}, - DumpAction::Compare(x, name) => { - if let Ok(before) = x.recv() { - if let Some(e) = check_dump_diff(before, output_string, &name) { - panic!("{e}"); - } - } - } - } - } - }){ - error_t.send(e).ok(); - })); - } + // The pg_dump command runs independently from `conn`, which means it can't see changes from + // an uncommitted transaction. NASA made each migration run in a separate transaction. When + // it was discovered that + let snapshot = diesel::select(pg_export_snapshot()) + .get_result::(conn) + .expect("pg_export_snapshot failed"); + let snapshot_arg = format!("--snapshot={snapshot}");*/ + let output = Command::new("pg_dump") + .args(["--schema-only"]) + .env("DATABASE_URL", SETTINGS.get_database_url()) + .stderr(Stdio::inherit()) + .output() + .expect("failed to start pg_dump process"); - Ok(DiffChecker {snapshot_conn,handles,snapshot_sender:Some(tx),error:error_r}) - } + // TODO: use exit_ok method when it's stable + assert!(output.status.success()); - fn check_err(&mut self) { - if let Ok(e) = self.error.try_recv() { - std::panic::resume_unwind(e); - } - } - - pub fn finish(&mut self) { - self.snapshot_sender.take(); // stop threads from waiting - for handle in self.handles.drain(..) { - handle.join().expect(""); - } - self.check_err(); - } - - fn get_snapshot(&mut self) -> String { - diesel::select(pg_export_snapshot()) - .get_result::(&mut self.snapshot_conn) - .expect("pg_export_snapshot failed") - } - - pub fn get_dump(&mut self) -> Receiver { - self.check_err(); - let snapshot = self.get_snapshot(); - let (tx, rx) = crossbeam_channel::unbounded(); // ::bounded(1); - self.snapshot_sender.as_mut().expect("").send((snapshot, DumpAction::Send(tx))).expect("send msg"); - rx - } - - pub fn check_dump_diff(&mut self, before: Receiver, name: String) { - self.check_err(); - let snapshot = self.get_snapshot(); - self.snapshot_sender.as_mut().expect("").send((snapshot, DumpAction::Compare(before, name))).expect("compare msg"); - } + String::from_utf8(output.stdout).expect("pg_dump output is not valid UTF-8 text") } - const PATTERN_LEN: usize = 19; // TODO add unit test for output -pub fn check_dump_diff(mut before: String, mut after: String, name: &str) -> Option { +pub fn check_dump_diff(conn: &mut PgConnection, mut before: String, name: &str) { + let mut after = get_dump(conn); if after == before { - return None; + return; } // Ignore timestamp differences by removing timestamps for dump in [&mut before, &mut after] { @@ -173,7 +97,7 @@ pub fn check_dump_diff(mut before: String, mut after: String, name: &str) -> Opt .collect::>() }); if only_in_before.is_empty() && only_in_after.is_empty() { - return None; + return; } let after_has_more = only_in_before.len() < only_in_after.len(); @@ -218,13 +142,13 @@ pub fn check_dump_diff(mut before: String, mut after: String, name: &str) -> Opt } .expect("failed to build string"); } - write!(&mut output, "\n{most_similar_chunk_filtered}").expect(""); + write!(&mut output, "\n{most_similar_chunk_filtered}"); if !chunks_gt.is_empty() { chunks_gt.swap_remove(most_similar_chunk_index);} } // should have all been removed assert_eq!(chunks_gt.len(), 0); - Some(output) + panic!("{output}"); } // todo inline?