diff --git a/Cargo.lock b/Cargo.lock index 2a1097d19..1bcbb611b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1255,9 +1255,9 @@ dependencies = [ [[package]] name = "crossbeam-channel" -version = "0.5.12" +version = "0.5.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab3db02a9c5b5121e1e42fbdb1aeb65f5e02624cc58c43f2884c6ccac0b82f95" +checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2" dependencies = [ "crossbeam-utils", ] @@ -2814,6 +2814,7 @@ dependencies = [ "async-trait", "bcrypt", "chrono", + "crossbeam-channel", "deadpool 0.10.0", "diesel", "diesel-async", diff --git a/crates/db_schema/Cargo.toml b/crates/db_schema/Cargo.toml index 072fa41ae..c268c7904 100644 --- a/crates/db_schema/Cargo.toml +++ b/crates/db_schema/Cargo.toml @@ -86,6 +86,7 @@ moka.workspace = true serial_test = { workspace = true } pretty_assertions = { workspace = true } diff = "0.1.13" +crossbeam-channel = "0.5.13" [package.metadata.cargo-machete] ignored = ["strum"] diff --git a/crates/db_schema/src/schema_setup.rs b/crates/db_schema/src/schema_setup.rs index 573c9195b..6032db3c4 100644 --- a/crates/db_schema/src/schema_setup.rs +++ b/crates/db_schema/src/schema_setup.rs @@ -48,6 +48,8 @@ const REPLACEABLE_SCHEMA: &[&str] = &[ struct MigrationHarnessWrapper<'a, 'b> { conn: &'a mut PgConnection, options: &'b Options, + #[cfg(test)] + diff_checker:diff_check::DiffChecker, } impl<'a, 'b> MigrationHarness for MigrationHarnessWrapper<'a, 'b> { @@ -59,13 +61,12 @@ impl<'a, 'b> MigrationHarness for MigrationHarnessWrapper<'a, 'b> { #[cfg(test)] if self.options.enable_diff_check { - let before = diff_check::get_dump(&mut self.conn); + let before = self.diff_checker.get_dump(); self.conn.run_migration(migration)?; self.conn.revert_migration(migration)?; - diff_check::check_dump_diff( - &mut self.conn, + self.diff_checker.check_dump_diff( before, - &format!("migrations/{name}/down.sql"), + format!("migrations/{name}/down.sql"), ); } @@ -75,6 +76,10 @@ impl<'a, 'b> MigrationHarness for MigrationHarnessWrapper<'a, 'b> { let duration = start_time.elapsed().as_millis(); info!("{duration}ms run {name}"); + #[cfg(test)] + if result.is_err() { + self.diff_checker.finish(); + } result } @@ -153,6 +158,8 @@ impl Options { // TODO return struct with field `ran_replaceable_schema` pub fn run(options: Options) -> LemmyResult<()> { let db_url = SETTINGS.get_database_url(); + #[cfg(test)] + let mut diff_checker = diff_check::DiffChecker::new(&db_url)?; // Migrations don't support async connection, and this function doesn't need to be async let mut conn = @@ -198,6 +205,8 @@ pub fn run(options: Options) -> LemmyResult<()> { let mut wrapper = MigrationHarnessWrapper { conn, options: &options, + #[cfg(test)] + diff_checker, }; // * Prevent other lemmy_server processes from running this transaction simultaneously by repurposing @@ -247,7 +256,7 @@ pub fn run(options: Options) -> LemmyResult<()> { if !(options.revert && !options.redo_after_revert) { #[cfg(test)] if options.enable_diff_check { - let before = diff_check::get_dump(&mut wrapper.conn); + let before = wrapper.diff_checker.get_dump(); // todo move replaceable_schema dir path to let/const? wrapper .conn @@ -258,7 +267,7 @@ pub fn run(options: Options) -> LemmyResult<()> { .conn .batch_execute("DROP SCHEMA IF EXISTS r CASCADE;")?; // todo use different first output line in this case - diff_check::check_dump_diff(&mut wrapper.conn, before, "replaceable_schema"); + wrapper.diff_checker.check_dump_diff(before, "replaceable_schema".to_owned()); } wrapper @@ -272,6 +281,8 @@ pub fn run(options: Options) -> LemmyResult<()> { debug_assert_eq!(num_rows_updated, 1); } + #[cfg(test)] + wrapper.diff_checker.finish(); Ok(()) }; diff --git a/crates/db_schema/src/schema_setup/diff_check.rs b/crates/db_schema/src/schema_setup/diff_check.rs index dd731c329..4e8c63faa 100644 --- a/crates/db_schema/src/schema_setup/diff_check.rs +++ b/crates/db_schema/src/schema_setup/diff_check.rs @@ -1,46 +1,122 @@ -use diesel::{PgConnection, RunQueryDsl}; +use diesel::{PgConnection, RunQueryDsl,connection::SimpleConnection,Connection}; use lemmy_utils::settings::SETTINGS; use std::{ borrow::Cow, collections::BTreeSet, fmt::Write, - process::{Command, Stdio}, + process::{Command, Stdio},thread,cell::OnceCell,sync::{Arc,Mutex},collections::HashMap,any::Any }; +use crossbeam_channel::{Sender, Receiver}; + +enum DumpAction { + Send(Sender), + Compare(Receiver, String), +} + +pub struct DiffChecker { + snapshot_conn: PgConnection, + handles: Vec>, + snapshot_sender: Option>, + error: Receiver>, + // todo rename to channels + //dump_receivers: Arc>>>, +} diesel::sql_function! { fn pg_export_snapshot() -> diesel::sql_types::Text; } -pub fn get_dump(conn: &mut PgConnection) -> String { - /*// Required for pg_dump to see uncommitted changes from a different database connection +impl DiffChecker { + pub fn new(db_url: &str) -> diesel::result::QueryResult { + // todo use settings + let mut snapshot_conn = PgConnection::establish(db_url).expect("conn"); + snapshot_conn.batch_execute("BEGIN;")?; + let (tx, rx) = crossbeam_channel::unbounded(); + let (error_t, error_r) = crossbeam_channel::unbounded(); + //let dump_receivers = Arc::new(Mutex::new(HashMap::new())); - // The pg_dump command runs independently from `conn`, which means it can't see changes from - // an uncommitted transaction. NASA made each migration run in a separate transaction. When - // it was discovered that - let snapshot = diesel::select(pg_export_snapshot()) - .get_result::(conn) - .expect("pg_export_snapshot failed"); - let snapshot_arg = format!("--snapshot={snapshot}");*/ - let output = Command::new("pg_dump") - .args(["--schema-only"]) - .env("DATABASE_URL", SETTINGS.get_database_url()) - .stderr(Stdio::inherit()) - .output() - .expect("failed to start pg_dump process"); + let mut handles = Vec::new(); + let n = usize::from(thread::available_parallelism().expect("parallelism")); + // todo remove + assert_eq!(16,n); + for _ in 0..(n){ + let rx2 = rx.clone(); + let error_t = error_t.clone(); + handles.push(thread::spawn(move || if let Err(e) = std::panic::catch_unwind(move || { + while let Ok((snapshot, action)) = rx2.recv() { + let snapshot_arg = format!("--snapshot={snapshot}"); + let output = Command::new("pg_dump") + .args(["--schema-only", &snapshot_arg]) + .env("DATABASE_URL", SETTINGS.get_database_url()) + .output() + .expect("failed to start pg_dump process"); + + if !output.status.success() { + panic!("{}", String::from_utf8(output.stderr).expect("")); + } + + let output_string = String::from_utf8(output.stdout).expect("pg_dump output is not valid UTF-8 text"); + match action { + DumpAction::Send(x) => {x.send(output_string).ok();}, + DumpAction::Compare(x, name) => { + if let Ok(before) = x.recv() { + if let Some(e) = check_dump_diff(before, output_string, &name) { + panic!("{e}"); + } + } + } + } + } + }){ + error_t.send(e).ok(); + })); + } - // TODO: use exit_ok method when it's stable - assert!(output.status.success()); + Ok(DiffChecker {snapshot_conn,handles,snapshot_sender:Some(tx),error:error_r}) + } - String::from_utf8(output.stdout).expect("pg_dump output is not valid UTF-8 text") + fn check_err(&mut self) { + if let Ok(e) = self.error.try_recv() { + std::panic::resume_unwind(e); + } + } + + pub fn finish(&mut self) { + self.snapshot_sender.take(); // stop threads from waiting + for handle in self.handles.drain(..) { + handle.join().expect(""); + } + self.check_err(); + } + + fn get_snapshot(&mut self) -> String { + diesel::select(pg_export_snapshot()) + .get_result::(&mut self.snapshot_conn) + .expect("pg_export_snapshot failed") + } + + pub fn get_dump(&mut self) -> Receiver { + self.check_err(); + let snapshot = self.get_snapshot(); + let (tx, rx) = crossbeam_channel::unbounded(); // ::bounded(1); + self.snapshot_sender.as_mut().expect("").send((snapshot, DumpAction::Send(tx))).expect("send msg"); + rx + } + + pub fn check_dump_diff(&mut self, before: Receiver, name: String) { + self.check_err(); + let snapshot = self.get_snapshot(); + self.snapshot_sender.as_mut().expect("").send((snapshot, DumpAction::Compare(before, name))).expect("compare msg"); + } } + const PATTERN_LEN: usize = 19; // TODO add unit test for output -pub fn check_dump_diff(conn: &mut PgConnection, mut before: String, name: &str) { - let mut after = get_dump(conn); +pub fn check_dump_diff(mut before: String, mut after: String, name: &str) -> Option { if after == before { - return; + return None; } // Ignore timestamp differences by removing timestamps for dump in [&mut before, &mut after] { @@ -97,7 +173,7 @@ pub fn check_dump_diff(conn: &mut PgConnection, mut before: String, name: &str) .collect::>() }); if only_in_before.is_empty() && only_in_after.is_empty() { - return; + return None; } let after_has_more = only_in_before.len() < only_in_after.len(); @@ -142,13 +218,13 @@ pub fn check_dump_diff(conn: &mut PgConnection, mut before: String, name: &str) } .expect("failed to build string"); } - write!(&mut output, "\n{most_similar_chunk_filtered}"); + write!(&mut output, "\n{most_similar_chunk_filtered}").expect(""); if !chunks_gt.is_empty() { chunks_gt.swap_remove(most_similar_chunk_index);} } // should have all been removed assert_eq!(chunks_gt.len(), 0); - panic!("{output}"); + Some(output) } // todo inline?