From 8c532c97e691215b87a654e8245a1e30208ec568 Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 2 Sep 2023 11:52:55 -0500 Subject: [PATCH] Initial postgres work --- Cargo.lock | 7 ++ Cargo.toml | 1 + flake.nix | 1 + src/config.rs | 4 +- src/config/commandline.rs | 91 ++++++++++++++++++- src/config/defaults.rs | 12 +++ src/config/file.rs | 7 ++ src/lib.rs | 34 +++---- src/repo.rs | 10 +- src/repo/postgres.rs | 81 +++++++++++++++++ .../migrations/V0001__create_hashes.rs | 31 +++++++ src/store/file_store.rs | 40 +++----- src/store/object_store.rs | 38 +++----- 13 files changed, 282 insertions(+), 75 deletions(-) create mode 100644 src/repo/postgres.rs create mode 100644 src/repo/postgres/migrations/V0001__create_hashes.rs diff --git a/Cargo.lock b/Cargo.lock index f856629..c7e9463 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -388,6 +388,12 @@ dependencies = [ "rustc-demangle", ] +[[package]] +name = "barrel" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad9e605929a6964efbec5ac0884bd0fe93f12a3b1eb271f52c251316640c68d9" + [[package]] name = "base64" version = "0.13.1" @@ -1788,6 +1794,7 @@ dependencies = [ "actix-web", "anyhow", "async-trait", + "barrel", "base64 0.21.3", "clap", "color-eyre", diff --git a/Cargo.toml b/Cargo.toml index 9146a07..7f6e6db 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ actix-server = "2.0.0" actix-web = { version = "4.0.0", default-features = false } anyhow = "1.0" async-trait = "0.1.51" +barrel = { version = "0.7.0", features = ["pg"] } base64 = "0.21.0" clap = { version = "4.0.2", features = ["derive"] } color-eyre = "0.6" diff --git a/flake.nix b/flake.nix index 8538a2d..c0c7e2a 100644 --- a/flake.nix +++ b/flake.nix @@ -32,6 +32,7 @@ cargo cargo-outdated clippy + diesel-cli exiftool ffmpeg_6-full garage diff --git a/src/config.rs b/src/config.rs index 23ed003..25cf0e7 100644 --- a/src/config.rs +++ b/src/config.rs @@ -12,8 +12,8 @@ use defaults::Defaults; pub(crate) use 
commandline::Operation; pub(crate) use file::{ - Animation, ConfigFile as Configuration, Image, Media, ObjectStorage, OpenTelemetry, Repo, Sled, - Store, Tracing, Video, + Animation, ConfigFile as Configuration, Image, Media, ObjectStorage, OpenTelemetry, Postgres, + Repo, Sled, Store, Tracing, Video, }; pub(crate) use primitives::{Filesystem, LogFormat}; diff --git a/src/config/commandline.rs b/src/config/commandline.rs index d08a52c..c7ffad5 100644 --- a/src/config/commandline.rs +++ b/src/config/commandline.rs @@ -369,8 +369,64 @@ impl Args { from: from.into(), to: to.into(), }, - config_file, save_to, + config_file, + }, + MigrateRepoTo::Postgres(MigratePostgresInner { to }) => Output { + config_format: ConfigFormat { + server, + client, + old_repo, + tracing, + metrics, + media, + repo: None, + store: None, + }, + operation: Operation::MigrateRepo { + from: from.into(), + to: to.into(), + }, + save_to, + config_file, + }, + }, + MigrateRepoFrom::Postgres(MigratePostgresRepo { from, to }) => match to { + MigrateRepoTo::Sled(MigrateSledInner { to }) => Output { + config_format: ConfigFormat { + server, + client, + old_repo, + tracing, + metrics, + media, + repo: None, + store: None, + }, + operation: Operation::MigrateRepo { + from: from.into(), + to: to.into(), + }, + save_to, + config_file, + }, + MigrateRepoTo::Postgres(MigratePostgresInner { to }) => Output { + config_format: ConfigFormat { + server, + client, + old_repo, + tracing, + metrics, + media, + repo: None, + store: None, + }, + operation: Operation::MigrateRepo { + from: from.into(), + to: to.into(), + }, + save_to, + config_file, }, }, } @@ -1058,6 +1114,7 @@ enum MigrateStoreFrom { #[derive(Debug, Subcommand)] enum MigrateRepoFrom { Sled(MigrateSledRepo), + Postgres(MigratePostgresRepo), } /// Configure the destination storage for pict-rs storage migration @@ -1075,8 +1132,10 @@ enum MigrateStoreTo { /// Configure the destination repo for pict-rs repo migration #[derive(Debug, Subcommand)] enum 
MigrateRepoTo { - /// Migrate to the provided sled storage + /// Migrate to the provided sled repo Sled(MigrateSledInner), + /// Migrate to the provided postgres repo + Postgres(MigratePostgresInner), } /// Migrate pict-rs' storage from the provided filesystem storage @@ -1099,6 +1158,16 @@ struct MigrateSledRepo { to: MigrateRepoTo, } +/// Migrate pict-rs' repo from the provided postgres repo +#[derive(Debug, Parser)] +struct MigratePostgresRepo { + #[command(flatten)] + from: Postgres, + + #[command(subcommand)] + to: MigrateRepoTo, +} + /// Migrate pict-rs' storage to the provided filesystem storage #[derive(Debug, Parser)] struct MigrateFilesystemInner { @@ -1116,6 +1185,13 @@ struct MigrateSledInner { to: Sled, } +/// Migrate pict-rs' repo to the provided postgres repo +#[derive(Debug, Parser)] +struct MigratePostgresInner { + #[command(flatten)] + to: Postgres, +} + /// Migrate pict-rs' storage from the provided object storage #[derive(Debug, Parser)] struct MigrateObjectStorage { @@ -1163,6 +1239,8 @@ struct RunObjectStorage { enum Repo { /// Run pict-rs with the provided sled-backed data repository Sled(Sled), + /// Run pict-rs with the provided postgres-backed data repository + Postgres(Postgres), } /// Configuration for filesystem media storage @@ -1254,6 +1332,15 @@ pub(super) struct Sled { pub(super) export_path: Option, } +/// Configuration for the postgres-backed data repository +#[derive(Debug, Parser, serde::Serialize)] +#[serde(rename_all = "snake_case")] +pub(super) struct Postgres { + /// The URL of the postgres database + #[arg(short, long)] + pub(super) url: Url, +} + #[derive(Debug, Parser, serde::Serialize)] #[serde(rename_all = "snake_case")] struct OldSled { diff --git a/src/config/defaults.rs b/src/config/defaults.rs index fc4a0d2..935d5c9 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -363,8 +363,20 @@ impl From for crate::config::file::Sled { } } +impl From for crate::config::file::Postgres { + fn from(value: 
crate::config::commandline::Postgres) -> Self { + crate::config::file::Postgres { url: value.url } + } +} + impl From for crate::config::file::Repo { fn from(value: crate::config::commandline::Sled) -> Self { crate::config::file::Repo::Sled(value.into()) } } + +impl From for crate::config::file::Repo { + fn from(value: crate::config::commandline::Postgres) -> Self { + crate::config::file::Repo::Postgres(value.into()) + } +} diff --git a/src/config/file.rs b/src/config/file.rs index 9e78d19..213c715 100644 --- a/src/config/file.rs +++ b/src/config/file.rs @@ -88,6 +88,7 @@ pub(crate) struct ObjectStorage { #[serde(tag = "type")] pub(crate) enum Repo { Sled(Sled), + Postgres(Postgres), } #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] @@ -421,3 +422,9 @@ pub(crate) struct Sled { pub(crate) export_path: PathBuf, } + +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] +#[serde(rename_all = "snake_case")] +pub(crate) struct Postgres { + pub(crate) url: Url, +} diff --git a/src/lib.rs b/src/lib.rs index b7faff8..21952fc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1810,7 +1810,7 @@ async fn launch_object_store( - repo: Repo, + repo: ArcRepo, client: ClientWithMiddleware, from: S1, to: config::primitives::Store, @@ -1824,11 +1824,7 @@ where config::primitives::Store::Filesystem(config::Filesystem { path }) => { let to = FileStore::build(path.clone(), repo.clone()).await?; - match repo { - Repo::Sled(repo) => { - migrate_store(Arc::new(repo), from, to, skip_missing_files, timeout).await? - } - } + migrate_store(repo, from, to, skip_missing_files, timeout).await? } config::primitives::Store::ObjectStorage(config::primitives::ObjectStorage { endpoint, @@ -1862,11 +1858,7 @@ where .await? .build(client); - match repo { - Repo::Sled(repo) => { - migrate_store(Arc::new(repo), from, to, skip_missing_files, timeout).await? - } - } + migrate_store(repo, from, to, skip_missing_files, timeout).await? 
} } @@ -1970,7 +1962,7 @@ impl PictRsConfiguration { from, to, } => { - let repo = Repo::open(config.repo.clone())?; + let repo = Repo::open(config.repo.clone()).await?.to_arc(); match from { config::primitives::Store::Filesystem(config::Filesystem { path }) => { @@ -2034,15 +2026,15 @@ impl PictRsConfiguration { return Ok(()); } Operation::MigrateRepo { from, to } => { - let from = Repo::open(from)?.to_arc(); - let to = Repo::open(to)?.to_arc(); + let from = Repo::open(from).await?.to_arc(); + let to = Repo::open(to).await?.to_arc(); repo::migrate_repo(from, to).await?; return Ok(()); } } - let repo = Repo::open(config.repo.clone())?; + let repo = Repo::open(config.repo.clone()).await?; if config.server.read_only { tracing::warn!("Launching in READ ONLY mode"); @@ -2050,10 +2042,10 @@ impl PictRsConfiguration { match config.store.clone() { config::Store::Filesystem(config::Filesystem { path }) => { - let store = FileStore::build(path, repo.clone()).await?; - let arc_repo = repo.to_arc(); + let store = FileStore::build(path, arc_repo.clone()).await?; + if arc_repo.get("migrate-0.4").await?.is_none() { if let Some(old_repo) = repo_04::open(&config.old_repo)? { repo::migrate_04(old_repo, arc_repo.clone(), store.clone(), config.clone()) @@ -2075,6 +2067,7 @@ impl PictRsConfiguration { ) .await?; } + Repo::Postgres(_) => todo!(), } } config::Store::ObjectStorage(config::ObjectStorage { @@ -2089,6 +2082,8 @@ impl PictRsConfiguration { client_timeout, public_endpoint, }) => { + let arc_repo = repo.to_arc(); + let store = ObjectStore::build( endpoint, bucket_name, @@ -2104,13 +2099,11 @@ impl PictRsConfiguration { signature_duration, client_timeout, public_endpoint, - repo.clone(), + arc_repo.clone(), ) .await? .build(client.clone()); - let arc_repo = repo.to_arc(); - if arc_repo.get("migrate-0.4").await?.is_none() { if let Some(old_repo) = repo_04::open(&config.old_repo)? 
{ repo::migrate_04(old_repo, arc_repo.clone(), store.clone(), config.clone()) @@ -2128,6 +2121,7 @@ impl PictRsConfiguration { }) .await?; } + Repo::Postgres(_) => todo!(), } } } diff --git a/src/repo.rs b/src/repo.rs index 2518cb3..51553f4 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -12,6 +12,7 @@ use uuid::Uuid; mod hash; mod migrate; +pub(crate) mod postgres; pub(crate) mod sled; pub(crate) use hash::Hash; @@ -22,6 +23,7 @@ pub(crate) type ArcRepo = Arc; #[derive(Clone, Debug)] pub(crate) enum Repo { Sled(self::sled::SledRepo), + Postgres(self::postgres::PostgresRepo), } #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] @@ -791,7 +793,7 @@ where impl Repo { #[tracing::instrument] - pub(crate) fn open(config: config::Repo) -> color_eyre::Result { + pub(crate) async fn open(config: config::Repo) -> color_eyre::Result { match config { config::Repo::Sled(config::Sled { path, @@ -802,12 +804,18 @@ impl Repo { Ok(Self::Sled(repo)) } + config::Repo::Postgres(config::Postgres { url }) => { + let repo = self::postgres::PostgresRepo::connect(url).await?; + + Ok(Self::Postgres(repo)) + } } } pub(crate) fn to_arc(&self) -> ArcRepo { match self { Self::Sled(sled_repo) => Arc::new(sled_repo.clone()), + Self::Postgres(_) => todo!(), } } } diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs new file mode 100644 index 0000000..7c3b031 --- /dev/null +++ b/src/repo/postgres.rs @@ -0,0 +1,81 @@ +mod embedded { + use refinery::embed_migrations; + + embed_migrations!("./src/repo/postgres/migrations"); +} + +use diesel_async::{ + pooled_connection::{ + deadpool::{BuildError, Pool}, + AsyncDieselConnectionManager, + }, + AsyncPgConnection, +}; +use url::Url; + +use super::{BaseRepo, HashRepo}; + +#[derive(Clone)] +pub(crate) struct PostgresRepo { + pool: Pool, +} + +#[derive(Debug, thiserror::Error)] +pub(crate) enum ConnectPostgresError { + #[error("Failed to connect to postgres for migrations")] + ConnectForMigration(#[source] tokio_postgres::Error), + + 
#[error("Failed to run migrations")] + Migration(#[source] refinery::Error), + + #[error("Failed to build postgres connection pool")] + BuildPool(#[source] BuildError), +} + +#[derive(Debug, thiserror::Error)] +enum PostgresError {} + +impl PostgresRepo { + pub(crate) async fn connect(postgres_url: Url) -> Result { + let (mut client, conn) = + tokio_postgres::connect(postgres_url.as_str(), tokio_postgres::tls::NoTls) + .await + .map_err(ConnectPostgresError::ConnectForMigration)?; + + let handle = actix_rt::spawn(conn); + + embedded::migrations::runner() + .run_async(&mut client) + .await + .map_err(ConnectPostgresError::Migration)?; + + handle.abort(); + let _ = handle.await; + + let config = AsyncDieselConnectionManager::::new(postgres_url); + let pool = Pool::builder(config) + .build() + .map_err(ConnectPostgresError::BuildPool)?; + + Ok(PostgresRepo { pool }) + } +} + +impl BaseRepo for PostgresRepo {} + +/* +#[async_trait::async_trait] +impl HashRepo for PostgresRepo { + async fn size(&self) -> Result { + let conn = self.pool.get().await.map_err(PostgresError::from)?; + } +} +*/ + +impl std::fmt::Debug for PostgresRepo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PostgresRepo") + .field("pool", &"pool") + .finish() + } +} diff --git a/src/repo/postgres/migrations/V0001__create_hashes.rs b/src/repo/postgres/migrations/V0001__create_hashes.rs new file mode 100644 index 0000000..30455c3 --- /dev/null +++ b/src/repo/postgres/migrations/V0001__create_hashes.rs @@ -0,0 +1,31 @@ +use barrel::backend::Pg; +use barrel::functions::AutogenFunction; +use barrel::{types, Migration}; + +pub(crate) fn migration() -> String { + let mut m = Migration::new(); + + m.create_table("hashes", |t| { + t.add_column( + "hash", + types::binary() + .primary(true) + .unique(true) + .nullable(false) + .size(128), + ); + t.add_column("identifier", types::text().unique(true).nullable(false)); + t.add_column( + "motion_identifier", + 
types::text().unique(true).nullable(true), + ); + t.add_column( + "created_at", + types::datetime() + .nullable(false) + .default(AutogenFunction::CurrentTimestamp), + ); + }); + + m.make::().to_string() +} diff --git a/src/store/file_store.rs b/src/store/file_store.rs index b2a1cb7..b1738cb 100644 --- a/src/store/file_store.rs +++ b/src/store/file_store.rs @@ -1,9 +1,4 @@ -use crate::{ - error_code::ErrorCode, - file::File, - repo::{Repo, SettingsRepo}, - store::Store, -}; +use crate::{error_code::ErrorCode, file::File, repo::ArcRepo, store::Store}; use actix_web::web::Bytes; use futures_core::Stream; use std::{ @@ -58,7 +53,7 @@ impl FileError { pub(crate) struct FileStore { path_gen: Generator, root_dir: PathBuf, - repo: Repo, + repo: ArcRepo, } #[async_trait::async_trait(?Send)] @@ -189,7 +184,7 @@ impl Store for FileStore { impl FileStore { #[tracing::instrument(skip(repo))] - pub(crate) async fn build(root_dir: PathBuf, repo: Repo) -> color_eyre::Result { + pub(crate) async fn build(root_dir: PathBuf, repo: ArcRepo) -> color_eyre::Result { let path_gen = init_generator(&repo).await?; tokio::fs::create_dir_all(&root_dir).await?; @@ -204,13 +199,9 @@ impl FileStore { async fn next_directory(&self) -> Result { let path = self.path_gen.next(); - match self.repo { - Repo::Sled(ref sled_repo) => { - sled_repo - .set(GENERATOR_KEY, path.to_be_bytes().into()) - .await?; - } - } + self.repo + .set(GENERATOR_KEY, path.to_be_bytes().into()) + .await?; let mut target_path = self.root_dir.clone(); for dir in path.to_strings() { @@ -308,18 +299,13 @@ pub(crate) async fn safe_create_parent>(path: P) -> Result<(), Fi Ok(()) } -async fn init_generator(repo: &Repo) -> Result { - match repo { - Repo::Sled(sled_repo) => { - if let Some(ivec) = sled_repo.get(GENERATOR_KEY).await? 
{ - Ok(Generator::from_existing( - storage_path_generator::Path::from_be_bytes(ivec.to_vec()) - .map_err(FileError::from)?, - )) - } else { - Ok(Generator::new()) - } - } +async fn init_generator(repo: &ArcRepo) -> Result { + if let Some(ivec) = repo.get(GENERATOR_KEY).await? { + Ok(Generator::from_existing( + storage_path_generator::Path::from_be_bytes(ivec.to_vec()).map_err(FileError::from)?, + )) + } else { + Ok(Generator::new()) } } diff --git a/src/store/object_store.rs b/src/store/object_store.rs index a37f465..301c181 100644 --- a/src/store/object_store.rs +++ b/src/store/object_store.rs @@ -1,7 +1,7 @@ use crate::{ bytes_stream::BytesStream, error_code::ErrorCode, - repo::{Repo, SettingsRepo}, + repo::ArcRepo, store::Store, stream::{IntoStreamer, StreamMap}, }; @@ -107,7 +107,7 @@ impl From for ObjectError { #[derive(Clone)] pub(crate) struct ObjectStore { path_gen: Generator, - repo: Repo, + repo: ArcRepo, bucket: Bucket, credentials: Credentials, client: ClientWithMiddleware, @@ -119,7 +119,7 @@ pub(crate) struct ObjectStore { #[derive(Clone)] pub(crate) struct ObjectStoreConfig { path_gen: Generator, - repo: Repo, + repo: ArcRepo, bucket: Bucket, credentials: Credentials, signature_expiration: u64, @@ -493,7 +493,7 @@ impl ObjectStore { signature_expiration: u64, client_timeout: u64, public_endpoint: Option, - repo: Repo, + repo: ArcRepo, ) -> Result { let path_gen = init_generator(&repo).await?; @@ -714,13 +714,9 @@ impl ObjectStore { async fn next_directory(&self) -> Result { let path = self.path_gen.next(); - match self.repo { - Repo::Sled(ref sled_repo) => { - sled_repo - .set(GENERATOR_KEY, path.to_be_bytes().into()) - .await?; - } - } + self.repo + .set(GENERATOR_KEY, path.to_be_bytes().into()) + .await?; Ok(path) } @@ -733,18 +729,14 @@ impl ObjectStore { } } -async fn init_generator(repo: &Repo) -> Result { - match repo { - Repo::Sled(sled_repo) => { - if let Some(ivec) = sled_repo.get(GENERATOR_KEY).await? 
{ - Ok(Generator::from_existing( - storage_path_generator::Path::from_be_bytes(ivec.to_vec()) - .map_err(ObjectError::from)?, - )) - } else { - Ok(Generator::new()) - } - } +async fn init_generator(repo: &ArcRepo) -> Result<Generator, StoreError> { + if let Some(ivec) = repo.get(GENERATOR_KEY).await? { + Ok(Generator::from_existing( + storage_path_generator::Path::from_be_bytes(ivec.to_vec()) + .map_err(ObjectError::from)?, + )) + } else { + Ok(Generator::new()) + } }