From 31caea438e6b5a1c55d2a33d2b0e7298702c7366 Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 3 Sep 2023 20:05:29 -0500 Subject: [PATCH] Fix slow connection pool access --- dev.toml | 3 +- docker/object-storage/docker-compose.yml | 37 ++-- src/repo.rs | 218 ----------------------- src/repo/alias.rs | 153 ++++++++++++++++ src/repo/delete_token.rs | 72 ++++++++ src/repo/postgres.rs | 206 +++++++++++---------- src/repo/postgres/schema.rs | 2 +- 7 files changed, 365 insertions(+), 326 deletions(-) diff --git a/dev.toml b/dev.toml index 6cdd71b..6c72d3e 100644 --- a/dev.toml +++ b/dev.toml @@ -11,6 +11,7 @@ targets = 'warn,tracing_actix_web=info,actix_server=info,actix_web=info' buffer_capacity = 102400 [tracing.opentelemetry] +url = 'http://127.0.0.1:4317' service_name = 'pict-rs' targets = 'info' @@ -60,7 +61,7 @@ crf_max = 12 [repo] type = 'postgres' -url = 'postgres://postgres:1234@localhost:5432/postgres' +url = 'postgres://pictrs:1234@localhost:5432/pictrs' # [repo] # type = 'sled' diff --git a/docker/object-storage/docker-compose.yml b/docker/object-storage/docker-compose.yml index a4e5d5a..3a028a9 100644 --- a/docker/object-storage/docker-compose.yml +++ b/docker/object-storage/docker-compose.yml @@ -13,7 +13,7 @@ services: # - "6669:6669" # environment: # - PICTRS__TRACING__CONSOLE__ADDRESS=0.0.0.0:6669 - # - PICTRS__TRACING__OPENTELEMETRY__URL=http://otel:4137 + # - PICTRS__TRACING__OPENTELEMETRY__URL=http://jaeger:4317 # - RUST_BACKTRACE=1 # stdin_open: true # tty: true @@ -27,7 +27,7 @@ services: # - "8081:8081" # environment: # - PICTRS_PROXY_UPSTREAM=http://pictrs:8080 - # - PICTRS_PROXY_OPENTELEMETRY_URL=http://otel:4137 + # - PICTRS_PROXY_OPENTELEMETRY_URL=http://jaeger:4317 minio: image: quay.io/minio/minio @@ -39,7 +39,7 @@ services: - ./storage/minio:/mnt garage: - image: dxflrs/garage:v0.8.1 + image: dxflrs/garage:v0.8.3 ports: - "3900:3900" - "3901:3901" @@ -47,26 +47,35 @@ services: - "3903:3903" - "3904:3904" environment: - - RUST_LOG=debug + - RUST_LOG=info volumes: - ./storage/garage:/mnt - ./garage.toml:/etc/garage.toml - otel: - image: otel/opentelemetry-collector:latest - command: --config otel-local-config.yaml + postgres: + image: postgres:15-alpine + ports: + - "5432:5432" + environment: + - PGDATA=/var/lib/postgresql/data + - POSTGRES_DB=pictrs + - POSTGRES_USER=pictrs + - POSTGRES_PASSWORD=1234 volumes: - - type: bind - source: ./otel.yml - target: /otel-local-config.yaml - restart: always - depends_on: - - jaeger + - ./storage/postgres:/var/lib/postgresql/data jaeger: - image: jaegertracing/all-in-one:1 + image: jaegertracing/all-in-one:1.48 ports: + - "6831:6831/udp" + - "6832:6832/udp" + - "5778:5778" + - "4317:4317" + - "4138:4138" - "14250:14250" + - "14268:14268" + - "14269:14269" + - "9411:9411" # To view traces, visit http://localhost:16686 - "16686:16686" restart: always diff --git a/src/repo.rs b/src/repo.rs index fdf5ce1..0c8c312 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -931,221 +931,3 @@ impl std::fmt::Display for MaybeUuid { } } } - -#[cfg(test)] -mod tests { - use super::{Alias, DeleteToken, MaybeUuid, Uuid}; - - #[test] - fn string_delete_token() { - let delete_token = DeleteToken::from_existing("blah"); - - assert_eq!( - delete_token, - DeleteToken { - id: MaybeUuid::Name(String::from("blah")) - } - ) - } - - #[test] - fn uuid_string_delete_token() { - let uuid = Uuid::new_v4(); - - let delete_token = DeleteToken::from_existing(&uuid.to_string()); - - assert_eq!( - delete_token, - DeleteToken { - id: MaybeUuid::Uuid(uuid), - } - ) - } - - #[test] - fn bytes_delete_token() { - let delete_token = DeleteToken::from_slice(b"blah").unwrap(); - - assert_eq!( - delete_token, - DeleteToken { - id: MaybeUuid::Name(String::from("blah")) - } - ) - } - - #[test] - fn uuid_bytes_delete_token() { - let uuid = Uuid::new_v4(); - - let delete_token = DeleteToken::from_slice(&uuid.as_bytes()[..]).unwrap(); - - assert_eq!( - delete_token, - DeleteToken { - id: MaybeUuid::Uuid(uuid), - } - ) - } - - #[test] - fn uuid_bytes_string_delete_token() { - let uuid = Uuid::new_v4(); - - let delete_token = DeleteToken::from_slice(uuid.to_string().as_bytes()).unwrap(); - - assert_eq!( - delete_token, - DeleteToken { - id: MaybeUuid::Uuid(uuid), - } - ) - } - - #[test] - fn string_alias() { - let alias = Alias::from_existing("blah"); - - assert_eq!( - alias, - Alias { - id: MaybeUuid::Name(String::from("blah")), - extension: None - } - ); - } - - #[test] - fn string_alias_ext() { - let alias = Alias::from_existing("blah.mp4"); - - assert_eq!( - alias, - Alias { - id: MaybeUuid::Name(String::from("blah")), - extension: Some(String::from(".mp4")), - } - ); - } - - #[test] - fn uuid_string_alias() { - let uuid = Uuid::new_v4(); - - let alias = Alias::from_existing(&uuid.to_string()); - - assert_eq!( - alias, - Alias { - id: MaybeUuid::Uuid(uuid), - extension: None, - } - ) - } - - #[test] - fn uuid_string_alias_ext() { - let uuid = Uuid::new_v4(); - - let alias_str = format!("{uuid}.mp4"); - let alias = Alias::from_existing(&alias_str); - - assert_eq!( - alias, - Alias { - id: MaybeUuid::Uuid(uuid), - extension: Some(String::from(".mp4")), - } - ) - } - - #[test] - fn bytes_alias() { - let alias = Alias::from_slice(b"blah").unwrap(); - - assert_eq!( - alias, - Alias { - id: MaybeUuid::Name(String::from("blah")), - extension: None - } - ); - } - - #[test] - fn bytes_alias_ext() { - let alias = Alias::from_slice(b"blah.mp4").unwrap(); - - assert_eq!( - alias, - Alias { - id: MaybeUuid::Name(String::from("blah")), - extension: Some(String::from(".mp4")), - } - ); - } - - #[test] - fn uuid_bytes_alias() { - let uuid = Uuid::new_v4(); - - let alias = Alias::from_slice(&uuid.as_bytes()[..]).unwrap(); - - assert_eq!( - alias, - Alias { - id: MaybeUuid::Uuid(uuid), - extension: None, - } - ) - } - - #[test] - fn uuid_bytes_string_alias() { - let uuid = Uuid::new_v4(); - - let alias = Alias::from_slice(uuid.to_string().as_bytes()).unwrap(); - - assert_eq!( - alias, - Alias { - id: MaybeUuid::Uuid(uuid), - extension: None, - } - ) - } - - #[test] - fn uuid_bytes_alias_ext() { - let uuid = Uuid::new_v4(); - - let mut alias_bytes = uuid.as_bytes().to_vec(); - alias_bytes.extend_from_slice(b".mp4"); - - let alias = Alias::from_slice(&alias_bytes).unwrap(); - - assert_eq!( - alias, - Alias { - id: MaybeUuid::Uuid(uuid), - extension: Some(String::from(".mp4")), - } - ) - } - - #[test] - fn uuid_bytes_string_alias_ext() { - let uuid = Uuid::new_v4(); - - let alias_str = format!("{uuid}.mp4"); - let alias = Alias::from_slice(alias_str.as_bytes()).unwrap(); - - assert_eq!( - alias, - Alias { - id: MaybeUuid::Uuid(uuid), - extension: Some(String::from(".mp4")), - } - ) - } -} diff --git a/src/repo/alias.rs b/src/repo/alias.rs index 48156e1..a0d021c 100644 --- a/src/repo/alias.rs +++ b/src/repo/alias.rs @@ -119,3 +119,156 @@ impl std::fmt::Display for Alias { } } } + +#[cfg(test)] +mod tests { + use super::{Alias, MaybeUuid}; + use uuid::Uuid; + + #[test] + fn string_alias() { + let alias = Alias::from_existing("blah"); + + assert_eq!( + alias, + Alias { + id: MaybeUuid::Name(String::from("blah")), + extension: None + } + ); + } + + #[test] + fn string_alias_ext() { + let alias = Alias::from_existing("blah.mp4"); + + assert_eq!( + alias, + Alias { + id: MaybeUuid::Name(String::from("blah")), + extension: Some(String::from(".mp4")), + } + ); + } + + #[test] + fn uuid_string_alias() { + let uuid = Uuid::new_v4(); + + let alias = Alias::from_existing(&uuid.to_string()); + + assert_eq!( + alias, + Alias { + id: MaybeUuid::Uuid(uuid), + extension: None, + } + ) + } + + #[test] + fn uuid_string_alias_ext() { + let uuid = Uuid::new_v4(); + + let alias_str = format!("{uuid}.mp4"); + let alias = Alias::from_existing(&alias_str); + + assert_eq!( + alias, + Alias { + id: MaybeUuid::Uuid(uuid), + extension: Some(String::from(".mp4")), + } + ) + } + + #[test] + fn bytes_alias() { + let alias = Alias::from_slice(b"blah").unwrap(); + + assert_eq!( + alias, + Alias { + id: MaybeUuid::Name(String::from("blah")), + extension: None + } + ); + } + + #[test] + fn bytes_alias_ext() { + let alias = Alias::from_slice(b"blah.mp4").unwrap(); + + assert_eq!( + alias, + Alias { + id: MaybeUuid::Name(String::from("blah")), + extension: Some(String::from(".mp4")), + } + ); + } + + #[test] + fn uuid_bytes_alias() { + let uuid = Uuid::new_v4(); + + let alias = Alias::from_slice(&uuid.as_bytes()[..]).unwrap(); + + assert_eq!( + alias, + Alias { + id: MaybeUuid::Uuid(uuid), + extension: None, + } + ) + } + + #[test] + fn uuid_bytes_string_alias() { + let uuid = Uuid::new_v4(); + + let alias = Alias::from_slice(uuid.to_string().as_bytes()).unwrap(); + + assert_eq!( + alias, + Alias { + id: MaybeUuid::Uuid(uuid), + extension: None, + } + ) + } + + #[test] + fn uuid_bytes_alias_ext() { + let uuid = Uuid::new_v4(); + + let mut alias_bytes = uuid.as_bytes().to_vec(); + alias_bytes.extend_from_slice(b".mp4"); + + let alias = Alias::from_slice(&alias_bytes).unwrap(); + + assert_eq!( + alias, + Alias { + id: MaybeUuid::Uuid(uuid), + extension: Some(String::from(".mp4")), + } + ) + } + + #[test] + fn uuid_bytes_string_alias_ext() { + let uuid = Uuid::new_v4(); + + let alias_str = format!("{uuid}.mp4"); + let alias = Alias::from_slice(alias_str.as_bytes()).unwrap(); + + assert_eq!( + alias, + Alias { + id: MaybeUuid::Uuid(uuid), + extension: Some(String::from(".mp4")), + } + ) + } +} diff --git a/src/repo/delete_token.rs b/src/repo/delete_token.rs index 7c06b09..ca1a50f 100644 --- a/src/repo/delete_token.rs +++ b/src/repo/delete_token.rs @@ -86,3 +86,75 @@ impl std::fmt::Display for DeleteToken { write!(f, "{}", self.id) } } + +#[cfg(test)] +mod tests { + use super::{DeleteToken, MaybeUuid}; + use uuid::Uuid; + + #[test] + fn string_delete_token() { + let delete_token = DeleteToken::from_existing("blah"); + + assert_eq!( + delete_token, + DeleteToken { + id: MaybeUuid::Name(String::from("blah")) + } + ) + } + + #[test] + fn uuid_string_delete_token() { + let uuid = Uuid::new_v4(); + + let delete_token = DeleteToken::from_existing(&uuid.to_string()); + + assert_eq!( + delete_token, + DeleteToken { + id: MaybeUuid::Uuid(uuid), + } + ) + } + + #[test] + fn bytes_delete_token() { + let delete_token = DeleteToken::from_slice(b"blah").unwrap(); + + assert_eq!( + delete_token, + DeleteToken { + id: MaybeUuid::Name(String::from("blah")) + } + ) + } + + #[test] + fn uuid_bytes_delete_token() { + let uuid = Uuid::new_v4(); + + let delete_token = DeleteToken::from_slice(&uuid.as_bytes()[..]).unwrap(); + + assert_eq!( + delete_token, + DeleteToken { + id: MaybeUuid::Uuid(uuid), + } + ) + } + + #[test] + fn uuid_bytes_string_delete_token() { + let uuid = Uuid::new_v4(); + + let delete_token = DeleteToken::from_slice(uuid.to_string().as_bytes()).unwrap(); + + assert_eq!( + delete_token, + DeleteToken { + id: MaybeUuid::Uuid(uuid), + } + ) + } +} diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs index ab08fef..f63331d 100644 --- a/src/repo/postgres.rs +++ b/src/repo/postgres.rs @@ -14,7 +14,7 @@ use dashmap::DashMap; use diesel::prelude::*; use diesel_async::{ pooled_connection::{ - deadpool::{BuildError, Pool, PoolError}, + deadpool::{BuildError, Object, Pool, PoolError}, AsyncDieselConnectionManager, ManagerConfig, }, AsyncConnection, AsyncPgConnection, RunQueryDsl, @@ -176,6 +176,17 @@ impl PostgresRepo { notifications, }) } + + async fn get_connection(&self) -> Result, PostgresError> { + self.inner.get_connection().await + } +} + +impl Inner { + #[tracing::instrument(level = "DEBUG", skip(self))] + async fn get_connection(&self) -> Result, PostgresError> { + self.pool.get().await.map_err(PostgresError::Pool) + } } type BoxFuture<'a, T> = std::pin::Pin + Send + 'a>>; @@ -235,7 +246,7 @@ impl HashRepo for PostgresRepo { async fn size(&self) -> Result { use schema::hashes::dsl::*; - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; let count = hashes .count() @@ -250,7 +261,7 @@ impl HashRepo for PostgresRepo { async fn bound(&self, input_hash: Hash) -> Result, RepoError> { use schema::hashes::dsl::*; - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; let timestamp = hashes .select(created_at) @@ -275,7 +286,7 @@ impl HashRepo for PostgresRepo { ) -> Result { use schema::hashes::dsl::*; - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; let timestamp = to_primitive(date); @@ -303,7 +314,7 @@ impl HashRepo for PostgresRepo { ) -> Result { use schema::hashes::dsl::*; - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; let (mut page, prev) = if let Some(OrderedHash { timestamp, @@ -368,7 +379,7 @@ impl HashRepo for PostgresRepo { ) -> Result, RepoError> { use schema::hashes::dsl::*; - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; let timestamp = to_primitive(timestamp); @@ -399,7 +410,7 @@ impl HashRepo for PostgresRepo { ) -> Result<(), RepoError> { use schema::hashes::dsl::*; - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; diesel::update(hashes) .filter(hash.eq(&input_hash)) @@ -415,7 +426,7 @@ impl HashRepo for PostgresRepo { async fn identifier(&self, input_hash: Hash) -> Result>, RepoError> { use schema::hashes::dsl::*; - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; let opt = hashes .select(identifier) @@ -437,7 +448,7 @@ impl HashRepo for PostgresRepo { ) -> Result, RepoError> { use schema::variants::dsl::*; - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; let res = diesel::insert_into(variants) .values(( @@ -466,7 +477,7 @@ impl HashRepo for PostgresRepo { ) -> Result>, RepoError> { use schema::variants::dsl::*; - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; let opt = variants .select(identifier) @@ -485,7 +496,7 @@ impl HashRepo for PostgresRepo { async fn variants(&self, input_hash: Hash) -> Result)>, RepoError> { use schema::variants::dsl::*; - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; let vec = variants .select((variant, identifier)) @@ -508,7 +519,7 @@ impl HashRepo for PostgresRepo { ) -> Result<(), RepoError> { use schema::variants::dsl::*; - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; diesel::delete(variants) .filter(hash.eq(&input_hash)) @@ -528,7 +539,7 @@ impl HashRepo for PostgresRepo { ) -> Result<(), RepoError> { use schema::hashes::dsl::*; - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; diesel::update(hashes) .filter(hash.eq(&input_hash)) @@ -544,7 +555,7 @@ impl HashRepo for PostgresRepo { async fn motion_identifier(&self, input_hash: Hash) -> Result>, RepoError> { use schema::hashes::dsl::*; - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; let opt = hashes .select(motion_identifier) @@ -561,7 +572,7 @@ impl HashRepo for PostgresRepo { #[tracing::instrument(level = "DEBUG", skip(self))] async fn cleanup_hash(&self, input_hash: Hash) -> Result<(), RepoError> { - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; conn.transaction(|conn| { Box::pin(async move { @@ -594,7 +605,7 @@ impl AliasRepo for PostgresRepo { ) -> Result, RepoError> { use schema::aliases::dsl::*; - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; let res = diesel::insert_into(aliases) .values(( @@ -619,7 +630,7 @@ impl AliasRepo for PostgresRepo { async fn delete_token(&self, input_alias: &Alias) -> Result, RepoError> { use schema::aliases::dsl::*; - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; let opt = aliases .select(token) @@ -636,7 +647,7 @@ impl AliasRepo for PostgresRepo { async fn hash(&self, input_alias: &Alias) -> Result, RepoError> { use schema::aliases::dsl::*; - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; let opt = aliases .select(hash) @@ -653,7 +664,7 @@ impl AliasRepo for PostgresRepo { async fn aliases_for_hash(&self, input_hash: Hash) -> Result, RepoError> { use schema::aliases::dsl::*; - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; let vec = aliases .select(alias) @@ -669,7 +680,7 @@ impl AliasRepo for PostgresRepo { async fn cleanup_alias(&self, input_alias: &Alias) -> Result<(), RepoError> { use schema::aliases::dsl::*; - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; diesel::delete(aliases) .filter(alias.eq(input_alias)) @@ -689,7 +700,7 @@ impl SettingsRepo for PostgresRepo { let input_value = hex::encode(input_value); - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; diesel::insert_into(settings) .values((key.eq(input_key), value.eq(&input_value))) @@ -707,7 +718,7 @@ impl SettingsRepo for PostgresRepo { async fn get(&self, input_key: &'static str) -> Result>, RepoError> { use schema::settings::dsl::*; - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; let opt = settings .select(value) @@ -728,7 +739,7 @@ impl SettingsRepo for PostgresRepo { async fn remove(&self, input_key: &'static str) -> Result<(), RepoError> { use schema::settings::dsl::*; - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; diesel::delete(settings) .filter(key.eq(input_key)) @@ -750,7 +761,7 @@ impl DetailsRepo for PostgresRepo { ) -> Result<(), RepoError> { use schema::details::dsl::*; - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; let value = serde_json::to_value(&input_details.inner).map_err(PostgresError::SerializeDetails)?; @@ -768,7 +779,7 @@ impl DetailsRepo for PostgresRepo { async fn details(&self, input_identifier: &Arc) -> Result, RepoError> { use schema::details::dsl::*; - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; let opt = details .select(json) @@ -789,7 +800,7 @@ impl DetailsRepo for PostgresRepo { async fn cleanup_details(&self, input_identifier: &Arc) -> Result<(), RepoError> { use schema::details::dsl::*; - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; diesel::delete(details) .filter(identifier.eq(input_identifier.as_ref())) @@ -811,7 +822,7 @@ impl QueueRepo for PostgresRepo { ) -> Result { use schema::job_queue::dsl::*; - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; let job_id = diesel::insert_into(job_queue) .values((queue.eq(queue_name), job.eq(job_json))) @@ -832,7 +843,7 @@ impl QueueRepo for PostgresRepo { use schema::job_queue::dsl::*; loop { - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; let notifier: Arc = self .inner @@ -848,7 +859,7 @@ impl QueueRepo for PostgresRepo { let timestamp = to_primitive(time::OffsetDateTime::now_utc()); - diesel::update(job_queue) + let count = diesel::update(job_queue) .filter(heartbeat.le(timestamp.saturating_sub(time::Duration::minutes(2)))) .set(( heartbeat.eq(Option::::None), @@ -858,44 +869,58 @@ impl QueueRepo for PostgresRepo { .await .map_err(PostgresError::Diesel)?; - // TODO: for_update().skip_locked() - let id_query = job_queue - .select(id) - .filter(status.eq(JobStatus::New).and(queue.eq(queue_name))) - .order(queue_time) - .into_boxed() - .single_value(); + if count > 0 { + tracing::info!("Reset {count} jobs"); + } - let opt = diesel::update(job_queue) - .filter(id.nullable().eq(id_query)) - .set(( - heartbeat.eq(timestamp), - status.eq(JobStatus::Running), - worker.eq(worker_id), - )) - .returning((id, job)) - .get_result(&mut conn) - .await - .optional() - .map_err(PostgresError::Diesel)?; - - if let Some((job_id, job_json)) = opt { - diesel::sql_query("UNLISTEN queue_status_channel;") - .execute(&mut conn) + // TODO: combine into 1 query + let opt = loop { + let id_opt = job_queue + .select(id) + .filter(status.eq(JobStatus::New).and(queue.eq(queue_name))) + .order(queue_time) + .limit(1) + .get_result::(&mut conn) .await + .optional() .map_err(PostgresError::Diesel)?; + let Some(id_val) = id_opt else { + break None; + }; + + let opt = diesel::update(job_queue) + .filter(id.eq(id_val)) + .filter(status.eq(JobStatus::New)) + .set(( + heartbeat.eq(timestamp), + status.eq(JobStatus::Running), + worker.eq(worker_id), + )) + .returning((id, job)) + .get_result(&mut conn) + .await + .optional() + .map_err(PostgresError::Diesel)?; + + if let Some(tup) = opt { + break Some(tup); + } + }; + + if let Some((job_id, job_json)) = opt { return Ok((JobId(job_id), job_json)); } - let _ = actix_rt::time::timeout(Duration::from_secs(5), notifier.notified()).await; - - diesel::sql_query("UNLISTEN queue_status_channel;") - .execute(&mut conn) - .await - .map_err(PostgresError::Diesel)?; - drop(conn); + if actix_rt::time::timeout(Duration::from_secs(5), notifier.notified()) + .await + .is_ok() + { + tracing::debug!("Notified"); + } else { + tracing::debug!("Timed out"); + } } } @@ -908,7 +933,7 @@ impl QueueRepo for PostgresRepo { ) -> Result<(), RepoError> { use schema::job_queue::dsl::*; - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; let timestamp = to_primitive(time::OffsetDateTime::now_utc()); @@ -935,7 +960,7 @@ impl QueueRepo for PostgresRepo { ) -> Result<(), RepoError> { use schema::job_queue::dsl::*; - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; diesel::delete(job_queue) .filter( @@ -957,7 +982,7 @@ impl StoreMigrationRepo for PostgresRepo { async fn is_continuing_migration(&self) -> Result { use schema::store_migrations::dsl::*; - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; let count = store_migrations .count() @@ -976,7 +1001,7 @@ impl StoreMigrationRepo for PostgresRepo { ) -> Result<(), RepoError> { use schema::store_migrations::dsl::*; - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; diesel::insert_into(store_migrations) .values(( @@ -996,7 +1021,7 @@ impl StoreMigrationRepo for PostgresRepo { async fn is_migrated(&self, input_old_identifier: &Arc) -> Result { use schema::store_migrations::dsl::*; - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; let b = diesel::select(diesel::dsl::exists( store_migrations.filter(old_identifier.eq(input_old_identifier.as_ref())), @@ -1012,7 +1037,7 @@ impl StoreMigrationRepo for PostgresRepo { async fn clear(&self) -> Result<(), RepoError> { use schema::store_migrations::dsl::*; - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; diesel::delete(store_migrations) .execute(&mut conn) @@ -1029,7 +1054,7 @@ impl ProxyRepo for PostgresRepo { async fn relate_url(&self, input_url: Url, input_alias: Alias) -> Result<(), RepoError> { use schema::proxies::dsl::*; - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; diesel::insert_into(proxies) .values((url.eq(input_url.as_str()), alias.eq(&input_alias))) @@ -1044,7 +1069,7 @@ impl ProxyRepo for PostgresRepo { async fn related(&self, input_url: Url) -> Result, RepoError> { use schema::proxies::dsl::*; - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; let opt = proxies .select(alias) @@ -1061,7 +1086,7 @@ impl ProxyRepo for PostgresRepo { async fn remove_relation(&self, input_alias: Alias) -> Result<(), RepoError> { use schema::proxies::dsl::*; - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; diesel::delete(proxies) .filter(alias.eq(&input_alias)) @@ -1083,7 +1108,7 @@ impl AliasAccessRepo for PostgresRepo { ) -> Result<(), RepoError> { use schema::proxies::dsl::*; - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; let timestamp = to_primitive(timestamp); @@ -1104,7 +1129,7 @@ impl AliasAccessRepo for PostgresRepo { ) -> Result, RepoError> { use schema::proxies::dsl::*; - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; let opt = proxies .select(accessed) @@ -1132,7 +1157,7 @@ impl AliasAccessRepo for PostgresRepo { Box::pin(async move { use schema::proxies::dsl::*; - let mut conn = inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = inner.get_connection().await?; let vec = proxies .select((accessed, alias)) @@ -1166,7 +1191,7 @@ impl VariantAccessRepo for PostgresRepo { ) -> Result<(), RepoError> { use schema::variants::dsl::*; - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; let timestamp = to_primitive(input_accessed); @@ -1188,7 +1213,7 @@ impl VariantAccessRepo for PostgresRepo { ) -> Result, RepoError> { use schema::variants::dsl::*; - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; let opt = variants .select(accessed) @@ -1216,7 +1241,7 @@ impl VariantAccessRepo for PostgresRepo { Box::pin(async move { use schema::variants::dsl::*; - let mut conn = inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = inner.get_connection().await?; let vec = variants .select((accessed, (hash, variant))) @@ -1281,7 +1306,7 @@ impl UploadRepo for PostgresRepo { async fn create_upload(&self) -> Result { use schema::uploads::dsl::*; - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; let uuid = diesel::insert_into(uploads) .default_values() @@ -1298,7 +1323,7 @@ impl UploadRepo for PostgresRepo { use schema::uploads::dsl::*; loop { - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; diesel::sql_query("LISTEN upload_completion_channel;") .execute(&mut conn) @@ -1315,28 +1340,25 @@ impl UploadRepo for PostgresRepo { .flatten(); if let Some(upload_result) = opt { - diesel::sql_query("UNLISTEN upload_completion_channel;") - .execute(&mut conn) - .await - .map_err(PostgresError::Diesel)?; - let upload_result: InnerUploadResult = serde_json::from_value(upload_result) .map_err(PostgresError::DeserializeUploadResult)?; return Ok(upload_result.into()); } - let _ = actix_rt::time::timeout( - Duration::from_secs(2), + drop(conn); + + if actix_rt::time::timeout( + Duration::from_secs(5), self.inner.upload_notifier.notified(), ) - .await; - - diesel::sql_query("UNLISTEN upload_completion_channel;") - .execute(&mut conn) - .await - .map_err(PostgresError::Diesel)?; - drop(conn); + .await + .is_ok() + { + tracing::debug!("Notified"); + } else { + tracing::debug!("Timed out"); + } } } @@ -1344,7 +1366,7 @@ impl UploadRepo for PostgresRepo { async fn claim(&self, upload_id: UploadId) -> Result<(), RepoError> { use schema::uploads::dsl::*; - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; diesel::delete(uploads) .filter(id.eq(upload_id.id)) @@ -1363,7 +1385,7 @@ impl UploadRepo for PostgresRepo { ) -> Result<(), RepoError> { use schema::uploads::dsl::*; - let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.get_connection().await?; let upload_result: InnerUploadResult = upload_result.into(); let upload_result = diff --git a/src/repo/postgres/schema.rs b/src/repo/postgres/schema.rs index 964937b..72ba9aa 100644 --- a/src/repo/postgres/schema.rs +++ b/src/repo/postgres/schema.rs @@ -1,7 +1,7 @@ // @generated automatically by Diesel CLI. pub mod sql_types { - #[derive(diesel::sql_types::SqlType)] + #[derive(diesel::query_builder::QueryId, diesel::sql_types::SqlType)] #[diesel(postgres_type(name = "job_status"))] pub struct JobStatus; }