mirror of
https://git.asonix.dog/asonix/pict-rs
synced 2025-01-25 10:46:13 +00:00
1850 lines
58 KiB
Rust
1850 lines
58 KiB
Rust
mod embedded;
|
|
mod job_status;
|
|
mod schema;
|
|
|
|
use std::{
|
|
collections::{BTreeSet, VecDeque},
|
|
sync::{
|
|
atomic::{AtomicU64, Ordering},
|
|
Arc, Weak,
|
|
},
|
|
time::{Duration, Instant},
|
|
};
|
|
|
|
use dashmap::DashMap;
|
|
use deadpool::managed::Hook;
|
|
use diesel::prelude::*;
|
|
use diesel_async::{
|
|
pooled_connection::{
|
|
deadpool::{BuildError, Object, Pool, PoolError},
|
|
AsyncDieselConnectionManager, ManagerConfig,
|
|
},
|
|
AsyncConnection, AsyncPgConnection, RunQueryDsl,
|
|
};
|
|
use tokio::sync::Notify;
|
|
use tokio_postgres::{tls::NoTlsStream, AsyncMessage, Connection, NoTls, Notification, Socket};
|
|
use tracing::Instrument;
|
|
use url::Url;
|
|
use uuid::Uuid;
|
|
|
|
use crate::{
|
|
details::Details,
|
|
error_code::{ErrorCode, OwnedErrorCode},
|
|
future::{LocalBoxFuture, WithMetrics, WithTimeout},
|
|
serde_str::Serde,
|
|
stream::LocalBoxStream,
|
|
};
|
|
|
|
use self::job_status::JobStatus;
|
|
|
|
use super::{
|
|
metrics::{PopMetricsGuard, PushMetricsGuard, WaitMetricsGuard},
|
|
Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, DetailsRepo,
|
|
FullRepo, Hash, HashAlreadyExists, HashPage, HashRepo, JobId, OrderedHash, ProxyRepo,
|
|
QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo, UploadResult,
|
|
VariantAccessRepo, VariantAlreadyExists,
|
|
};
|
|
|
|
#[derive(Clone)]
|
|
pub(crate) struct PostgresRepo {
|
|
inner: Arc<Inner>,
|
|
#[allow(dead_code)]
|
|
notifications: Arc<actix_rt::task::JoinHandle<()>>,
|
|
}
|
|
|
|
struct Inner {
|
|
health_count: AtomicU64,
|
|
pool: Pool<AsyncPgConnection>,
|
|
queue_notifications: DashMap<String, Arc<Notify>>,
|
|
upload_notifications: DashMap<UploadId, Weak<Notify>>,
|
|
}
|
|
|
|
struct UploadInterest {
|
|
inner: Arc<Inner>,
|
|
interest: Option<Arc<Notify>>,
|
|
upload_id: UploadId,
|
|
}
|
|
|
|
struct JobNotifierState<'a> {
|
|
inner: &'a Inner,
|
|
capacity: usize,
|
|
jobs: BTreeSet<JobId>,
|
|
jobs_ordered: VecDeque<JobId>,
|
|
}
|
|
|
|
struct UploadNotifierState<'a> {
|
|
inner: &'a Inner,
|
|
}
|
|
|
|
#[derive(Debug, thiserror::Error)]
|
|
pub(crate) enum ConnectPostgresError {
|
|
#[error("Failed to connect to postgres for migrations")]
|
|
ConnectForMigration(#[source] tokio_postgres::Error),
|
|
|
|
#[error("Failed to run migrations")]
|
|
Migration(#[source] refinery::Error),
|
|
|
|
#[error("Failed to build postgres connection pool")]
|
|
BuildPool(#[source] BuildError),
|
|
}
|
|
|
|
#[derive(Debug, thiserror::Error)]
|
|
pub(crate) enum PostgresError {
|
|
#[error("Error in db pool")]
|
|
Pool(#[source] PoolError),
|
|
|
|
#[error("Error in database")]
|
|
Diesel(#[source] diesel::result::Error),
|
|
|
|
#[error("Error deserializing hex value")]
|
|
Hex(#[source] hex::FromHexError),
|
|
|
|
#[error("Error serializing details")]
|
|
SerializeDetails(#[source] serde_json::Error),
|
|
|
|
#[error("Error deserializing details")]
|
|
DeserializeDetails(#[source] serde_json::Error),
|
|
|
|
#[error("Error serializing upload result")]
|
|
SerializeUploadResult(#[source] serde_json::Error),
|
|
|
|
#[error("Error deserializing upload result")]
|
|
DeserializeUploadResult(#[source] serde_json::Error),
|
|
|
|
#[error("Timed out waiting for postgres")]
|
|
DbTimeout,
|
|
}
|
|
|
|
impl PostgresError {
|
|
pub(super) const fn error_code(&self) -> ErrorCode {
|
|
match self {
|
|
Self::Pool(_)
|
|
| Self::Diesel(_)
|
|
| Self::SerializeDetails(_)
|
|
| Self::SerializeUploadResult(_)
|
|
| Self::Hex(_)
|
|
| Self::DbTimeout => ErrorCode::POSTGRES_ERROR,
|
|
Self::DeserializeDetails(_) => ErrorCode::EXTRACT_DETAILS,
|
|
Self::DeserializeUploadResult(_) => ErrorCode::EXTRACT_UPLOAD_RESULT,
|
|
}
|
|
}
|
|
|
|
pub(super) const fn is_disconnected(&self) -> bool {
|
|
match self {
|
|
Self::Pool(
|
|
PoolError::Closed
|
|
| PoolError::Backend(diesel_async::pooled_connection::PoolError::ConnectionError(_)),
|
|
)
|
|
| Self::Diesel(diesel::result::Error::DatabaseError(
|
|
diesel::result::DatabaseErrorKind::ClosedConnection,
|
|
_,
|
|
)) => true,
|
|
_ => false,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl PostgresRepo {
|
|
pub(crate) async fn connect(postgres_url: Url) -> Result<Self, ConnectPostgresError> {
|
|
let (mut client, conn) = tokio_postgres::connect(postgres_url.as_str(), NoTls)
|
|
.await
|
|
.map_err(ConnectPostgresError::ConnectForMigration)?;
|
|
|
|
let handle = crate::sync::spawn(conn);
|
|
|
|
embedded::migrations::runner()
|
|
.run_async(&mut client)
|
|
.await
|
|
.map_err(ConnectPostgresError::Migration)?;
|
|
|
|
handle.abort();
|
|
let _ = handle.await;
|
|
|
|
let parallelism = std::thread::available_parallelism()
|
|
.map(|u| u.into())
|
|
.unwrap_or(1_usize);
|
|
|
|
let (tx, rx) = flume::bounded(10);
|
|
|
|
let mut config = ManagerConfig::default();
|
|
config.custom_setup = build_handler(tx);
|
|
|
|
let mgr = AsyncDieselConnectionManager::<AsyncPgConnection>::new_with_config(
|
|
postgres_url,
|
|
config,
|
|
);
|
|
|
|
let pool = Pool::builder(mgr)
|
|
.runtime(deadpool::Runtime::Tokio1)
|
|
.wait_timeout(Some(Duration::from_secs(1)))
|
|
.create_timeout(Some(Duration::from_secs(2)))
|
|
.recycle_timeout(Some(Duration::from_secs(2)))
|
|
.post_create(Hook::sync_fn(|_, _| {
|
|
metrics::increment_counter!("pict-rs.postgres.pool.connection.create");
|
|
Ok(())
|
|
}))
|
|
.post_recycle(Hook::sync_fn(|_, _| {
|
|
metrics::increment_counter!("pict-rs.postgres.pool.connection.recycle");
|
|
Ok(())
|
|
}))
|
|
.max_size(parallelism * 8)
|
|
.build()
|
|
.map_err(ConnectPostgresError::BuildPool)?;
|
|
|
|
let inner = Arc::new(Inner {
|
|
health_count: AtomicU64::new(0),
|
|
pool,
|
|
queue_notifications: DashMap::new(),
|
|
upload_notifications: DashMap::new(),
|
|
});
|
|
|
|
let handle = crate::sync::spawn(delegate_notifications(rx, inner.clone(), parallelism * 8));
|
|
|
|
let notifications = Arc::new(handle);
|
|
|
|
Ok(PostgresRepo {
|
|
inner,
|
|
notifications,
|
|
})
|
|
}
|
|
|
|
async fn get_connection(&self) -> Result<Object<AsyncPgConnection>, PostgresError> {
|
|
self.inner.get_connection().await
|
|
}
|
|
}
|
|
|
|
struct GetConnectionMetricsGuard {
|
|
start: Instant,
|
|
armed: bool,
|
|
}
|
|
|
|
impl GetConnectionMetricsGuard {
|
|
fn guard() -> Self {
|
|
GetConnectionMetricsGuard {
|
|
start: Instant::now(),
|
|
armed: true,
|
|
}
|
|
}
|
|
|
|
fn disarm(mut self) {
|
|
self.armed = false;
|
|
}
|
|
}
|
|
|
|
impl Drop for GetConnectionMetricsGuard {
|
|
fn drop(&mut self) {
|
|
metrics::increment_counter!("pict-rs.postgres.pool.get", "completed" => (!self.armed).to_string());
|
|
metrics::histogram!("pict-rs.postgres.pool.get.duration", self.start.elapsed().as_secs_f64(), "completed" => (!self.armed).to_string());
|
|
}
|
|
}
|
|
|
|
impl Inner {
|
|
#[tracing::instrument(level = "TRACE", skip(self))]
|
|
async fn get_connection(&self) -> Result<Object<AsyncPgConnection>, PostgresError> {
|
|
let guard = GetConnectionMetricsGuard::guard();
|
|
|
|
let obj = self.pool.get().await.map_err(PostgresError::Pool)?;
|
|
|
|
guard.disarm();
|
|
|
|
Ok(obj)
|
|
}
|
|
|
|
fn interest(self: &Arc<Self>, upload_id: UploadId) -> UploadInterest {
|
|
let notify = crate::sync::notify();
|
|
|
|
self.upload_notifications
|
|
.insert(upload_id, Arc::downgrade(¬ify));
|
|
|
|
UploadInterest {
|
|
inner: self.clone(),
|
|
interest: Some(notify),
|
|
upload_id,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl UploadInterest {
|
|
async fn notified_timeout(&self, timeout: Duration) -> Result<(), tokio::time::error::Elapsed> {
|
|
actix_rt::time::timeout(
|
|
timeout,
|
|
self.interest.as_ref().expect("interest exists").notified(),
|
|
)
|
|
.await
|
|
}
|
|
}
|
|
|
|
impl Drop for UploadInterest {
|
|
fn drop(&mut self) {
|
|
if let Some(interest) = self.interest.take() {
|
|
if Arc::into_inner(interest).is_some() {
|
|
self.inner.upload_notifications.remove(&self.upload_id);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<'a> JobNotifierState<'a> {
|
|
fn handle(&mut self, payload: &str) {
|
|
let Some((job_id, queue_name)) = payload.split_once(' ') else {
|
|
tracing::warn!("Invalid queue payload {payload}");
|
|
return;
|
|
};
|
|
|
|
let Ok(job_id) = job_id.parse::<Uuid>().map(JobId) else {
|
|
tracing::warn!("Invalid job ID {job_id}");
|
|
return;
|
|
};
|
|
|
|
if !self.jobs.insert(job_id) {
|
|
// duplicate job
|
|
return;
|
|
}
|
|
|
|
self.jobs_ordered.push_back(job_id);
|
|
|
|
if self.jobs_ordered.len() > self.capacity {
|
|
if let Some(job_id) = self.jobs_ordered.pop_front() {
|
|
self.jobs.remove(&job_id);
|
|
}
|
|
}
|
|
|
|
self.inner
|
|
.queue_notifications
|
|
.entry(queue_name.to_string())
|
|
.or_insert_with(crate::sync::notify)
|
|
.notify_one();
|
|
|
|
metrics::increment_counter!("pict-rs.postgres.job-notifier.notified", "queue" => queue_name.to_string());
|
|
}
|
|
}
|
|
|
|
impl<'a> UploadNotifierState<'a> {
|
|
fn handle(&self, payload: &str) {
|
|
let Ok(upload_id) = payload.parse::<UploadId>() else {
|
|
tracing::warn!("Invalid upload id {payload}");
|
|
return;
|
|
};
|
|
|
|
if let Some(notifier) = self
|
|
.inner
|
|
.upload_notifications
|
|
.get(&upload_id)
|
|
.and_then(|weak| weak.upgrade())
|
|
{
|
|
notifier.notify_waiters();
|
|
metrics::increment_counter!("pict-rs.postgres.upload-notifier.notified");
|
|
}
|
|
}
|
|
}
|
|
|
|
type BoxFuture<'a, T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;
|
|
type ConfigFn =
|
|
Box<dyn Fn(&str) -> BoxFuture<'_, ConnectionResult<AsyncPgConnection>> + Send + Sync + 'static>;
|
|
|
|
async fn delegate_notifications(
|
|
receiver: flume::Receiver<Notification>,
|
|
inner: Arc<Inner>,
|
|
capacity: usize,
|
|
) {
|
|
let mut job_notifier_state = JobNotifierState {
|
|
inner: &inner,
|
|
capacity,
|
|
jobs: BTreeSet::new(),
|
|
jobs_ordered: VecDeque::new(),
|
|
};
|
|
|
|
let upload_notifier_state = UploadNotifierState { inner: &inner };
|
|
|
|
while let Ok(notification) = receiver.recv_async().await {
|
|
metrics::increment_counter!("pict-rs.postgres.notification");
|
|
|
|
match notification.channel() {
|
|
"queue_status_channel" => {
|
|
// new job inserted for queue
|
|
job_notifier_state.handle(notification.payload());
|
|
}
|
|
"upload_completion_channel" => {
|
|
// new upload finished
|
|
upload_notifier_state.handle(notification.payload());
|
|
}
|
|
channel => {
|
|
tracing::info!(
|
|
"Unhandled postgres notification: {channel}: {}",
|
|
notification.payload()
|
|
);
|
|
}
|
|
}
|
|
}
|
|
|
|
tracing::warn!("Notification delegator shutting down");
|
|
}
|
|
|
|
fn build_handler(sender: flume::Sender<Notification>) -> ConfigFn {
|
|
Box::new(
|
|
move |config: &str| -> BoxFuture<'_, ConnectionResult<AsyncPgConnection>> {
|
|
let sender = sender.clone();
|
|
|
|
let connect_span = tracing::trace_span!(parent: None, "connect future");
|
|
|
|
Box::pin(
|
|
async move {
|
|
let (client, conn) =
|
|
tokio_postgres::connect(config, tokio_postgres::tls::NoTls)
|
|
.await
|
|
.map_err(|e| ConnectionError::BadConnection(e.to_string()))?;
|
|
|
|
// not very cash money (structured concurrency) of me
|
|
spawn_db_notification_task(sender, conn);
|
|
|
|
AsyncPgConnection::try_from(client).await
|
|
}
|
|
.instrument(connect_span),
|
|
)
|
|
},
|
|
)
|
|
}
|
|
|
|
fn spawn_db_notification_task(
|
|
sender: flume::Sender<Notification>,
|
|
mut conn: Connection<Socket, NoTlsStream>,
|
|
) {
|
|
crate::sync::spawn(async move {
|
|
while let Some(res) = std::future::poll_fn(|cx| conn.poll_message(cx)).await {
|
|
match res {
|
|
Err(e) => {
|
|
tracing::error!("Database Connection {e:?}");
|
|
return;
|
|
}
|
|
Ok(AsyncMessage::Notice(e)) => {
|
|
tracing::warn!("Database Notice {e:?}");
|
|
}
|
|
Ok(AsyncMessage::Notification(notification)) => {
|
|
if sender.send_async(notification).await.is_err() {
|
|
tracing::warn!("Missed notification. Are we shutting down?");
|
|
}
|
|
}
|
|
Ok(_) => {
|
|
tracing::warn!("Unhandled AsyncMessage!!! Please contact the developer of this application");
|
|
}
|
|
}
|
|
}
|
|
});
|
|
}
|
|
|
|
fn to_primitive(timestamp: time::OffsetDateTime) -> time::PrimitiveDateTime {
|
|
let timestamp = timestamp.to_offset(time::UtcOffset::UTC);
|
|
time::PrimitiveDateTime::new(timestamp.date(), timestamp.time())
|
|
}
|
|
|
|
impl BaseRepo for PostgresRepo {}
|
|
|
|
#[async_trait::async_trait(?Send)]
|
|
impl HashRepo for PostgresRepo {
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn size(&self) -> Result<u64, RepoError> {
|
|
use schema::hashes::dsl::*;
|
|
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
let count = hashes
|
|
.count()
|
|
.get_result::<i64>(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.hashes.count")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.map_err(PostgresError::Diesel)?;
|
|
|
|
Ok(count.try_into().expect("non-negative count"))
|
|
}
|
|
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn bound(&self, input_hash: Hash) -> Result<Option<OrderedHash>, RepoError> {
|
|
use schema::hashes::dsl::*;
|
|
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
let timestamp = hashes
|
|
.select(created_at)
|
|
.filter(hash.eq(&input_hash))
|
|
.get_result(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.hashes.bound")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.map(time::PrimitiveDateTime::assume_utc)
|
|
.optional()
|
|
.map_err(PostgresError::Diesel)?;
|
|
|
|
Ok(timestamp.map(|timestamp| OrderedHash {
|
|
timestamp,
|
|
hash: input_hash,
|
|
}))
|
|
}
|
|
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn hash_page_by_date(
|
|
&self,
|
|
date: time::OffsetDateTime,
|
|
limit: usize,
|
|
) -> Result<HashPage, RepoError> {
|
|
use schema::hashes::dsl::*;
|
|
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
let timestamp = to_primitive(date);
|
|
|
|
let ordered_hash = hashes
|
|
.select((created_at, hash))
|
|
.filter(created_at.lt(timestamp))
|
|
.order(created_at.desc())
|
|
.get_result::<(time::PrimitiveDateTime, Hash)>(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.hashes.ordered-hash")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.optional()
|
|
.map_err(PostgresError::Diesel)?
|
|
.map(|tup| OrderedHash {
|
|
timestamp: tup.0.assume_utc(),
|
|
hash: tup.1,
|
|
});
|
|
|
|
self.hashes_ordered(ordered_hash, limit).await
|
|
}
|
|
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn hashes_ordered(
|
|
&self,
|
|
bound: Option<OrderedHash>,
|
|
limit: usize,
|
|
) -> Result<HashPage, RepoError> {
|
|
use schema::hashes::dsl::*;
|
|
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
let (mut page, prev) = if let Some(OrderedHash {
|
|
timestamp,
|
|
hash: bound_hash,
|
|
}) = bound
|
|
{
|
|
let timestamp = to_primitive(timestamp);
|
|
|
|
let page = hashes
|
|
.select(hash)
|
|
.filter(created_at.lt(timestamp))
|
|
.or_filter(created_at.eq(timestamp).and(hash.le(&bound_hash)))
|
|
.order(created_at.desc())
|
|
.then_order_by(hash.desc())
|
|
.limit(limit as i64 + 1)
|
|
.get_results::<Hash>(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.hashes.next-hashes")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.map_err(PostgresError::Diesel)?;
|
|
|
|
let prev = hashes
|
|
.select(hash)
|
|
.filter(created_at.gt(timestamp))
|
|
.or_filter(created_at.eq(timestamp).and(hash.gt(&bound_hash)))
|
|
.order(created_at)
|
|
.then_order_by(hash)
|
|
.limit(limit as i64)
|
|
.get_results::<Hash>(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.hashes.prev-hashes")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.map_err(PostgresError::Diesel)?
|
|
.pop();
|
|
|
|
(page, prev)
|
|
} else {
|
|
let page = hashes
|
|
.select(hash)
|
|
.order(created_at.desc())
|
|
.then_order_by(hash.desc())
|
|
.limit(limit as i64 + 1)
|
|
.get_results::<Hash>(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.hashes.first-hashes")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.map_err(PostgresError::Diesel)?;
|
|
|
|
(page, None)
|
|
};
|
|
|
|
let next = if page.len() > limit { page.pop() } else { None };
|
|
|
|
Ok(HashPage {
|
|
limit,
|
|
prev,
|
|
next,
|
|
hashes: page,
|
|
})
|
|
}
|
|
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn create_hash_with_timestamp(
|
|
&self,
|
|
input_hash: Hash,
|
|
input_identifier: &Arc<str>,
|
|
timestamp: time::OffsetDateTime,
|
|
) -> Result<Result<(), HashAlreadyExists>, RepoError> {
|
|
use schema::hashes::dsl::*;
|
|
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
let timestamp = to_primitive(timestamp);
|
|
|
|
let res = diesel::insert_into(hashes)
|
|
.values((
|
|
hash.eq(&input_hash),
|
|
identifier.eq(input_identifier.as_ref()),
|
|
created_at.eq(×tamp),
|
|
))
|
|
.execute(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.hashes.create-hash")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?;
|
|
|
|
match res {
|
|
Ok(_) => Ok(Ok(())),
|
|
Err(diesel::result::Error::DatabaseError(
|
|
diesel::result::DatabaseErrorKind::UniqueViolation,
|
|
_,
|
|
)) => Ok(Err(HashAlreadyExists)),
|
|
Err(e) => Err(PostgresError::Diesel(e).into()),
|
|
}
|
|
}
|
|
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn update_identifier(
|
|
&self,
|
|
input_hash: Hash,
|
|
input_identifier: &Arc<str>,
|
|
) -> Result<(), RepoError> {
|
|
use schema::hashes::dsl::*;
|
|
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
diesel::update(hashes)
|
|
.filter(hash.eq(&input_hash))
|
|
.set(identifier.eq(input_identifier.as_ref()))
|
|
.execute(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.hashes.update-identifier")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.map_err(PostgresError::Diesel)?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn identifier(&self, input_hash: Hash) -> Result<Option<Arc<str>>, RepoError> {
|
|
use schema::hashes::dsl::*;
|
|
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
let opt = hashes
|
|
.select(identifier)
|
|
.filter(hash.eq(&input_hash))
|
|
.get_result::<String>(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.hashes.identifier")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.optional()
|
|
.map_err(PostgresError::Diesel)?;
|
|
|
|
Ok(opt.map(Arc::from))
|
|
}
|
|
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn relate_variant_identifier(
|
|
&self,
|
|
input_hash: Hash,
|
|
input_variant: String,
|
|
input_identifier: &Arc<str>,
|
|
) -> Result<Result<(), VariantAlreadyExists>, RepoError> {
|
|
use schema::variants::dsl::*;
|
|
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
let res = diesel::insert_into(variants)
|
|
.values((
|
|
hash.eq(&input_hash),
|
|
variant.eq(&input_variant),
|
|
identifier.eq(input_identifier.as_ref()),
|
|
))
|
|
.execute(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.variants.relate-variant-identifier")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?;
|
|
|
|
match res {
|
|
Ok(_) => Ok(Ok(())),
|
|
Err(diesel::result::Error::DatabaseError(
|
|
diesel::result::DatabaseErrorKind::UniqueViolation,
|
|
_,
|
|
)) => Ok(Err(VariantAlreadyExists)),
|
|
Err(e) => Err(PostgresError::Diesel(e).into()),
|
|
}
|
|
}
|
|
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn variant_identifier(
|
|
&self,
|
|
input_hash: Hash,
|
|
input_variant: String,
|
|
) -> Result<Option<Arc<str>>, RepoError> {
|
|
use schema::variants::dsl::*;
|
|
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
let opt = variants
|
|
.select(identifier)
|
|
.filter(hash.eq(&input_hash))
|
|
.filter(variant.eq(&input_variant))
|
|
.get_result::<String>(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.variants.identifier")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.optional()
|
|
.map_err(PostgresError::Diesel)?
|
|
.map(Arc::from);
|
|
|
|
Ok(opt)
|
|
}
|
|
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn variants(&self, input_hash: Hash) -> Result<Vec<(String, Arc<str>)>, RepoError> {
|
|
use schema::variants::dsl::*;
|
|
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
let vec = variants
|
|
.select((variant, identifier))
|
|
.filter(hash.eq(&input_hash))
|
|
.get_results::<(String, String)>(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.variants.for-hash")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.map_err(PostgresError::Diesel)?
|
|
.into_iter()
|
|
.map(|(s, i)| (s, Arc::from(i)))
|
|
.collect();
|
|
|
|
Ok(vec)
|
|
}
|
|
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn remove_variant(
|
|
&self,
|
|
input_hash: Hash,
|
|
input_variant: String,
|
|
) -> Result<(), RepoError> {
|
|
use schema::variants::dsl::*;
|
|
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
diesel::delete(variants)
|
|
.filter(hash.eq(&input_hash))
|
|
.filter(variant.eq(&input_variant))
|
|
.execute(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.variants.remove")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.map_err(PostgresError::Diesel)?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn relate_motion_identifier(
|
|
&self,
|
|
input_hash: Hash,
|
|
input_identifier: &Arc<str>,
|
|
) -> Result<(), RepoError> {
|
|
use schema::hashes::dsl::*;
|
|
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
diesel::update(hashes)
|
|
.filter(hash.eq(&input_hash))
|
|
.set(motion_identifier.eq(input_identifier.as_ref()))
|
|
.execute(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.hashes.relate-motion-identifier")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.map_err(PostgresError::Diesel)?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn motion_identifier(&self, input_hash: Hash) -> Result<Option<Arc<str>>, RepoError> {
|
|
use schema::hashes::dsl::*;
|
|
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
let opt = hashes
|
|
.select(motion_identifier)
|
|
.filter(hash.eq(&input_hash))
|
|
.get_result::<Option<String>>(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.hashes.motion-identifier")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.optional()
|
|
.map_err(PostgresError::Diesel)?
|
|
.flatten()
|
|
.map(Arc::from);
|
|
|
|
Ok(opt)
|
|
}
|
|
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn cleanup_hash(&self, input_hash: Hash) -> Result<(), RepoError> {
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
conn.transaction(|conn| {
|
|
Box::pin(async move {
|
|
diesel::delete(schema::variants::dsl::variants)
|
|
.filter(schema::variants::dsl::hash.eq(&input_hash))
|
|
.execute(conn)
|
|
.with_metrics("pict-rs.postgres.variants.cleanup")
|
|
.await?;
|
|
|
|
diesel::delete(schema::hashes::dsl::hashes)
|
|
.filter(schema::hashes::dsl::hash.eq(&input_hash))
|
|
.execute(conn)
|
|
.with_metrics("pict-rs.postgres.hashes.cleanup")
|
|
.await
|
|
})
|
|
})
|
|
.await
|
|
.map_err(PostgresError::Diesel)?;
|
|
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
#[async_trait::async_trait(?Send)]
|
|
impl AliasRepo for PostgresRepo {
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn create_alias(
|
|
&self,
|
|
input_alias: &Alias,
|
|
delete_token: &DeleteToken,
|
|
input_hash: Hash,
|
|
) -> Result<Result<(), AliasAlreadyExists>, RepoError> {
|
|
use schema::aliases::dsl::*;
|
|
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
let res = diesel::insert_into(aliases)
|
|
.values((
|
|
alias.eq(input_alias),
|
|
hash.eq(&input_hash),
|
|
token.eq(delete_token),
|
|
))
|
|
.execute(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.aliases.create")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?;
|
|
|
|
match res {
|
|
Ok(_) => Ok(Ok(())),
|
|
Err(diesel::result::Error::DatabaseError(
|
|
diesel::result::DatabaseErrorKind::UniqueViolation,
|
|
_,
|
|
)) => Ok(Err(AliasAlreadyExists)),
|
|
Err(e) => Err(PostgresError::Diesel(e).into()),
|
|
}
|
|
}
|
|
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn delete_token(&self, input_alias: &Alias) -> Result<Option<DeleteToken>, RepoError> {
|
|
use schema::aliases::dsl::*;
|
|
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
let opt = aliases
|
|
.select(token)
|
|
.filter(alias.eq(input_alias))
|
|
.get_result(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.aliases.delete-token")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.optional()
|
|
.map_err(PostgresError::Diesel)?;
|
|
|
|
Ok(opt)
|
|
}
|
|
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn hash(&self, input_alias: &Alias) -> Result<Option<Hash>, RepoError> {
|
|
use schema::aliases::dsl::*;
|
|
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
let opt = aliases
|
|
.select(hash)
|
|
.filter(alias.eq(input_alias))
|
|
.get_result(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.aliases.hash")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.optional()
|
|
.map_err(PostgresError::Diesel)?;
|
|
|
|
Ok(opt)
|
|
}
|
|
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn aliases_for_hash(&self, input_hash: Hash) -> Result<Vec<Alias>, RepoError> {
|
|
use schema::aliases::dsl::*;
|
|
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
let vec = aliases
|
|
.select(alias)
|
|
.filter(hash.eq(&input_hash))
|
|
.get_results(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.aliases.for-hash")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.map_err(PostgresError::Diesel)?;
|
|
|
|
Ok(vec)
|
|
}
|
|
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn cleanup_alias(&self, input_alias: &Alias) -> Result<(), RepoError> {
|
|
use schema::aliases::dsl::*;
|
|
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
diesel::delete(aliases)
|
|
.filter(alias.eq(input_alias))
|
|
.execute(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.aliases.cleanup")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.map_err(PostgresError::Diesel)?;
|
|
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
#[async_trait::async_trait(?Send)]
|
|
impl SettingsRepo for PostgresRepo {
|
|
#[tracing::instrument(level = "DEBUG", skip(self, input_value))]
|
|
async fn set(&self, input_key: &'static str, input_value: Arc<[u8]>) -> Result<(), RepoError> {
|
|
use schema::settings::dsl::*;
|
|
|
|
let input_value = hex::encode(input_value);
|
|
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
diesel::insert_into(settings)
|
|
.values((key.eq(input_key), value.eq(&input_value)))
|
|
.on_conflict(key)
|
|
.do_update()
|
|
.set(value.eq(&input_value))
|
|
.execute(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.settings.set")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.map_err(PostgresError::Diesel)?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn get(&self, input_key: &'static str) -> Result<Option<Arc<[u8]>>, RepoError> {
|
|
use schema::settings::dsl::*;
|
|
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
let opt = settings
|
|
.select(value)
|
|
.filter(key.eq(input_key))
|
|
.get_result::<String>(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.settings.get")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.optional()
|
|
.map_err(PostgresError::Diesel)?
|
|
.map(hex::decode)
|
|
.transpose()
|
|
.map_err(PostgresError::Hex)?
|
|
.map(Arc::from);
|
|
|
|
Ok(opt)
|
|
}
|
|
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn remove(&self, input_key: &'static str) -> Result<(), RepoError> {
|
|
use schema::settings::dsl::*;
|
|
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
diesel::delete(settings)
|
|
.filter(key.eq(input_key))
|
|
.execute(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.settings.remove")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.map_err(PostgresError::Diesel)?;
|
|
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
#[async_trait::async_trait(?Send)]
|
|
impl DetailsRepo for PostgresRepo {
|
|
#[tracing::instrument(level = "DEBUG", skip(self, input_details))]
|
|
async fn relate_details(
|
|
&self,
|
|
input_identifier: &Arc<str>,
|
|
input_details: &Details,
|
|
) -> Result<(), RepoError> {
|
|
use schema::details::dsl::*;
|
|
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
let value =
|
|
serde_json::to_value(&input_details.inner).map_err(PostgresError::SerializeDetails)?;
|
|
|
|
diesel::insert_into(details)
|
|
.values((identifier.eq(input_identifier.as_ref()), json.eq(&value)))
|
|
.execute(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.details.relate")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.map_err(PostgresError::Diesel)?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn details(&self, input_identifier: &Arc<str>) -> Result<Option<Details>, RepoError> {
|
|
use schema::details::dsl::*;
|
|
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
let opt = details
|
|
.select(json)
|
|
.filter(identifier.eq(input_identifier.as_ref()))
|
|
.get_result::<serde_json::Value>(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.details.get")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.optional()
|
|
.map_err(PostgresError::Diesel)?
|
|
.map(serde_json::from_value)
|
|
.transpose()
|
|
.map_err(PostgresError::DeserializeDetails)?
|
|
.map(|inner| Details { inner });
|
|
|
|
Ok(opt)
|
|
}
|
|
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn cleanup_details(&self, input_identifier: &Arc<str>) -> Result<(), RepoError> {
|
|
use schema::details::dsl::*;
|
|
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
diesel::delete(details)
|
|
.filter(identifier.eq(input_identifier.as_ref()))
|
|
.execute(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.details.cleanup")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.map_err(PostgresError::Diesel)?;
|
|
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
#[async_trait::async_trait(?Send)]
|
|
impl QueueRepo for PostgresRepo {
|
|
#[tracing::instrument(level = "DEBUG", skip(self, job_json))]
|
|
async fn push(
|
|
&self,
|
|
queue_name: &'static str,
|
|
job_json: serde_json::Value,
|
|
) -> Result<JobId, RepoError> {
|
|
let guard = PushMetricsGuard::guard(queue_name);
|
|
|
|
use schema::job_queue::dsl::*;
|
|
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
let job_id = diesel::insert_into(job_queue)
|
|
.values((queue.eq(queue_name), job.eq(job_json)))
|
|
.returning(id)
|
|
.get_result::<Uuid>(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.queue.push")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.map_err(PostgresError::Diesel)?;
|
|
|
|
guard.disarm();
|
|
|
|
Ok(JobId(job_id))
|
|
}
|
|
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn pop(
|
|
&self,
|
|
queue_name: &'static str,
|
|
worker_id: Uuid,
|
|
) -> Result<(JobId, serde_json::Value), RepoError> {
|
|
let guard = PopMetricsGuard::guard(queue_name);
|
|
|
|
use schema::job_queue::dsl::*;
|
|
|
|
loop {
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
let notifier: Arc<Notify> = self
|
|
.inner
|
|
.queue_notifications
|
|
.entry(String::from(queue_name))
|
|
.or_insert_with(crate::sync::notify)
|
|
.clone();
|
|
|
|
diesel::sql_query("LISTEN queue_status_channel;")
|
|
.execute(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.queue.listen")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.map_err(PostgresError::Diesel)?;
|
|
|
|
let timestamp = to_primitive(time::OffsetDateTime::now_utc());
|
|
|
|
let count = diesel::update(job_queue)
|
|
.filter(heartbeat.le(timestamp.saturating_sub(time::Duration::minutes(2))))
|
|
.set((
|
|
heartbeat.eq(Option::<time::PrimitiveDateTime>::None),
|
|
status.eq(JobStatus::New),
|
|
))
|
|
.execute(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.queue.requeue")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.map_err(PostgresError::Diesel)?;
|
|
|
|
if count > 0 {
|
|
tracing::info!("Reset {count} jobs");
|
|
}
|
|
|
|
// TODO: combine into 1 query
|
|
let opt = loop {
|
|
let id_opt = job_queue
|
|
.select(id)
|
|
.filter(status.eq(JobStatus::New).and(queue.eq(queue_name)))
|
|
.order(queue_time)
|
|
.limit(1)
|
|
.get_result::<Uuid>(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.queue.select")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.optional()
|
|
.map_err(PostgresError::Diesel)?;
|
|
|
|
let Some(id_val) = id_opt else {
|
|
break None;
|
|
};
|
|
|
|
let opt = diesel::update(job_queue)
|
|
.filter(id.eq(id_val))
|
|
.filter(status.eq(JobStatus::New))
|
|
.set((
|
|
heartbeat.eq(timestamp),
|
|
status.eq(JobStatus::Running),
|
|
worker.eq(worker_id),
|
|
))
|
|
.returning((id, job))
|
|
.get_result(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.queue.claim")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.optional()
|
|
.map_err(PostgresError::Diesel)?;
|
|
|
|
if let Some(tup) = opt {
|
|
break Some(tup);
|
|
}
|
|
};
|
|
|
|
if let Some((job_id, job_json)) = opt {
|
|
guard.disarm();
|
|
return Ok((JobId(job_id), job_json));
|
|
}
|
|
|
|
drop(conn);
|
|
if actix_rt::time::timeout(Duration::from_secs(5), notifier.notified())
|
|
.await
|
|
.is_ok()
|
|
{
|
|
tracing::debug!("Notified");
|
|
} else {
|
|
tracing::debug!("Timed out");
|
|
}
|
|
}
|
|
}
|
|
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn heartbeat(
|
|
&self,
|
|
queue_name: &'static str,
|
|
worker_id: Uuid,
|
|
job_id: JobId,
|
|
) -> Result<(), RepoError> {
|
|
use schema::job_queue::dsl::*;
|
|
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
let timestamp = to_primitive(time::OffsetDateTime::now_utc());
|
|
|
|
diesel::update(job_queue)
|
|
.filter(
|
|
id.eq(job_id.0)
|
|
.and(queue.eq(queue_name))
|
|
.and(worker.eq(worker_id)),
|
|
)
|
|
.set(heartbeat.eq(timestamp))
|
|
.execute(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.queue.heartbeat")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.map_err(PostgresError::Diesel)?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn complete_job(
|
|
&self,
|
|
queue_name: &'static str,
|
|
worker_id: Uuid,
|
|
job_id: JobId,
|
|
) -> Result<(), RepoError> {
|
|
use schema::job_queue::dsl::*;
|
|
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
diesel::delete(job_queue)
|
|
.filter(
|
|
id.eq(job_id.0)
|
|
.and(queue.eq(queue_name))
|
|
.and(worker.eq(worker_id)),
|
|
)
|
|
.execute(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.queue.complete")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.map_err(PostgresError::Diesel)?;
|
|
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
#[async_trait::async_trait(?Send)]
|
|
impl StoreMigrationRepo for PostgresRepo {
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn is_continuing_migration(&self) -> Result<bool, RepoError> {
|
|
use schema::store_migrations::dsl::*;
|
|
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
let count = store_migrations
|
|
.count()
|
|
.get_result::<i64>(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.store-migration.count")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.map_err(PostgresError::Diesel)?;
|
|
|
|
Ok(count > 0)
|
|
}
|
|
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn mark_migrated(
|
|
&self,
|
|
input_old_identifier: &Arc<str>,
|
|
input_new_identifier: &Arc<str>,
|
|
) -> Result<(), RepoError> {
|
|
use schema::store_migrations::dsl::*;
|
|
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
diesel::insert_into(store_migrations)
|
|
.values((
|
|
old_identifier.eq(input_old_identifier.as_ref()),
|
|
new_identifier.eq(input_new_identifier.as_ref()),
|
|
))
|
|
.on_conflict((old_identifier, new_identifier))
|
|
.do_nothing()
|
|
.execute(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.store-migration.mark-migrated")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.map_err(PostgresError::Diesel)?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn is_migrated(&self, input_old_identifier: &Arc<str>) -> Result<bool, RepoError> {
|
|
use schema::store_migrations::dsl::*;
|
|
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
let b = diesel::select(diesel::dsl::exists(
|
|
store_migrations.filter(old_identifier.eq(input_old_identifier.as_ref())),
|
|
))
|
|
.get_result(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.store-migration.is-migrated")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.map_err(PostgresError::Diesel)?;
|
|
|
|
Ok(b)
|
|
}
|
|
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn clear(&self) -> Result<(), RepoError> {
|
|
use schema::store_migrations::dsl::*;
|
|
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
diesel::delete(store_migrations)
|
|
.execute(&mut conn)
|
|
.with_timeout(Duration::from_secs(20))
|
|
.with_metrics("pict-rs.postgres.store-migration.clear")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.map_err(PostgresError::Diesel)?;
|
|
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
#[async_trait::async_trait(?Send)]
|
|
impl ProxyRepo for PostgresRepo {
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn relate_url(&self, input_url: Url, input_alias: Alias) -> Result<(), RepoError> {
|
|
use schema::proxies::dsl::*;
|
|
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
diesel::insert_into(proxies)
|
|
.values((url.eq(input_url.as_str()), alias.eq(&input_alias)))
|
|
.execute(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.proxy.relate-url")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.map_err(PostgresError::Diesel)?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn related(&self, input_url: Url) -> Result<Option<Alias>, RepoError> {
|
|
use schema::proxies::dsl::*;
|
|
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
let opt = proxies
|
|
.select(alias)
|
|
.filter(url.eq(input_url.as_str()))
|
|
.get_result(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.proxy.related")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.optional()
|
|
.map_err(PostgresError::Diesel)?;
|
|
|
|
Ok(opt)
|
|
}
|
|
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn remove_relation(&self, input_alias: Alias) -> Result<(), RepoError> {
|
|
use schema::proxies::dsl::*;
|
|
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
diesel::delete(proxies)
|
|
.filter(alias.eq(&input_alias))
|
|
.execute(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.proxy.remove-relation")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.map_err(PostgresError::Diesel)?;
|
|
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
#[async_trait::async_trait(?Send)]
|
|
impl AliasAccessRepo for PostgresRepo {
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn set_accessed_alias(
|
|
&self,
|
|
input_alias: Alias,
|
|
timestamp: time::OffsetDateTime,
|
|
) -> Result<(), RepoError> {
|
|
use schema::proxies::dsl::*;
|
|
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
let timestamp = to_primitive(timestamp);
|
|
|
|
diesel::update(proxies)
|
|
.filter(alias.eq(&input_alias))
|
|
.set(accessed.eq(timestamp))
|
|
.execute(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.alias-access.set-accessed")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.map_err(PostgresError::Diesel)?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn alias_accessed_at(
|
|
&self,
|
|
input_alias: Alias,
|
|
) -> Result<Option<time::OffsetDateTime>, RepoError> {
|
|
use schema::proxies::dsl::*;
|
|
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
let opt = proxies
|
|
.select(accessed)
|
|
.filter(alias.eq(&input_alias))
|
|
.get_result::<time::PrimitiveDateTime>(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.alias-access.accessed-at")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.optional()
|
|
.map_err(PostgresError::Diesel)?
|
|
.map(time::PrimitiveDateTime::assume_utc);
|
|
|
|
Ok(opt)
|
|
}
|
|
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn older_aliases(
|
|
&self,
|
|
timestamp: time::OffsetDateTime,
|
|
) -> Result<LocalBoxStream<'static, Result<Alias, RepoError>>, RepoError> {
|
|
Ok(Box::pin(PageStream {
|
|
inner: self.inner.clone(),
|
|
future: None,
|
|
current: Vec::new(),
|
|
older_than: to_primitive(timestamp),
|
|
next: Box::new(|inner, older_than| {
|
|
Box::pin(async move {
|
|
use schema::proxies::dsl::*;
|
|
|
|
let mut conn = inner.get_connection().await?;
|
|
|
|
let vec = proxies
|
|
.select((accessed, alias))
|
|
.filter(accessed.lt(older_than))
|
|
.order(accessed.desc())
|
|
.limit(100)
|
|
.get_results(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.alias-access.older-aliases")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.map_err(PostgresError::Diesel)?;
|
|
|
|
Ok(vec)
|
|
})
|
|
}),
|
|
}))
|
|
}
|
|
|
|
async fn remove_alias_access(&self, _: Alias) -> Result<(), RepoError> {
|
|
// Noop - handled by ProxyRepo::remove_relation
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
#[async_trait::async_trait(?Send)]
|
|
impl VariantAccessRepo for PostgresRepo {
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn set_accessed_variant(
|
|
&self,
|
|
input_hash: Hash,
|
|
input_variant: String,
|
|
input_accessed: time::OffsetDateTime,
|
|
) -> Result<(), RepoError> {
|
|
use schema::variants::dsl::*;
|
|
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
let timestamp = to_primitive(input_accessed);
|
|
|
|
diesel::update(variants)
|
|
.filter(hash.eq(&input_hash).and(variant.eq(&input_variant)))
|
|
.set(accessed.eq(timestamp))
|
|
.execute(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.variant-access.set-accessed")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.map_err(PostgresError::Diesel)?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn variant_accessed_at(
|
|
&self,
|
|
input_hash: Hash,
|
|
input_variant: String,
|
|
) -> Result<Option<time::OffsetDateTime>, RepoError> {
|
|
use schema::variants::dsl::*;
|
|
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
let opt = variants
|
|
.select(accessed)
|
|
.filter(hash.eq(&input_hash).and(variant.eq(&input_variant)))
|
|
.get_result(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.variant-access.accessed-at")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.optional()
|
|
.map_err(PostgresError::Diesel)?
|
|
.map(time::PrimitiveDateTime::assume_utc);
|
|
|
|
Ok(opt)
|
|
}
|
|
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn older_variants(
|
|
&self,
|
|
timestamp: time::OffsetDateTime,
|
|
) -> Result<LocalBoxStream<'static, Result<(Hash, String), RepoError>>, RepoError> {
|
|
Ok(Box::pin(PageStream {
|
|
inner: self.inner.clone(),
|
|
future: None,
|
|
current: Vec::new(),
|
|
older_than: to_primitive(timestamp),
|
|
next: Box::new(|inner, older_than| {
|
|
Box::pin(async move {
|
|
use schema::variants::dsl::*;
|
|
|
|
let mut conn = inner.get_connection().await?;
|
|
|
|
let vec = variants
|
|
.select((accessed, (hash, variant)))
|
|
.filter(accessed.lt(older_than))
|
|
.order(accessed.desc())
|
|
.limit(100)
|
|
.get_results(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.variant-access.older-variants")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.map_err(PostgresError::Diesel)?;
|
|
|
|
Ok(vec)
|
|
})
|
|
}),
|
|
}))
|
|
}
|
|
|
|
async fn remove_variant_access(&self, _: Hash, _: String) -> Result<(), RepoError> {
|
|
// Noop - handled by HashRepo::remove_variant
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
#[derive(serde::Deserialize, serde::Serialize)]
|
|
enum InnerUploadResult {
|
|
Success {
|
|
alias: Serde<Alias>,
|
|
token: Serde<DeleteToken>,
|
|
},
|
|
Failure {
|
|
message: String,
|
|
code: OwnedErrorCode,
|
|
},
|
|
}
|
|
|
|
impl From<UploadResult> for InnerUploadResult {
|
|
fn from(u: UploadResult) -> Self {
|
|
match u {
|
|
UploadResult::Success { alias, token } => InnerUploadResult::Success {
|
|
alias: Serde::new(alias),
|
|
token: Serde::new(token),
|
|
},
|
|
UploadResult::Failure { message, code } => InnerUploadResult::Failure { message, code },
|
|
}
|
|
}
|
|
}
|
|
|
|
impl From<InnerUploadResult> for UploadResult {
|
|
fn from(i: InnerUploadResult) -> Self {
|
|
match i {
|
|
InnerUploadResult::Success { alias, token } => UploadResult::Success {
|
|
alias: Serde::into_inner(alias),
|
|
token: Serde::into_inner(token),
|
|
},
|
|
InnerUploadResult::Failure { message, code } => UploadResult::Failure { message, code },
|
|
}
|
|
}
|
|
}
|
|
|
|
#[async_trait::async_trait(?Send)]
|
|
impl UploadRepo for PostgresRepo {
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn create_upload(&self) -> Result<UploadId, RepoError> {
|
|
use schema::uploads::dsl::*;
|
|
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
let uuid = diesel::insert_into(uploads)
|
|
.default_values()
|
|
.returning(id)
|
|
.get_result(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.uploads.create")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.map_err(PostgresError::Diesel)?;
|
|
|
|
Ok(UploadId { id: uuid })
|
|
}
|
|
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn wait(&self, upload_id: UploadId) -> Result<UploadResult, RepoError> {
|
|
let guard = WaitMetricsGuard::guard();
|
|
use schema::uploads::dsl::*;
|
|
|
|
let interest = self.inner.interest(upload_id);
|
|
|
|
loop {
|
|
let interest_future = interest.notified_timeout(Duration::from_secs(5));
|
|
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
diesel::sql_query("LISTEN upload_completion_channel;")
|
|
.execute(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.uploads.listen")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.map_err(PostgresError::Diesel)?;
|
|
|
|
let nested_opt = uploads
|
|
.select(result)
|
|
.filter(id.eq(upload_id.id))
|
|
.get_result(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.uploads.wait")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.optional()
|
|
.map_err(PostgresError::Diesel)?;
|
|
|
|
match nested_opt {
|
|
Some(opt) => {
|
|
if let Some(upload_result) = opt {
|
|
let upload_result: InnerUploadResult =
|
|
serde_json::from_value(upload_result)
|
|
.map_err(PostgresError::DeserializeUploadResult)?;
|
|
|
|
guard.disarm();
|
|
return Ok(upload_result.into());
|
|
}
|
|
}
|
|
None => {
|
|
return Err(RepoError::AlreadyClaimed);
|
|
}
|
|
}
|
|
|
|
drop(conn);
|
|
|
|
if interest_future.await.is_ok() {
|
|
tracing::debug!("Notified");
|
|
} else {
|
|
tracing::debug!("Timed out");
|
|
}
|
|
}
|
|
}
|
|
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn claim(&self, upload_id: UploadId) -> Result<(), RepoError> {
|
|
use schema::uploads::dsl::*;
|
|
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
diesel::delete(uploads)
|
|
.filter(id.eq(upload_id.id))
|
|
.execute(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.uploads.claim")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.map_err(PostgresError::Diesel)?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn complete_upload(
|
|
&self,
|
|
upload_id: UploadId,
|
|
upload_result: UploadResult,
|
|
) -> Result<(), RepoError> {
|
|
use schema::uploads::dsl::*;
|
|
|
|
let mut conn = self.get_connection().await?;
|
|
|
|
let upload_result: InnerUploadResult = upload_result.into();
|
|
let upload_result =
|
|
serde_json::to_value(&upload_result).map_err(PostgresError::SerializeUploadResult)?;
|
|
|
|
diesel::update(uploads)
|
|
.filter(id.eq(upload_id.id))
|
|
.set(result.eq(upload_result))
|
|
.execute(&mut conn)
|
|
.with_timeout(Duration::from_secs(5))
|
|
.with_metrics("pict-rs.postgres.uploads.complete")
|
|
.await
|
|
.map_err(|_| PostgresError::DbTimeout)?
|
|
.map_err(PostgresError::Diesel)?;
|
|
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
#[async_trait::async_trait(?Send)]
|
|
impl FullRepo for PostgresRepo {
|
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
|
async fn health_check(&self) -> Result<(), RepoError> {
|
|
let next = self.inner.health_count.fetch_add(1, Ordering::Relaxed);
|
|
|
|
self.set("health-value", Arc::from(next.to_be_bytes()))
|
|
.await?;
|
|
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
type NextFuture<I> = LocalBoxFuture<'static, Result<Vec<(time::PrimitiveDateTime, I)>, RepoError>>;
|
|
|
|
struct PageStream<I> {
|
|
inner: Arc<Inner>,
|
|
future: Option<NextFuture<I>>,
|
|
current: Vec<I>,
|
|
older_than: time::PrimitiveDateTime,
|
|
next: Box<dyn Fn(Arc<Inner>, time::PrimitiveDateTime) -> NextFuture<I>>,
|
|
}
|
|
|
|
impl<I> futures_core::Stream for PageStream<I>
|
|
where
|
|
I: Unpin,
|
|
{
|
|
type Item = Result<I, RepoError>;
|
|
|
|
fn poll_next(
|
|
self: std::pin::Pin<&mut Self>,
|
|
cx: &mut std::task::Context<'_>,
|
|
) -> std::task::Poll<Option<Self::Item>> {
|
|
let this = self.get_mut();
|
|
|
|
loop {
|
|
// Pop because we reversed the list
|
|
if let Some(alias) = this.current.pop() {
|
|
return std::task::Poll::Ready(Some(Ok(alias)));
|
|
}
|
|
|
|
if let Some(future) = this.future.as_mut() {
|
|
let res = std::task::ready!(future.as_mut().poll(cx));
|
|
|
|
this.future.take();
|
|
|
|
match res {
|
|
Ok(page) if page.is_empty() => {
|
|
return std::task::Poll::Ready(None);
|
|
}
|
|
Ok(page) => {
|
|
let (mut timestamps, mut aliases): (Vec<_>, Vec<_>) =
|
|
page.into_iter().unzip();
|
|
// reverse because we use .pop() to get next
|
|
aliases.reverse();
|
|
|
|
this.current = aliases;
|
|
this.older_than = timestamps.pop().expect("Verified nonempty");
|
|
}
|
|
Err(e) => return std::task::Poll::Ready(Some(Err(e))),
|
|
}
|
|
} else {
|
|
let inner = this.inner.clone();
|
|
let older_than = this.older_than;
|
|
|
|
this.future = Some((this.next)(inner, older_than));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
impl std::fmt::Debug for PostgresRepo {
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
f.debug_struct("PostgresRepo")
|
|
.field("pool", &"pool")
|
|
.finish()
|
|
}
|
|
}
|