2023-09-02 18:52:15 +00:00
|
|
|
mod embedded;
|
2023-09-02 18:35:30 +00:00
|
|
|
mod schema;
|
2023-09-02 16:52:55 +00:00
|
|
|
|
2023-09-02 23:30:45 +00:00
|
|
|
use std::sync::Arc;
|
|
|
|
|
2023-09-02 18:52:15 +00:00
|
|
|
use diesel::prelude::*;
|
2023-09-02 16:52:55 +00:00
|
|
|
use diesel_async::{
|
|
|
|
pooled_connection::{
|
2023-09-02 18:52:15 +00:00
|
|
|
deadpool::{BuildError, Pool, PoolError},
|
2023-09-02 16:52:55 +00:00
|
|
|
AsyncDieselConnectionManager,
|
|
|
|
},
|
2023-09-03 00:13:00 +00:00
|
|
|
AsyncConnection, AsyncPgConnection, RunQueryDsl,
|
2023-09-02 16:52:55 +00:00
|
|
|
};
|
|
|
|
use url::Url;
|
|
|
|
|
2023-09-02 23:30:45 +00:00
|
|
|
use crate::error_code::ErrorCode;
|
|
|
|
|
|
|
|
use super::{
|
|
|
|
BaseRepo, Hash, HashAlreadyExists, HashPage, HashRepo, OrderedHash, RepoError,
|
|
|
|
VariantAlreadyExists,
|
|
|
|
};
|
2023-09-02 16:52:55 +00:00
|
|
|
|
|
|
|
/// Postgres-backed repository.
///
/// Wraps a deadpool pool of async diesel Postgres connections; cloning is
/// cheap because the pool is internally reference-counted.
#[derive(Clone)]
pub(crate) struct PostgresRepo {
    // Connection pool checked out by every query method below.
    pool: Pool<AsyncPgConnection>,
}
|
|
|
|
|
|
|
|
/// Errors that can occur while establishing the initial Postgres connection,
/// running the embedded migrations, or building the connection pool.
#[derive(Debug, thiserror::Error)]
pub(crate) enum ConnectPostgresError {
    /// The dedicated tokio_postgres connection used for migrations failed to open.
    #[error("Failed to connect to postgres for migrations")]
    ConnectForMigration(#[source] tokio_postgres::Error),

    /// Applying the embedded refinery migrations failed.
    #[error("Failed to run migrations")]
    Migration(#[source] refinery::Error),

    /// Constructing the deadpool connection pool failed.
    #[error("Failed to build postgres connection pool")]
    BuildPool(#[source] BuildError),
}
|
|
|
|
|
|
|
|
/// Errors produced by ordinary repo operations after a successful connect.
#[derive(Debug, thiserror::Error)]
pub(crate) enum PostgresError {
    /// Checking a connection out of the pool failed.
    #[error("Error in db pool")]
    Pool(#[source] PoolError),

    /// A statement or query failed inside diesel.
    #[error("Error in database")]
    Diesel(#[source] diesel::result::Error),
}
|
2023-09-02 16:52:55 +00:00
|
|
|
|
2023-09-02 23:30:45 +00:00
|
|
|
impl PostgresError {
    /// Maps this error onto the crate-level `ErrorCode`.
    ///
    /// NOTE(review): this is still a stub — `todo!()` panics if it is ever
    /// called. It must be implemented before these errors are surfaced
    /// through user-facing responses.
    pub(super) const fn error_code(&self) -> ErrorCode {
        todo!()
    }
}
|
|
|
|
|
2023-09-02 16:52:55 +00:00
|
|
|
impl PostgresRepo {
|
|
|
|
pub(crate) async fn connect(postgres_url: Url) -> Result<Self, ConnectPostgresError> {
|
|
|
|
let (mut client, conn) =
|
|
|
|
tokio_postgres::connect(postgres_url.as_str(), tokio_postgres::tls::NoTls)
|
|
|
|
.await
|
|
|
|
.map_err(ConnectPostgresError::ConnectForMigration)?;
|
|
|
|
|
|
|
|
let handle = actix_rt::spawn(conn);
|
|
|
|
|
|
|
|
embedded::migrations::runner()
|
|
|
|
.run_async(&mut client)
|
|
|
|
.await
|
|
|
|
.map_err(ConnectPostgresError::Migration)?;
|
|
|
|
|
|
|
|
handle.abort();
|
|
|
|
let _ = handle.await;
|
|
|
|
|
|
|
|
let config = AsyncDieselConnectionManager::<AsyncPgConnection>::new(postgres_url);
|
|
|
|
let pool = Pool::builder(config)
|
|
|
|
.build()
|
|
|
|
.map_err(ConnectPostgresError::BuildPool)?;
|
|
|
|
|
|
|
|
Ok(PostgresRepo { pool })
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-09-02 23:30:45 +00:00
|
|
|
/// Converts an `OffsetDateTime` into the UTC `PrimitiveDateTime` stored in
/// the database, normalizing the offset to UTC first so no wall-clock
/// information is lost in the conversion.
fn to_primitive(timestamp: time::OffsetDateTime) -> time::PrimitiveDateTime {
    let utc = timestamp.to_offset(time::UtcOffset::UTC);
    time::PrimitiveDateTime::new(utc.date(), utc.time())
}
|
|
|
|
|
2023-09-02 16:52:55 +00:00
|
|
|
// Marker impl: enrolls PostgresRepo in the repo trait hierarchy.
impl BaseRepo for PostgresRepo {}
|
|
|
|
|
2023-09-02 18:52:15 +00:00
|
|
|
// HashRepo provides CRUD access to hashes and their associated variant and
// motion identifiers. `?Send` because the async-trait futures hold diesel
// connection state across await points.
#[async_trait::async_trait(?Send)]
impl HashRepo for PostgresRepo {
    /// Counts the rows in the `hashes` table.
    async fn size(&self) -> Result<u64, RepoError> {
        use schema::hashes::dsl::*;

        let mut conn = self.pool.get().await.map_err(PostgresError::Pool)?;

        let count = hashes
            .count()
            .get_result::<i64>(&mut conn)
            .await
            .map_err(PostgresError::Diesel)?;

        // COUNT(*) is never negative, so the i64 -> u64 conversion cannot fail.
        Ok(count.try_into().expect("non-negative count"))
    }

    /// Looks up the `OrderedHash` (creation time plus hash) for `input_hash`,
    /// returning `None` when the hash is not present.
    async fn bound(&self, input_hash: Hash) -> Result<Option<OrderedHash>, RepoError> {
        use schema::hashes::dsl::*;

        let mut conn = self.pool.get().await.map_err(PostgresError::Pool)?;

        let timestamp = hashes
            .select(created_at)
            .filter(hash.eq(&input_hash))
            .first(&mut conn)
            .await
            // Stored timestamps are UTC primitives; re-attach the UTC offset.
            .map(time::PrimitiveDateTime::assume_utc)
            // `.optional()` after `.map` turns a NotFound error into Ok(None).
            .optional()
            .map_err(PostgresError::Diesel)?;

        Ok(timestamp.map(|timestamp| OrderedHash {
            timestamp,
            hash: input_hash,
        }))
    }

    /// Returns the page of hashes created strictly before `date`, by first
    /// locating the newest hash older than `date` to use as the page bound.
    async fn hash_page_by_date(
        &self,
        date: time::OffsetDateTime,
        limit: usize,
    ) -> Result<HashPage, RepoError> {
        use schema::hashes::dsl::*;

        let mut conn = self.pool.get().await.map_err(PostgresError::Pool)?;

        let timestamp = to_primitive(date);

        // Newest row strictly older than `date` becomes the pagination bound.
        let ordered_hash = hashes
            .select((created_at, hash))
            .filter(created_at.lt(timestamp))
            .order(created_at.desc())
            .first::<(time::PrimitiveDateTime, Hash)>(&mut conn)
            .await
            .optional()
            .map_err(PostgresError::Diesel)?
            .map(|tup| OrderedHash {
                timestamp: tup.0.assume_utc(),
                hash: tup.1,
            });

        self.hashes_ordered(ordered_hash, limit).await
    }

    /// Returns a page of up to `limit` hashes ordered newest-first, starting
    /// at `bound` (inclusive) when given, plus `prev`/`next` cursors.
    async fn hashes_ordered(
        &self,
        bound: Option<OrderedHash>,
        limit: usize,
    ) -> Result<HashPage, RepoError> {
        use schema::hashes::dsl::*;

        let mut conn = self.pool.get().await.map_err(PostgresError::Pool)?;

        let (mut page, prev) = if let Some(OrderedHash {
            timestamp,
            hash: bound_hash,
        }) = bound
        {
            let timestamp = to_primitive(timestamp);

            // Page rows: everything at-or-before the bound, newest first.
            // One extra row is fetched so we can tell whether a next page exists.
            let page = hashes
                .select(hash)
                .filter(created_at.lt(timestamp))
                .or_filter(created_at.eq(timestamp).and(hash.le(&bound_hash)))
                .order(created_at.desc())
                .then_order_by(hash.desc())
                .limit(limit as i64 + 1)
                .load::<Hash>(&mut conn)
                .await
                .map_err(PostgresError::Diesel)?;

            // Prev cursor: walk `limit - 1` rows in the opposite (ascending)
            // direction from the bound; None means there is no earlier page.
            let prev = hashes
                .select(hash)
                .filter(created_at.gt(timestamp))
                .or_filter(created_at.eq(timestamp).and(hash.gt(&bound_hash)))
                .order(created_at)
                .then_order_by(hash)
                .offset(limit.saturating_sub(1) as i64)
                .first::<Hash>(&mut conn)
                .await
                .optional()
                .map_err(PostgresError::Diesel)?;

            (page, prev)
        } else {
            // No bound: first page, newest hashes overall; no prev cursor.
            let page = hashes
                .select(hash)
                .order(created_at.desc())
                .then_order_by(hash.desc())
                .limit(limit as i64 + 1)
                .load::<Hash>(&mut conn)
                .await
                .map_err(PostgresError::Diesel)?;

            (page, None)
        };

        // The extra fetched row (if present) becomes the next-page cursor
        // instead of page data.
        let next = if page.len() > limit { page.pop() } else { None };

        Ok(HashPage {
            limit,
            prev,
            next,
            hashes: page,
        })
    }

    /// Inserts a new hash row; a unique violation is reported as
    /// `Ok(Err(HashAlreadyExists))` rather than a hard error.
    async fn create_hash_with_timestamp(
        &self,
        input_hash: Hash,
        input_identifier: &Arc<str>,
        timestamp: time::OffsetDateTime,
    ) -> Result<Result<(), HashAlreadyExists>, RepoError> {
        use schema::hashes::dsl::*;

        let mut conn = self.pool.get().await.map_err(PostgresError::Pool)?;

        let timestamp = to_primitive(timestamp);

        let res = diesel::insert_into(hashes)
            .values((
                hash.eq(&input_hash),
                identifier.eq(input_identifier.as_ref()),
                created_at.eq(&timestamp),
            ))
            .execute(&mut conn)
            .await;

        match res {
            Ok(_) => Ok(Ok(())),
            // A unique violation means this hash was inserted concurrently or
            // already existed.
            Err(diesel::result::Error::DatabaseError(
                diesel::result::DatabaseErrorKind::UniqueViolation,
                _,
            )) => Ok(Err(HashAlreadyExists)),
            Err(e) => Err(PostgresError::Diesel(e).into()),
        }
    }

    /// Replaces the stored identifier for `input_hash`.
    async fn update_identifier(
        &self,
        input_hash: Hash,
        input_identifier: &Arc<str>,
    ) -> Result<(), RepoError> {
        use schema::hashes::dsl::*;

        let mut conn = self.pool.get().await.map_err(PostgresError::Pool)?;

        diesel::update(hashes)
            .filter(hash.eq(&input_hash))
            .set(identifier.eq(input_identifier.as_ref()))
            .execute(&mut conn)
            .await
            .map_err(PostgresError::Diesel)?;

        Ok(())
    }

    /// Fetches the identifier for `input_hash`, or `None` when absent.
    async fn identifier(&self, input_hash: Hash) -> Result<Option<Arc<str>>, RepoError> {
        use schema::hashes::dsl::*;

        let mut conn = self.pool.get().await.map_err(PostgresError::Pool)?;

        let opt = hashes
            .select(identifier)
            .filter(hash.eq(&input_hash))
            .get_result::<String>(&mut conn)
            .await
            .optional()
            .map_err(PostgresError::Diesel)?;

        Ok(opt.map(Arc::from))
    }

    /// Records the identifier for a (hash, variant) pair; a unique violation
    /// is reported as `Ok(Err(VariantAlreadyExists))`.
    async fn relate_variant_identifier(
        &self,
        input_hash: Hash,
        input_variant: String,
        input_identifier: &Arc<str>,
    ) -> Result<Result<(), VariantAlreadyExists>, RepoError> {
        use schema::variants::dsl::*;

        let mut conn = self.pool.get().await.map_err(PostgresError::Pool)?;

        let res = diesel::insert_into(variants)
            .values((
                hash.eq(&input_hash),
                variant.eq(&input_variant),
                identifier.eq(input_identifier.as_ref()),
            ))
            .execute(&mut conn)
            .await;

        match res {
            Ok(_) => Ok(Ok(())),
            // A unique violation means this (hash, variant) pair already exists.
            Err(diesel::result::Error::DatabaseError(
                diesel::result::DatabaseErrorKind::UniqueViolation,
                _,
            )) => Ok(Err(VariantAlreadyExists)),
            Err(e) => Err(PostgresError::Diesel(e).into()),
        }
    }

    /// Fetches the identifier for a (hash, variant) pair, or `None` when absent.
    async fn variant_identifier(
        &self,
        input_hash: Hash,
        input_variant: String,
    ) -> Result<Option<Arc<str>>, RepoError> {
        use schema::variants::dsl::*;

        let mut conn = self.pool.get().await.map_err(PostgresError::Pool)?;

        let opt = variants
            .select(identifier)
            .filter(hash.eq(&input_hash))
            .filter(variant.eq(&input_variant))
            .get_result::<String>(&mut conn)
            .await
            .optional()
            .map_err(PostgresError::Diesel)?
            .map(Arc::from);

        Ok(opt)
    }

    /// Lists all (variant, identifier) pairs stored for `input_hash`.
    async fn variants(&self, input_hash: Hash) -> Result<Vec<(String, Arc<str>)>, RepoError> {
        use schema::variants::dsl::*;

        let mut conn = self.pool.get().await.map_err(PostgresError::Pool)?;

        let vec = variants
            .select((variant, identifier))
            .filter(hash.eq(&input_hash))
            .get_results::<(String, String)>(&mut conn)
            .await
            .map_err(PostgresError::Diesel)?
            .into_iter()
            .map(|(s, i)| (s, Arc::from(i)))
            .collect();

        Ok(vec)
    }

    /// Deletes the row for a (hash, variant) pair, succeeding even when no
    /// such row exists.
    async fn remove_variant(
        &self,
        input_hash: Hash,
        input_variant: String,
    ) -> Result<(), RepoError> {
        use schema::variants::dsl::*;

        let mut conn = self.pool.get().await.map_err(PostgresError::Pool)?;

        diesel::delete(variants)
            .filter(hash.eq(&input_hash))
            .filter(variant.eq(&input_variant))
            .execute(&mut conn)
            .await
            .map_err(PostgresError::Diesel)?;

        Ok(())
    }

    /// Stores the motion identifier on the hash row for `input_hash`.
    async fn relate_motion_identifier(
        &self,
        input_hash: Hash,
        input_identifier: &Arc<str>,
    ) -> Result<(), RepoError> {
        use schema::hashes::dsl::*;

        let mut conn = self.pool.get().await.map_err(PostgresError::Pool)?;

        diesel::update(hashes)
            .filter(hash.eq(&input_hash))
            .set(motion_identifier.eq(input_identifier.as_ref()))
            .execute(&mut conn)
            .await
            .map_err(PostgresError::Diesel)?;

        Ok(())
    }

    /// Fetches the motion identifier for `input_hash`; `None` when the hash is
    /// missing or the nullable column is unset (hence the `.flatten()`).
    async fn motion_identifier(&self, input_hash: Hash) -> Result<Option<Arc<str>>, RepoError> {
        use schema::hashes::dsl::*;

        let mut conn = self.pool.get().await.map_err(PostgresError::Pool)?;

        let opt = hashes
            .select(motion_identifier)
            .filter(hash.eq(&input_hash))
            .get_result::<Option<String>>(&mut conn)
            .await
            .optional()
            .map_err(PostgresError::Diesel)?
            .flatten()
            .map(Arc::from);

        Ok(opt)
    }

    /// Deletes a hash row together with all of its variants.
    async fn cleanup_hash(&self, input_hash: Hash) -> Result<(), RepoError> {
        let mut conn = self.pool.get().await.map_err(PostgresError::Pool)?;

        // Both deletes run in a single transaction so a failure leaves no
        // partially-cleaned state behind.
        conn.transaction(|conn| {
            Box::pin(async move {
                diesel::delete(schema::hashes::dsl::hashes)
                    .filter(schema::hashes::dsl::hash.eq(&input_hash))
                    .execute(conn)
                    .await?;

                diesel::delete(schema::variants::dsl::variants)
                    .filter(schema::variants::dsl::hash.eq(&input_hash))
                    .execute(conn)
                    .await
            })
        })
        .await
        .map_err(PostgresError::Diesel)?;

        Ok(())
    }
}
|
|
|
|
|
|
|
|
impl std::fmt::Debug for PostgresRepo {
|
|
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
|
|
f.debug_struct("PostgresRepo")
|
|
|
|
.field("pool", &"pool")
|
|
|
|
.finish()
|
|
|
|
}
|
|
|
|
}
|