mirror of
https://git.asonix.dog/asonix/pict-rs
synced 2024-12-22 19:31:35 +00:00
Instrument postgres db calls
This commit is contained in:
parent
33615672ae
commit
a4b1ab7dfb
10 changed files with 350 additions and 37 deletions
28
src/error.rs
28
src/error.rs
|
@ -1,3 +1,5 @@
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
use actix_web::{http::StatusCode, HttpResponse, ResponseError};
|
use actix_web::{http::StatusCode, HttpResponse, ResponseError};
|
||||||
use color_eyre::Report;
|
use color_eyre::Report;
|
||||||
|
|
||||||
|
@ -5,6 +7,8 @@ use crate::error_code::ErrorCode;
|
||||||
|
|
||||||
pub(crate) struct Error {
|
pub(crate) struct Error {
|
||||||
inner: color_eyre::Report,
|
inner: color_eyre::Report,
|
||||||
|
debug: Arc<str>,
|
||||||
|
display: Arc<str>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Error {
|
impl Error {
|
||||||
|
@ -21,17 +25,21 @@ impl Error {
|
||||||
.map(|e| e.error_code())
|
.map(|e| e.error_code())
|
||||||
.unwrap_or(ErrorCode::UNKNOWN_ERROR)
|
.unwrap_or(ErrorCode::UNKNOWN_ERROR)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn is_disconnected(&self) -> bool {
|
||||||
|
self.kind().map(|e| e.is_disconnected()).unwrap_or(false)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl std::fmt::Debug for Error {
|
impl std::fmt::Debug for Error {
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
std::fmt::Debug::fmt(&self.inner, f)
|
f.write_str(&self.debug)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl std::fmt::Display for Error {
|
impl std::fmt::Display for Error {
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
std::fmt::Display::fmt(&self.inner, f)
|
f.write_str(&self.display)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -46,8 +54,14 @@ where
|
||||||
UploadError: From<T>,
|
UploadError: From<T>,
|
||||||
{
|
{
|
||||||
fn from(error: T) -> Self {
|
fn from(error: T) -> Self {
|
||||||
|
let inner = Report::from(UploadError::from(error));
|
||||||
|
let debug = Arc::from(format!("{inner:?}"));
|
||||||
|
let display = Arc::from(format!("{inner}"));
|
||||||
|
|
||||||
Error {
|
Error {
|
||||||
inner: Report::from(UploadError::from(error)),
|
inner,
|
||||||
|
debug,
|
||||||
|
display,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -172,6 +186,14 @@ impl UploadError {
|
||||||
Self::Timeout(_) => ErrorCode::STREAM_TOO_SLOW,
|
Self::Timeout(_) => ErrorCode::STREAM_TOO_SLOW,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const fn is_disconnected(&self) -> bool {
|
||||||
|
match self {
|
||||||
|
Self::Repo(e) => e.is_disconnected(),
|
||||||
|
Self::Store(s) => s.is_disconnected(),
|
||||||
|
_ => false,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<actix_web::error::BlockingError> for UploadError {
|
impl From<actix_web::error::BlockingError> for UploadError {
|
||||||
|
|
75
src/future.rs
Normal file
75
src/future.rs
Normal file
|
@ -0,0 +1,75 @@
|
||||||
|
use std::{
|
||||||
|
future::Future,
|
||||||
|
time::{Duration, Instant},
|
||||||
|
};
|
||||||
|
|
||||||
|
pub(crate) type LocalBoxFuture<'a, T> = std::pin::Pin<Box<dyn Future<Output = T> + 'a>>;
|
||||||
|
|
||||||
|
pub(crate) trait WithTimeout: Future {
|
||||||
|
fn with_timeout(self, duration: Duration) -> actix_rt::time::Timeout<Self>
|
||||||
|
where
|
||||||
|
Self: Sized,
|
||||||
|
{
|
||||||
|
actix_rt::time::timeout(duration, self)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) trait WithMetrics: Future {
|
||||||
|
fn with_metrics(self, name: &'static str) -> MetricsFuture<Self>
|
||||||
|
where
|
||||||
|
Self: Sized,
|
||||||
|
{
|
||||||
|
MetricsFuture {
|
||||||
|
future: self,
|
||||||
|
metrics: Metrics {
|
||||||
|
name,
|
||||||
|
start: Instant::now(),
|
||||||
|
complete: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<F> WithMetrics for F where F: Future {}
|
||||||
|
impl<F> WithTimeout for F where F: Future {}
|
||||||
|
|
||||||
|
pin_project_lite::pin_project! {
|
||||||
|
pub(crate) struct MetricsFuture<F> {
|
||||||
|
#[pin]
|
||||||
|
future: F,
|
||||||
|
|
||||||
|
metrics: Metrics,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct Metrics {
|
||||||
|
name: &'static str,
|
||||||
|
start: Instant,
|
||||||
|
complete: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<F> Future for MetricsFuture<F>
|
||||||
|
where
|
||||||
|
F: Future,
|
||||||
|
{
|
||||||
|
type Output = F::Output;
|
||||||
|
|
||||||
|
fn poll(
|
||||||
|
self: std::pin::Pin<&mut Self>,
|
||||||
|
cx: &mut std::task::Context<'_>,
|
||||||
|
) -> std::task::Poll<Self::Output> {
|
||||||
|
let this = self.project();
|
||||||
|
|
||||||
|
let out = std::task::ready!(this.future.poll(cx));
|
||||||
|
|
||||||
|
this.metrics.complete = true;
|
||||||
|
|
||||||
|
std::task::Poll::Ready(out)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for Metrics {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
metrics::histogram!(self.name, self.start.elapsed().as_secs_f64(), "complete" => self.complete.to_string());
|
||||||
|
}
|
||||||
|
}
|
|
@ -8,9 +8,7 @@ use opentelemetry_otlp::WithExportConfig;
|
||||||
use tracing::subscriber::set_global_default;
|
use tracing::subscriber::set_global_default;
|
||||||
use tracing_error::ErrorLayer;
|
use tracing_error::ErrorLayer;
|
||||||
use tracing_log::LogTracer;
|
use tracing_log::LogTracer;
|
||||||
use tracing_subscriber::{
|
use tracing_subscriber::{layer::SubscriberExt, registry::LookupSpan, Layer, Registry};
|
||||||
fmt::format::FmtSpan, layer::SubscriberExt, registry::LookupSpan, Layer, Registry,
|
|
||||||
};
|
|
||||||
|
|
||||||
pub(super) fn init_tracing(tracing: &Tracing) -> color_eyre::Result<()> {
|
pub(super) fn init_tracing(tracing: &Tracing) -> color_eyre::Result<()> {
|
||||||
color_eyre::install()?;
|
color_eyre::install()?;
|
||||||
|
@ -19,8 +17,7 @@ pub(super) fn init_tracing(tracing: &Tracing) -> color_eyre::Result<()> {
|
||||||
|
|
||||||
opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());
|
opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());
|
||||||
|
|
||||||
let format_layer =
|
let format_layer = tracing_subscriber::fmt::layer();
|
||||||
tracing_subscriber::fmt::layer().with_span_events(FmtSpan::NEW | FmtSpan::CLOSE);
|
|
||||||
|
|
||||||
match tracing.logging.format {
|
match tracing.logging.format {
|
||||||
LogFormat::Compact => with_format(format_layer.compact(), tracing),
|
LogFormat::Compact => with_format(format_layer.compact(), tracing),
|
||||||
|
|
|
@ -11,6 +11,7 @@ mod exiftool;
|
||||||
mod ffmpeg;
|
mod ffmpeg;
|
||||||
mod file;
|
mod file;
|
||||||
mod formats;
|
mod formats;
|
||||||
|
mod future;
|
||||||
mod generate;
|
mod generate;
|
||||||
mod ingest;
|
mod ingest;
|
||||||
mod init_tracing;
|
mod init_tracing;
|
||||||
|
@ -53,8 +54,8 @@ use std::{
|
||||||
time::{Duration, SystemTime},
|
time::{Duration, SystemTime},
|
||||||
};
|
};
|
||||||
use tokio::sync::Semaphore;
|
use tokio::sync::Semaphore;
|
||||||
|
use tracing::Instrument;
|
||||||
use tracing_actix_web::TracingLogger;
|
use tracing_actix_web::TracingLogger;
|
||||||
use tracing_futures::Instrument;
|
|
||||||
|
|
||||||
use self::{
|
use self::{
|
||||||
backgrounded::Backgrounded,
|
backgrounded::Backgrounded,
|
||||||
|
@ -1550,6 +1551,7 @@ async fn identifier<S: Store>(
|
||||||
})))
|
})))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(skip(repo, store))]
|
||||||
async fn healthz<S: Store>(
|
async fn healthz<S: Store>(
|
||||||
repo: web::Data<ArcRepo>,
|
repo: web::Data<ArcRepo>,
|
||||||
store: web::Data<S>,
|
store: web::Data<S>,
|
||||||
|
|
15
src/queue.rs
15
src/queue.rs
|
@ -3,14 +3,13 @@ use crate::{
|
||||||
config::Configuration,
|
config::Configuration,
|
||||||
error::{Error, UploadError},
|
error::{Error, UploadError},
|
||||||
formats::InputProcessableFormat,
|
formats::InputProcessableFormat,
|
||||||
|
future::LocalBoxFuture,
|
||||||
repo::{Alias, DeleteToken, FullRepo, Hash, JobId, UploadId},
|
repo::{Alias, DeleteToken, FullRepo, Hash, JobId, UploadId},
|
||||||
serde_str::Serde,
|
serde_str::Serde,
|
||||||
store::Store,
|
store::Store,
|
||||||
};
|
};
|
||||||
use std::{
|
use std::{
|
||||||
future::Future,
|
|
||||||
path::PathBuf,
|
path::PathBuf,
|
||||||
pin::Pin,
|
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
time::{Duration, Instant},
|
time::{Duration, Instant},
|
||||||
};
|
};
|
||||||
|
@ -179,8 +178,6 @@ pub(crate) async fn process_images<S: Store + 'static>(
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
type LocalBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;
|
|
||||||
|
|
||||||
async fn process_jobs<S, F>(
|
async fn process_jobs<S, F>(
|
||||||
repo: &Arc<dyn FullRepo>,
|
repo: &Arc<dyn FullRepo>,
|
||||||
store: &S,
|
store: &S,
|
||||||
|
@ -205,6 +202,11 @@ async fn process_jobs<S, F>(
|
||||||
if let Err(e) = res {
|
if let Err(e) = res {
|
||||||
tracing::warn!("Error processing jobs: {}", format!("{e}"));
|
tracing::warn!("Error processing jobs: {}", format!("{e}"));
|
||||||
tracing::warn!("{}", format!("{e:?}"));
|
tracing::warn!("{}", format!("{e:?}"));
|
||||||
|
|
||||||
|
if e.is_disconnected() {
|
||||||
|
actix_rt::time::sleep(Duration::from_secs(3)).await;
|
||||||
|
}
|
||||||
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -323,6 +325,11 @@ async fn process_image_jobs<S, F>(
|
||||||
if let Err(e) = res {
|
if let Err(e) = res {
|
||||||
tracing::warn!("Error processing jobs: {}", format!("{e}"));
|
tracing::warn!("Error processing jobs: {}", format!("{e}"));
|
||||||
tracing::warn!("{}", format!("{e:?}"));
|
tracing::warn!("{}", format!("{e:?}"));
|
||||||
|
|
||||||
|
if e.is_disconnected() {
|
||||||
|
actix_rt::time::sleep(Duration::from_secs(3)).await;
|
||||||
|
}
|
||||||
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3,7 +3,8 @@ use std::sync::Arc;
|
||||||
use crate::{
|
use crate::{
|
||||||
config::Configuration,
|
config::Configuration,
|
||||||
error::{Error, UploadError},
|
error::{Error, UploadError},
|
||||||
queue::{Cleanup, LocalBoxFuture},
|
future::LocalBoxFuture,
|
||||||
|
queue::Cleanup,
|
||||||
repo::{Alias, ArcRepo, DeleteToken, Hash},
|
repo::{Alias, ArcRepo, DeleteToken, Hash},
|
||||||
serde_str::Serde,
|
serde_str::Serde,
|
||||||
store::Store,
|
store::Store,
|
||||||
|
|
|
@ -3,8 +3,9 @@ use crate::{
|
||||||
config::Configuration,
|
config::Configuration,
|
||||||
error::{Error, UploadError},
|
error::{Error, UploadError},
|
||||||
formats::InputProcessableFormat,
|
formats::InputProcessableFormat,
|
||||||
|
future::LocalBoxFuture,
|
||||||
ingest::Session,
|
ingest::Session,
|
||||||
queue::{LocalBoxFuture, Process},
|
queue::Process,
|
||||||
repo::{Alias, ArcRepo, UploadId, UploadResult},
|
repo::{Alias, ArcRepo, UploadId, UploadResult},
|
||||||
serde_str::Serde,
|
serde_str::Serde,
|
||||||
store::Store,
|
store::Store,
|
||||||
|
|
10
src/repo.rs
10
src/repo.rs
|
@ -8,6 +8,7 @@ use crate::{
|
||||||
config,
|
config,
|
||||||
details::Details,
|
details::Details,
|
||||||
error_code::{ErrorCode, OwnedErrorCode},
|
error_code::{ErrorCode, OwnedErrorCode},
|
||||||
|
future::LocalBoxFuture,
|
||||||
stream::LocalBoxStream,
|
stream::LocalBoxStream,
|
||||||
};
|
};
|
||||||
use base64::Engine;
|
use base64::Engine;
|
||||||
|
@ -85,6 +86,13 @@ impl RepoError {
|
||||||
Self::Canceled => ErrorCode::PANIC,
|
Self::Canceled => ErrorCode::PANIC,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) const fn is_disconnected(&self) -> bool {
|
||||||
|
match self {
|
||||||
|
Self::PostgresError(e) => e.is_disconnected(),
|
||||||
|
_ => false,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait::async_trait(?Send)]
|
#[async_trait::async_trait(?Send)]
|
||||||
|
@ -564,8 +572,6 @@ impl HashPage {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type LocalBoxFuture<'a, T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> + 'a>>;
|
|
||||||
|
|
||||||
type PageFuture = LocalBoxFuture<'static, Result<HashPage, RepoError>>;
|
type PageFuture = LocalBoxFuture<'static, Result<HashPage, RepoError>>;
|
||||||
|
|
||||||
pub(crate) struct HashStream {
|
pub(crate) struct HashStream {
|
||||||
|
|
|
@ -12,6 +12,7 @@ use std::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use dashmap::DashMap;
|
use dashmap::DashMap;
|
||||||
|
use deadpool::managed::Hook;
|
||||||
use diesel::prelude::*;
|
use diesel::prelude::*;
|
||||||
use diesel_async::{
|
use diesel_async::{
|
||||||
pooled_connection::{
|
pooled_connection::{
|
||||||
|
@ -29,6 +30,7 @@ use uuid::Uuid;
|
||||||
use crate::{
|
use crate::{
|
||||||
details::Details,
|
details::Details,
|
||||||
error_code::{ErrorCode, OwnedErrorCode},
|
error_code::{ErrorCode, OwnedErrorCode},
|
||||||
|
future::{LocalBoxFuture, WithMetrics, WithTimeout},
|
||||||
serde_str::Serde,
|
serde_str::Serde,
|
||||||
stream::LocalBoxStream,
|
stream::LocalBoxStream,
|
||||||
};
|
};
|
||||||
|
@ -108,6 +110,9 @@ pub(crate) enum PostgresError {
|
||||||
|
|
||||||
#[error("Error deserializing upload result")]
|
#[error("Error deserializing upload result")]
|
||||||
DeserializeUploadResult(#[source] serde_json::Error),
|
DeserializeUploadResult(#[source] serde_json::Error),
|
||||||
|
|
||||||
|
#[error("Timed out waiting for postgres")]
|
||||||
|
DbTimeout,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PostgresError {
|
impl PostgresError {
|
||||||
|
@ -117,11 +122,26 @@ impl PostgresError {
|
||||||
| Self::Diesel(_)
|
| Self::Diesel(_)
|
||||||
| Self::SerializeDetails(_)
|
| Self::SerializeDetails(_)
|
||||||
| Self::SerializeUploadResult(_)
|
| Self::SerializeUploadResult(_)
|
||||||
| Self::Hex(_) => ErrorCode::POSTGRES_ERROR,
|
| Self::Hex(_)
|
||||||
|
| Self::DbTimeout => ErrorCode::POSTGRES_ERROR,
|
||||||
Self::DeserializeDetails(_) => ErrorCode::EXTRACT_DETAILS,
|
Self::DeserializeDetails(_) => ErrorCode::EXTRACT_DETAILS,
|
||||||
Self::DeserializeUploadResult(_) => ErrorCode::EXTRACT_UPLOAD_RESULT,
|
Self::DeserializeUploadResult(_) => ErrorCode::EXTRACT_UPLOAD_RESULT,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(super) const fn is_disconnected(&self) -> bool {
|
||||||
|
match self {
|
||||||
|
Self::Pool(
|
||||||
|
PoolError::Closed
|
||||||
|
| PoolError::Backend(diesel_async::pooled_connection::PoolError::ConnectionError(_)),
|
||||||
|
)
|
||||||
|
| Self::Diesel(diesel::result::Error::DatabaseError(
|
||||||
|
diesel::result::DatabaseErrorKind::ClosedConnection,
|
||||||
|
_,
|
||||||
|
)) => true,
|
||||||
|
_ => false,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PostgresRepo {
|
impl PostgresRepo {
|
||||||
|
@ -140,6 +160,10 @@ impl PostgresRepo {
|
||||||
handle.abort();
|
handle.abort();
|
||||||
let _ = handle.await;
|
let _ = handle.await;
|
||||||
|
|
||||||
|
let parallelism = std::thread::available_parallelism()
|
||||||
|
.map(|u| u.into())
|
||||||
|
.unwrap_or(1_usize);
|
||||||
|
|
||||||
let (tx, rx) = flume::bounded(10);
|
let (tx, rx) = flume::bounded(10);
|
||||||
|
|
||||||
let mut config = ManagerConfig::default();
|
let mut config = ManagerConfig::default();
|
||||||
|
@ -149,7 +173,21 @@ impl PostgresRepo {
|
||||||
postgres_url,
|
postgres_url,
|
||||||
config,
|
config,
|
||||||
);
|
);
|
||||||
|
|
||||||
let pool = Pool::builder(mgr)
|
let pool = Pool::builder(mgr)
|
||||||
|
.runtime(deadpool::Runtime::Tokio1)
|
||||||
|
.wait_timeout(Some(Duration::from_secs(1)))
|
||||||
|
.create_timeout(Some(Duration::from_secs(2)))
|
||||||
|
.recycle_timeout(Some(Duration::from_secs(2)))
|
||||||
|
.post_create(Hook::sync_fn(|_, _| {
|
||||||
|
metrics::increment_counter!("pict-rs.postgres.pool.connection.create");
|
||||||
|
Ok(())
|
||||||
|
}))
|
||||||
|
.post_recycle(Hook::sync_fn(|_, _| {
|
||||||
|
metrics::increment_counter!("pict-rs.postgres.pool.connection.recycle");
|
||||||
|
Ok(())
|
||||||
|
}))
|
||||||
|
.max_size(parallelism * 8)
|
||||||
.build()
|
.build()
|
||||||
.map_err(ConnectPostgresError::BuildPool)?;
|
.map_err(ConnectPostgresError::BuildPool)?;
|
||||||
|
|
||||||
|
@ -160,7 +198,7 @@ impl PostgresRepo {
|
||||||
upload_notifications: DashMap::new(),
|
upload_notifications: DashMap::new(),
|
||||||
});
|
});
|
||||||
|
|
||||||
let handle = crate::sync::spawn(delegate_notifications(rx, inner.clone()));
|
let handle = crate::sync::spawn(delegate_notifications(rx, inner.clone(), parallelism * 8));
|
||||||
|
|
||||||
let notifications = Arc::new(handle);
|
let notifications = Arc::new(handle);
|
||||||
|
|
||||||
|
@ -195,7 +233,7 @@ impl GetConnectionMetricsGuard {
|
||||||
|
|
||||||
impl Drop for GetConnectionMetricsGuard {
|
impl Drop for GetConnectionMetricsGuard {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
metrics::increment_counter!("pict-rs.postgres.pool.get.end", "completed" => (!self.armed).to_string());
|
metrics::increment_counter!("pict-rs.postgres.pool.get", "completed" => (!self.armed).to_string());
|
||||||
metrics::histogram!("pict-rs.postgres.pool.get.duration", self.start.elapsed().as_secs_f64(), "completed" => (!self.armed).to_string());
|
metrics::histogram!("pict-rs.postgres.pool.get.duration", self.start.elapsed().as_secs_f64(), "completed" => (!self.armed).to_string());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -204,9 +242,12 @@ impl Inner {
|
||||||
#[tracing::instrument(level = "TRACE", skip(self))]
|
#[tracing::instrument(level = "TRACE", skip(self))]
|
||||||
async fn get_connection(&self) -> Result<Object<AsyncPgConnection>, PostgresError> {
|
async fn get_connection(&self) -> Result<Object<AsyncPgConnection>, PostgresError> {
|
||||||
let guard = GetConnectionMetricsGuard::guard();
|
let guard = GetConnectionMetricsGuard::guard();
|
||||||
let res = self.pool.get().await.map_err(PostgresError::Pool);
|
|
||||||
|
let obj = self.pool.get().await.map_err(PostgresError::Pool)?;
|
||||||
|
|
||||||
guard.disarm();
|
guard.disarm();
|
||||||
res
|
|
||||||
|
Ok(obj)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn interest(self: &Arc<Self>, upload_id: UploadId) -> UploadInterest {
|
fn interest(self: &Arc<Self>, upload_id: UploadId) -> UploadInterest {
|
||||||
|
@ -301,14 +342,14 @@ type BoxFuture<'a, T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> +
|
||||||
type ConfigFn =
|
type ConfigFn =
|
||||||
Box<dyn Fn(&str) -> BoxFuture<'_, ConnectionResult<AsyncPgConnection>> + Send + Sync + 'static>;
|
Box<dyn Fn(&str) -> BoxFuture<'_, ConnectionResult<AsyncPgConnection>> + Send + Sync + 'static>;
|
||||||
|
|
||||||
async fn delegate_notifications(receiver: flume::Receiver<Notification>, inner: Arc<Inner>) {
|
async fn delegate_notifications(
|
||||||
let parallelism = std::thread::available_parallelism()
|
receiver: flume::Receiver<Notification>,
|
||||||
.map(|u| u.into())
|
inner: Arc<Inner>,
|
||||||
.unwrap_or(1_usize);
|
capacity: usize,
|
||||||
|
) {
|
||||||
let mut job_notifier_state = JobNotifierState {
|
let mut job_notifier_state = JobNotifierState {
|
||||||
inner: &inner,
|
inner: &inner,
|
||||||
capacity: parallelism * 8,
|
capacity,
|
||||||
jobs: BTreeSet::new(),
|
jobs: BTreeSet::new(),
|
||||||
jobs_ordered: VecDeque::new(),
|
jobs_ordered: VecDeque::new(),
|
||||||
};
|
};
|
||||||
|
@ -409,7 +450,10 @@ impl HashRepo for PostgresRepo {
|
||||||
let count = hashes
|
let count = hashes
|
||||||
.count()
|
.count()
|
||||||
.get_result::<i64>(&mut conn)
|
.get_result::<i64>(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.hashes.count")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
|
||||||
Ok(count.try_into().expect("non-negative count"))
|
Ok(count.try_into().expect("non-negative count"))
|
||||||
|
@ -424,8 +468,11 @@ impl HashRepo for PostgresRepo {
|
||||||
let timestamp = hashes
|
let timestamp = hashes
|
||||||
.select(created_at)
|
.select(created_at)
|
||||||
.filter(hash.eq(&input_hash))
|
.filter(hash.eq(&input_hash))
|
||||||
.first(&mut conn)
|
.get_result(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.hashes.bound")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.map(time::PrimitiveDateTime::assume_utc)
|
.map(time::PrimitiveDateTime::assume_utc)
|
||||||
.optional()
|
.optional()
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
@ -452,8 +499,11 @@ impl HashRepo for PostgresRepo {
|
||||||
.select((created_at, hash))
|
.select((created_at, hash))
|
||||||
.filter(created_at.lt(timestamp))
|
.filter(created_at.lt(timestamp))
|
||||||
.order(created_at.desc())
|
.order(created_at.desc())
|
||||||
.first::<(time::PrimitiveDateTime, Hash)>(&mut conn)
|
.get_result::<(time::PrimitiveDateTime, Hash)>(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.hashes.ordered-hash")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.optional()
|
.optional()
|
||||||
.map_err(PostgresError::Diesel)?
|
.map_err(PostgresError::Diesel)?
|
||||||
.map(|tup| OrderedHash {
|
.map(|tup| OrderedHash {
|
||||||
|
@ -488,8 +538,11 @@ impl HashRepo for PostgresRepo {
|
||||||
.order(created_at.desc())
|
.order(created_at.desc())
|
||||||
.then_order_by(hash.desc())
|
.then_order_by(hash.desc())
|
||||||
.limit(limit as i64 + 1)
|
.limit(limit as i64 + 1)
|
||||||
.load::<Hash>(&mut conn)
|
.get_results::<Hash>(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.hashes.next-hashes")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
|
||||||
let prev = hashes
|
let prev = hashes
|
||||||
|
@ -500,7 +553,10 @@ impl HashRepo for PostgresRepo {
|
||||||
.then_order_by(hash)
|
.then_order_by(hash)
|
||||||
.limit(limit as i64)
|
.limit(limit as i64)
|
||||||
.get_results::<Hash>(&mut conn)
|
.get_results::<Hash>(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.hashes.prev-hashes")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.map_err(PostgresError::Diesel)?
|
.map_err(PostgresError::Diesel)?
|
||||||
.pop();
|
.pop();
|
||||||
|
|
||||||
|
@ -511,8 +567,11 @@ impl HashRepo for PostgresRepo {
|
||||||
.order(created_at.desc())
|
.order(created_at.desc())
|
||||||
.then_order_by(hash.desc())
|
.then_order_by(hash.desc())
|
||||||
.limit(limit as i64 + 1)
|
.limit(limit as i64 + 1)
|
||||||
.load::<Hash>(&mut conn)
|
.get_results::<Hash>(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.hashes.first-hashes")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
|
||||||
(page, None)
|
(page, None)
|
||||||
|
@ -548,7 +607,10 @@ impl HashRepo for PostgresRepo {
|
||||||
created_at.eq(×tamp),
|
created_at.eq(×tamp),
|
||||||
))
|
))
|
||||||
.execute(&mut conn)
|
.execute(&mut conn)
|
||||||
.await;
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.hashes.create-hash")
|
||||||
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?;
|
||||||
|
|
||||||
match res {
|
match res {
|
||||||
Ok(_) => Ok(Ok(())),
|
Ok(_) => Ok(Ok(())),
|
||||||
|
@ -574,7 +636,10 @@ impl HashRepo for PostgresRepo {
|
||||||
.filter(hash.eq(&input_hash))
|
.filter(hash.eq(&input_hash))
|
||||||
.set(identifier.eq(input_identifier.as_ref()))
|
.set(identifier.eq(input_identifier.as_ref()))
|
||||||
.execute(&mut conn)
|
.execute(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.hashes.update-identifier")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -590,7 +655,10 @@ impl HashRepo for PostgresRepo {
|
||||||
.select(identifier)
|
.select(identifier)
|
||||||
.filter(hash.eq(&input_hash))
|
.filter(hash.eq(&input_hash))
|
||||||
.get_result::<String>(&mut conn)
|
.get_result::<String>(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.hashes.identifier")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.optional()
|
.optional()
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
|
||||||
|
@ -615,7 +683,10 @@ impl HashRepo for PostgresRepo {
|
||||||
identifier.eq(input_identifier.as_ref()),
|
identifier.eq(input_identifier.as_ref()),
|
||||||
))
|
))
|
||||||
.execute(&mut conn)
|
.execute(&mut conn)
|
||||||
.await;
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.variants.relate-variant-identifier")
|
||||||
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?;
|
||||||
|
|
||||||
match res {
|
match res {
|
||||||
Ok(_) => Ok(Ok(())),
|
Ok(_) => Ok(Ok(())),
|
||||||
|
@ -642,7 +713,10 @@ impl HashRepo for PostgresRepo {
|
||||||
.filter(hash.eq(&input_hash))
|
.filter(hash.eq(&input_hash))
|
||||||
.filter(variant.eq(&input_variant))
|
.filter(variant.eq(&input_variant))
|
||||||
.get_result::<String>(&mut conn)
|
.get_result::<String>(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.variants.identifier")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.optional()
|
.optional()
|
||||||
.map_err(PostgresError::Diesel)?
|
.map_err(PostgresError::Diesel)?
|
||||||
.map(Arc::from);
|
.map(Arc::from);
|
||||||
|
@ -660,7 +734,10 @@ impl HashRepo for PostgresRepo {
|
||||||
.select((variant, identifier))
|
.select((variant, identifier))
|
||||||
.filter(hash.eq(&input_hash))
|
.filter(hash.eq(&input_hash))
|
||||||
.get_results::<(String, String)>(&mut conn)
|
.get_results::<(String, String)>(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.variants.for-hash")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.map_err(PostgresError::Diesel)?
|
.map_err(PostgresError::Diesel)?
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|(s, i)| (s, Arc::from(i)))
|
.map(|(s, i)| (s, Arc::from(i)))
|
||||||
|
@ -683,7 +760,10 @@ impl HashRepo for PostgresRepo {
|
||||||
.filter(hash.eq(&input_hash))
|
.filter(hash.eq(&input_hash))
|
||||||
.filter(variant.eq(&input_variant))
|
.filter(variant.eq(&input_variant))
|
||||||
.execute(&mut conn)
|
.execute(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.variants.remove")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -703,7 +783,10 @@ impl HashRepo for PostgresRepo {
|
||||||
.filter(hash.eq(&input_hash))
|
.filter(hash.eq(&input_hash))
|
||||||
.set(motion_identifier.eq(input_identifier.as_ref()))
|
.set(motion_identifier.eq(input_identifier.as_ref()))
|
||||||
.execute(&mut conn)
|
.execute(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.hashes.relate-motion-identifier")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -719,7 +802,10 @@ impl HashRepo for PostgresRepo {
|
||||||
.select(motion_identifier)
|
.select(motion_identifier)
|
||||||
.filter(hash.eq(&input_hash))
|
.filter(hash.eq(&input_hash))
|
||||||
.get_result::<Option<String>>(&mut conn)
|
.get_result::<Option<String>>(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.hashes.motion-identifier")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.optional()
|
.optional()
|
||||||
.map_err(PostgresError::Diesel)?
|
.map_err(PostgresError::Diesel)?
|
||||||
.flatten()
|
.flatten()
|
||||||
|
@ -737,11 +823,13 @@ impl HashRepo for PostgresRepo {
|
||||||
diesel::delete(schema::variants::dsl::variants)
|
diesel::delete(schema::variants::dsl::variants)
|
||||||
.filter(schema::variants::dsl::hash.eq(&input_hash))
|
.filter(schema::variants::dsl::hash.eq(&input_hash))
|
||||||
.execute(conn)
|
.execute(conn)
|
||||||
|
.with_metrics("pict-rs.postgres.variants.cleanup")
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
diesel::delete(schema::hashes::dsl::hashes)
|
diesel::delete(schema::hashes::dsl::hashes)
|
||||||
.filter(schema::hashes::dsl::hash.eq(&input_hash))
|
.filter(schema::hashes::dsl::hash.eq(&input_hash))
|
||||||
.execute(conn)
|
.execute(conn)
|
||||||
|
.with_metrics("pict-rs.postgres.hashes.cleanup")
|
||||||
.await
|
.await
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
@ -772,7 +860,10 @@ impl AliasRepo for PostgresRepo {
|
||||||
token.eq(delete_token),
|
token.eq(delete_token),
|
||||||
))
|
))
|
||||||
.execute(&mut conn)
|
.execute(&mut conn)
|
||||||
.await;
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.aliases.create")
|
||||||
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?;
|
||||||
|
|
||||||
match res {
|
match res {
|
||||||
Ok(_) => Ok(Ok(())),
|
Ok(_) => Ok(Ok(())),
|
||||||
|
@ -794,7 +885,10 @@ impl AliasRepo for PostgresRepo {
|
||||||
.select(token)
|
.select(token)
|
||||||
.filter(alias.eq(input_alias))
|
.filter(alias.eq(input_alias))
|
||||||
.get_result(&mut conn)
|
.get_result(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.aliases.delete-token")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.optional()
|
.optional()
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
|
||||||
|
@ -811,7 +905,10 @@ impl AliasRepo for PostgresRepo {
|
||||||
.select(hash)
|
.select(hash)
|
||||||
.filter(alias.eq(input_alias))
|
.filter(alias.eq(input_alias))
|
||||||
.get_result(&mut conn)
|
.get_result(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.aliases.hash")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.optional()
|
.optional()
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
|
||||||
|
@ -828,7 +925,10 @@ impl AliasRepo for PostgresRepo {
|
||||||
.select(alias)
|
.select(alias)
|
||||||
.filter(hash.eq(&input_hash))
|
.filter(hash.eq(&input_hash))
|
||||||
.get_results(&mut conn)
|
.get_results(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.aliases.for-hash")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
|
||||||
Ok(vec)
|
Ok(vec)
|
||||||
|
@ -843,7 +943,10 @@ impl AliasRepo for PostgresRepo {
|
||||||
diesel::delete(aliases)
|
diesel::delete(aliases)
|
||||||
.filter(alias.eq(input_alias))
|
.filter(alias.eq(input_alias))
|
||||||
.execute(&mut conn)
|
.execute(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.aliases.cleanup")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -866,7 +969,10 @@ impl SettingsRepo for PostgresRepo {
|
||||||
.do_update()
|
.do_update()
|
||||||
.set(value.eq(&input_value))
|
.set(value.eq(&input_value))
|
||||||
.execute(&mut conn)
|
.execute(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.settings.set")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -882,7 +988,10 @@ impl SettingsRepo for PostgresRepo {
|
||||||
.select(value)
|
.select(value)
|
||||||
.filter(key.eq(input_key))
|
.filter(key.eq(input_key))
|
||||||
.get_result::<String>(&mut conn)
|
.get_result::<String>(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.settings.get")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.optional()
|
.optional()
|
||||||
.map_err(PostgresError::Diesel)?
|
.map_err(PostgresError::Diesel)?
|
||||||
.map(hex::decode)
|
.map(hex::decode)
|
||||||
|
@ -902,7 +1011,10 @@ impl SettingsRepo for PostgresRepo {
|
||||||
diesel::delete(settings)
|
diesel::delete(settings)
|
||||||
.filter(key.eq(input_key))
|
.filter(key.eq(input_key))
|
||||||
.execute(&mut conn)
|
.execute(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.settings.remove")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -927,7 +1039,10 @@ impl DetailsRepo for PostgresRepo {
|
||||||
diesel::insert_into(details)
|
diesel::insert_into(details)
|
||||||
.values((identifier.eq(input_identifier.as_ref()), json.eq(&value)))
|
.values((identifier.eq(input_identifier.as_ref()), json.eq(&value)))
|
||||||
.execute(&mut conn)
|
.execute(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.details.relate")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -943,7 +1058,10 @@ impl DetailsRepo for PostgresRepo {
|
||||||
.select(json)
|
.select(json)
|
||||||
.filter(identifier.eq(input_identifier.as_ref()))
|
.filter(identifier.eq(input_identifier.as_ref()))
|
||||||
.get_result::<serde_json::Value>(&mut conn)
|
.get_result::<serde_json::Value>(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.details.get")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.optional()
|
.optional()
|
||||||
.map_err(PostgresError::Diesel)?
|
.map_err(PostgresError::Diesel)?
|
||||||
.map(serde_json::from_value)
|
.map(serde_json::from_value)
|
||||||
|
@ -963,7 +1081,10 @@ impl DetailsRepo for PostgresRepo {
|
||||||
diesel::delete(details)
|
diesel::delete(details)
|
||||||
.filter(identifier.eq(input_identifier.as_ref()))
|
.filter(identifier.eq(input_identifier.as_ref()))
|
||||||
.execute(&mut conn)
|
.execute(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.details.cleanup")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -988,7 +1109,10 @@ impl QueueRepo for PostgresRepo {
|
||||||
.values((queue.eq(queue_name), job.eq(job_json)))
|
.values((queue.eq(queue_name), job.eq(job_json)))
|
||||||
.returning(id)
|
.returning(id)
|
||||||
.get_result::<Uuid>(&mut conn)
|
.get_result::<Uuid>(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.queue.push")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
|
||||||
guard.disarm();
|
guard.disarm();
|
||||||
|
@ -1018,7 +1142,10 @@ impl QueueRepo for PostgresRepo {
|
||||||
|
|
||||||
diesel::sql_query("LISTEN queue_status_channel;")
|
diesel::sql_query("LISTEN queue_status_channel;")
|
||||||
.execute(&mut conn)
|
.execute(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.queue.listen")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
|
||||||
let timestamp = to_primitive(time::OffsetDateTime::now_utc());
|
let timestamp = to_primitive(time::OffsetDateTime::now_utc());
|
||||||
|
@ -1030,7 +1157,10 @@ impl QueueRepo for PostgresRepo {
|
||||||
status.eq(JobStatus::New),
|
status.eq(JobStatus::New),
|
||||||
))
|
))
|
||||||
.execute(&mut conn)
|
.execute(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.queue.requeue")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
|
||||||
if count > 0 {
|
if count > 0 {
|
||||||
|
@ -1045,7 +1175,10 @@ impl QueueRepo for PostgresRepo {
|
||||||
.order(queue_time)
|
.order(queue_time)
|
||||||
.limit(1)
|
.limit(1)
|
||||||
.get_result::<Uuid>(&mut conn)
|
.get_result::<Uuid>(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.queue.select")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.optional()
|
.optional()
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
|
||||||
|
@ -1063,7 +1196,10 @@ impl QueueRepo for PostgresRepo {
|
||||||
))
|
))
|
||||||
.returning((id, job))
|
.returning((id, job))
|
||||||
.get_result(&mut conn)
|
.get_result(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.queue.claim")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.optional()
|
.optional()
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
|
||||||
|
@ -1110,7 +1246,10 @@ impl QueueRepo for PostgresRepo {
|
||||||
)
|
)
|
||||||
.set(heartbeat.eq(timestamp))
|
.set(heartbeat.eq(timestamp))
|
||||||
.execute(&mut conn)
|
.execute(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.queue.heartbeat")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -1134,7 +1273,10 @@ impl QueueRepo for PostgresRepo {
|
||||||
.and(worker.eq(worker_id)),
|
.and(worker.eq(worker_id)),
|
||||||
)
|
)
|
||||||
.execute(&mut conn)
|
.execute(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.queue.complete")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -1152,7 +1294,10 @@ impl StoreMigrationRepo for PostgresRepo {
|
||||||
let count = store_migrations
|
let count = store_migrations
|
||||||
.count()
|
.count()
|
||||||
.get_result::<i64>(&mut conn)
|
.get_result::<i64>(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.store-migration.count")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
|
||||||
Ok(count > 0)
|
Ok(count > 0)
|
||||||
|
@ -1176,7 +1321,10 @@ impl StoreMigrationRepo for PostgresRepo {
|
||||||
.on_conflict((old_identifier, new_identifier))
|
.on_conflict((old_identifier, new_identifier))
|
||||||
.do_nothing()
|
.do_nothing()
|
||||||
.execute(&mut conn)
|
.execute(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.store-migration.mark-migrated")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -1192,7 +1340,10 @@ impl StoreMigrationRepo for PostgresRepo {
|
||||||
store_migrations.filter(old_identifier.eq(input_old_identifier.as_ref())),
|
store_migrations.filter(old_identifier.eq(input_old_identifier.as_ref())),
|
||||||
))
|
))
|
||||||
.get_result(&mut conn)
|
.get_result(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.store-migration.is-migrated")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
|
||||||
Ok(b)
|
Ok(b)
|
||||||
|
@ -1206,7 +1357,10 @@ impl StoreMigrationRepo for PostgresRepo {
|
||||||
|
|
||||||
diesel::delete(store_migrations)
|
diesel::delete(store_migrations)
|
||||||
.execute(&mut conn)
|
.execute(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(20))
|
||||||
|
.with_metrics("pict-rs.postgres.store-migration.clear")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -1224,7 +1378,10 @@ impl ProxyRepo for PostgresRepo {
|
||||||
diesel::insert_into(proxies)
|
diesel::insert_into(proxies)
|
||||||
.values((url.eq(input_url.as_str()), alias.eq(&input_alias)))
|
.values((url.eq(input_url.as_str()), alias.eq(&input_alias)))
|
||||||
.execute(&mut conn)
|
.execute(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.proxy.relate-url")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -1240,7 +1397,10 @@ impl ProxyRepo for PostgresRepo {
|
||||||
.select(alias)
|
.select(alias)
|
||||||
.filter(url.eq(input_url.as_str()))
|
.filter(url.eq(input_url.as_str()))
|
||||||
.get_result(&mut conn)
|
.get_result(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.proxy.related")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.optional()
|
.optional()
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
|
||||||
|
@ -1256,7 +1416,10 @@ impl ProxyRepo for PostgresRepo {
|
||||||
diesel::delete(proxies)
|
diesel::delete(proxies)
|
||||||
.filter(alias.eq(&input_alias))
|
.filter(alias.eq(&input_alias))
|
||||||
.execute(&mut conn)
|
.execute(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.proxy.remove-relation")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -1281,7 +1444,10 @@ impl AliasAccessRepo for PostgresRepo {
|
||||||
.filter(alias.eq(&input_alias))
|
.filter(alias.eq(&input_alias))
|
||||||
.set(accessed.eq(timestamp))
|
.set(accessed.eq(timestamp))
|
||||||
.execute(&mut conn)
|
.execute(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.alias-access.set-accessed")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -1300,7 +1466,10 @@ impl AliasAccessRepo for PostgresRepo {
|
||||||
.select(accessed)
|
.select(accessed)
|
||||||
.filter(alias.eq(&input_alias))
|
.filter(alias.eq(&input_alias))
|
||||||
.get_result::<time::PrimitiveDateTime>(&mut conn)
|
.get_result::<time::PrimitiveDateTime>(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.alias-access.accessed-at")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.optional()
|
.optional()
|
||||||
.map_err(PostgresError::Diesel)?
|
.map_err(PostgresError::Diesel)?
|
||||||
.map(time::PrimitiveDateTime::assume_utc);
|
.map(time::PrimitiveDateTime::assume_utc);
|
||||||
|
@ -1330,7 +1499,10 @@ impl AliasAccessRepo for PostgresRepo {
|
||||||
.order(accessed.desc())
|
.order(accessed.desc())
|
||||||
.limit(100)
|
.limit(100)
|
||||||
.get_results(&mut conn)
|
.get_results(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.alias-access.older-aliases")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
|
||||||
Ok(vec)
|
Ok(vec)
|
||||||
|
@ -1364,7 +1536,10 @@ impl VariantAccessRepo for PostgresRepo {
|
||||||
.filter(hash.eq(&input_hash).and(variant.eq(&input_variant)))
|
.filter(hash.eq(&input_hash).and(variant.eq(&input_variant)))
|
||||||
.set(accessed.eq(timestamp))
|
.set(accessed.eq(timestamp))
|
||||||
.execute(&mut conn)
|
.execute(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.variant-access.set-accessed")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -1384,7 +1559,10 @@ impl VariantAccessRepo for PostgresRepo {
|
||||||
.select(accessed)
|
.select(accessed)
|
||||||
.filter(hash.eq(&input_hash).and(variant.eq(&input_variant)))
|
.filter(hash.eq(&input_hash).and(variant.eq(&input_variant)))
|
||||||
.get_result(&mut conn)
|
.get_result(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.variant-access.accessed-at")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.optional()
|
.optional()
|
||||||
.map_err(PostgresError::Diesel)?
|
.map_err(PostgresError::Diesel)?
|
||||||
.map(time::PrimitiveDateTime::assume_utc);
|
.map(time::PrimitiveDateTime::assume_utc);
|
||||||
|
@ -1414,7 +1592,10 @@ impl VariantAccessRepo for PostgresRepo {
|
||||||
.order(accessed.desc())
|
.order(accessed.desc())
|
||||||
.limit(100)
|
.limit(100)
|
||||||
.get_results(&mut conn)
|
.get_results(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.variant-access.older-variants")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
|
||||||
Ok(vec)
|
Ok(vec)
|
||||||
|
@ -1477,7 +1658,10 @@ impl UploadRepo for PostgresRepo {
|
||||||
.default_values()
|
.default_values()
|
||||||
.returning(id)
|
.returning(id)
|
||||||
.get_result(&mut conn)
|
.get_result(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.uploads.create")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
|
||||||
Ok(UploadId { id: uuid })
|
Ok(UploadId { id: uuid })
|
||||||
|
@ -1497,14 +1681,20 @@ impl UploadRepo for PostgresRepo {
|
||||||
|
|
||||||
diesel::sql_query("LISTEN upload_completion_channel;")
|
diesel::sql_query("LISTEN upload_completion_channel;")
|
||||||
.execute(&mut conn)
|
.execute(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.uploads.listen")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
|
||||||
let nested_opt = uploads
|
let nested_opt = uploads
|
||||||
.select(result)
|
.select(result)
|
||||||
.filter(id.eq(upload_id.id))
|
.filter(id.eq(upload_id.id))
|
||||||
.get_result(&mut conn)
|
.get_result(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.uploads.wait")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.optional()
|
.optional()
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
|
||||||
|
@ -1543,7 +1733,10 @@ impl UploadRepo for PostgresRepo {
|
||||||
diesel::delete(uploads)
|
diesel::delete(uploads)
|
||||||
.filter(id.eq(upload_id.id))
|
.filter(id.eq(upload_id.id))
|
||||||
.execute(&mut conn)
|
.execute(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.uploads.claim")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -1567,7 +1760,10 @@ impl UploadRepo for PostgresRepo {
|
||||||
.filter(id.eq(upload_id.id))
|
.filter(id.eq(upload_id.id))
|
||||||
.set(result.eq(upload_result))
|
.set(result.eq(upload_result))
|
||||||
.execute(&mut conn)
|
.execute(&mut conn)
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
|
.with_metrics("pict-rs.postgres.uploads.complete")
|
||||||
.await
|
.await
|
||||||
|
.map_err(|_| PostgresError::DbTimeout)?
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -1587,8 +1783,6 @@ impl FullRepo for PostgresRepo {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type LocalBoxFuture<'a, T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> + 'a>>;
|
|
||||||
|
|
||||||
type NextFuture<I> = LocalBoxFuture<'static, Result<Vec<(time::PrimitiveDateTime, I)>, RepoError>>;
|
type NextFuture<I> = LocalBoxFuture<'static, Result<Vec<(time::PrimitiveDateTime, I)>, RepoError>>;
|
||||||
|
|
||||||
struct PageStream<I> {
|
struct PageStream<I> {
|
||||||
|
|
|
@ -39,9 +39,17 @@ impl StoreError {
|
||||||
Self::FileNotFound(_) | Self::ObjectNotFound(_) => ErrorCode::NOT_FOUND,
|
Self::FileNotFound(_) | Self::ObjectNotFound(_) => ErrorCode::NOT_FOUND,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) const fn is_not_found(&self) -> bool {
|
pub(crate) const fn is_not_found(&self) -> bool {
|
||||||
matches!(self, Self::FileNotFound(_)) || matches!(self, Self::ObjectNotFound(_))
|
matches!(self, Self::FileNotFound(_)) || matches!(self, Self::ObjectNotFound(_))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) const fn is_disconnected(&self) -> bool {
|
||||||
|
match self {
|
||||||
|
Self::Repo(e) => e.is_disconnected(),
|
||||||
|
_ => false,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<crate::store::file_store::FileError> for StoreError {
|
impl From<crate::store::file_store::FileError> for StoreError {
|
||||||
|
|
Loading…
Reference in a new issue