mirror of
https://git.asonix.dog/asonix/pict-rs
synced 2024-12-22 19:31:35 +00:00
Fix slow connection pool access
This commit is contained in:
parent
8a892ba622
commit
31caea438e
7 changed files with 365 additions and 326 deletions
3
dev.toml
3
dev.toml
|
@ -11,6 +11,7 @@ targets = 'warn,tracing_actix_web=info,actix_server=info,actix_web=info'
|
||||||
buffer_capacity = 102400
|
buffer_capacity = 102400
|
||||||
|
|
||||||
[tracing.opentelemetry]
|
[tracing.opentelemetry]
|
||||||
|
url = 'http://127.0.0.1:4317'
|
||||||
service_name = 'pict-rs'
|
service_name = 'pict-rs'
|
||||||
targets = 'info'
|
targets = 'info'
|
||||||
|
|
||||||
|
@ -60,7 +61,7 @@ crf_max = 12
|
||||||
|
|
||||||
[repo]
|
[repo]
|
||||||
type = 'postgres'
|
type = 'postgres'
|
||||||
url = 'postgres://postgres:1234@localhost:5432/postgres'
|
url = 'postgres://pictrs:1234@localhost:5432/pictrs'
|
||||||
|
|
||||||
# [repo]
|
# [repo]
|
||||||
# type = 'sled'
|
# type = 'sled'
|
||||||
|
|
|
@ -13,7 +13,7 @@ services:
|
||||||
# - "6669:6669"
|
# - "6669:6669"
|
||||||
# environment:
|
# environment:
|
||||||
# - PICTRS__TRACING__CONSOLE__ADDRESS=0.0.0.0:6669
|
# - PICTRS__TRACING__CONSOLE__ADDRESS=0.0.0.0:6669
|
||||||
# - PICTRS__TRACING__OPENTELEMETRY__URL=http://otel:4137
|
# - PICTRS__TRACING__OPENTELEMETRY__URL=http://jaeger:4317
|
||||||
# - RUST_BACKTRACE=1
|
# - RUST_BACKTRACE=1
|
||||||
# stdin_open: true
|
# stdin_open: true
|
||||||
# tty: true
|
# tty: true
|
||||||
|
@ -27,7 +27,7 @@ services:
|
||||||
# - "8081:8081"
|
# - "8081:8081"
|
||||||
# environment:
|
# environment:
|
||||||
# - PICTRS_PROXY_UPSTREAM=http://pictrs:8080
|
# - PICTRS_PROXY_UPSTREAM=http://pictrs:8080
|
||||||
# - PICTRS_PROXY_OPENTELEMETRY_URL=http://otel:4137
|
# - PICTRS_PROXY_OPENTELEMETRY_URL=http://jaeger:4317
|
||||||
|
|
||||||
minio:
|
minio:
|
||||||
image: quay.io/minio/minio
|
image: quay.io/minio/minio
|
||||||
|
@ -39,7 +39,7 @@ services:
|
||||||
- ./storage/minio:/mnt
|
- ./storage/minio:/mnt
|
||||||
|
|
||||||
garage:
|
garage:
|
||||||
image: dxflrs/garage:v0.8.1
|
image: dxflrs/garage:v0.8.3
|
||||||
ports:
|
ports:
|
||||||
- "3900:3900"
|
- "3900:3900"
|
||||||
- "3901:3901"
|
- "3901:3901"
|
||||||
|
@ -47,26 +47,35 @@ services:
|
||||||
- "3903:3903"
|
- "3903:3903"
|
||||||
- "3904:3904"
|
- "3904:3904"
|
||||||
environment:
|
environment:
|
||||||
- RUST_LOG=debug
|
- RUST_LOG=info
|
||||||
volumes:
|
volumes:
|
||||||
- ./storage/garage:/mnt
|
- ./storage/garage:/mnt
|
||||||
- ./garage.toml:/etc/garage.toml
|
- ./garage.toml:/etc/garage.toml
|
||||||
|
|
||||||
otel:
|
postgres:
|
||||||
image: otel/opentelemetry-collector:latest
|
image: postgres:15-alpine
|
||||||
command: --config otel-local-config.yaml
|
ports:
|
||||||
|
- "5432:5432"
|
||||||
|
environment:
|
||||||
|
- PGDATA=/var/lib/postgresql/data
|
||||||
|
- POSTGRES_DB=pictrs
|
||||||
|
- POSTGRES_USER=pictrs
|
||||||
|
- POSTGRES_PASSWORD=1234
|
||||||
volumes:
|
volumes:
|
||||||
- type: bind
|
- ./storage/postgres:/var/lib/postgresql/data
|
||||||
source: ./otel.yml
|
|
||||||
target: /otel-local-config.yaml
|
|
||||||
restart: always
|
|
||||||
depends_on:
|
|
||||||
- jaeger
|
|
||||||
|
|
||||||
jaeger:
|
jaeger:
|
||||||
image: jaegertracing/all-in-one:1
|
image: jaegertracing/all-in-one:1.48
|
||||||
ports:
|
ports:
|
||||||
|
- "6831:6831/udp"
|
||||||
|
- "6832:6832/udp"
|
||||||
|
- "5778:5778"
|
||||||
|
- "4317:4317"
|
||||||
|
- "4138:4138"
|
||||||
- "14250:14250"
|
- "14250:14250"
|
||||||
|
- "14268:14268"
|
||||||
|
- "14269:14269"
|
||||||
|
- "9411:9411"
|
||||||
# To view traces, visit http://localhost:16686
|
# To view traces, visit http://localhost:16686
|
||||||
- "16686:16686"
|
- "16686:16686"
|
||||||
restart: always
|
restart: always
|
||||||
|
|
218
src/repo.rs
218
src/repo.rs
|
@ -931,221 +931,3 @@ impl std::fmt::Display for MaybeUuid {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::{Alias, DeleteToken, MaybeUuid, Uuid};
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn string_delete_token() {
|
|
||||||
let delete_token = DeleteToken::from_existing("blah");
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
delete_token,
|
|
||||||
DeleteToken {
|
|
||||||
id: MaybeUuid::Name(String::from("blah"))
|
|
||||||
}
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn uuid_string_delete_token() {
|
|
||||||
let uuid = Uuid::new_v4();
|
|
||||||
|
|
||||||
let delete_token = DeleteToken::from_existing(&uuid.to_string());
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
delete_token,
|
|
||||||
DeleteToken {
|
|
||||||
id: MaybeUuid::Uuid(uuid),
|
|
||||||
}
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn bytes_delete_token() {
|
|
||||||
let delete_token = DeleteToken::from_slice(b"blah").unwrap();
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
delete_token,
|
|
||||||
DeleteToken {
|
|
||||||
id: MaybeUuid::Name(String::from("blah"))
|
|
||||||
}
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn uuid_bytes_delete_token() {
|
|
||||||
let uuid = Uuid::new_v4();
|
|
||||||
|
|
||||||
let delete_token = DeleteToken::from_slice(&uuid.as_bytes()[..]).unwrap();
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
delete_token,
|
|
||||||
DeleteToken {
|
|
||||||
id: MaybeUuid::Uuid(uuid),
|
|
||||||
}
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn uuid_bytes_string_delete_token() {
|
|
||||||
let uuid = Uuid::new_v4();
|
|
||||||
|
|
||||||
let delete_token = DeleteToken::from_slice(uuid.to_string().as_bytes()).unwrap();
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
delete_token,
|
|
||||||
DeleteToken {
|
|
||||||
id: MaybeUuid::Uuid(uuid),
|
|
||||||
}
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn string_alias() {
|
|
||||||
let alias = Alias::from_existing("blah");
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
alias,
|
|
||||||
Alias {
|
|
||||||
id: MaybeUuid::Name(String::from("blah")),
|
|
||||||
extension: None
|
|
||||||
}
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn string_alias_ext() {
|
|
||||||
let alias = Alias::from_existing("blah.mp4");
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
alias,
|
|
||||||
Alias {
|
|
||||||
id: MaybeUuid::Name(String::from("blah")),
|
|
||||||
extension: Some(String::from(".mp4")),
|
|
||||||
}
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn uuid_string_alias() {
|
|
||||||
let uuid = Uuid::new_v4();
|
|
||||||
|
|
||||||
let alias = Alias::from_existing(&uuid.to_string());
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
alias,
|
|
||||||
Alias {
|
|
||||||
id: MaybeUuid::Uuid(uuid),
|
|
||||||
extension: None,
|
|
||||||
}
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn uuid_string_alias_ext() {
|
|
||||||
let uuid = Uuid::new_v4();
|
|
||||||
|
|
||||||
let alias_str = format!("{uuid}.mp4");
|
|
||||||
let alias = Alias::from_existing(&alias_str);
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
alias,
|
|
||||||
Alias {
|
|
||||||
id: MaybeUuid::Uuid(uuid),
|
|
||||||
extension: Some(String::from(".mp4")),
|
|
||||||
}
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn bytes_alias() {
|
|
||||||
let alias = Alias::from_slice(b"blah").unwrap();
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
alias,
|
|
||||||
Alias {
|
|
||||||
id: MaybeUuid::Name(String::from("blah")),
|
|
||||||
extension: None
|
|
||||||
}
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn bytes_alias_ext() {
|
|
||||||
let alias = Alias::from_slice(b"blah.mp4").unwrap();
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
alias,
|
|
||||||
Alias {
|
|
||||||
id: MaybeUuid::Name(String::from("blah")),
|
|
||||||
extension: Some(String::from(".mp4")),
|
|
||||||
}
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn uuid_bytes_alias() {
|
|
||||||
let uuid = Uuid::new_v4();
|
|
||||||
|
|
||||||
let alias = Alias::from_slice(&uuid.as_bytes()[..]).unwrap();
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
alias,
|
|
||||||
Alias {
|
|
||||||
id: MaybeUuid::Uuid(uuid),
|
|
||||||
extension: None,
|
|
||||||
}
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn uuid_bytes_string_alias() {
|
|
||||||
let uuid = Uuid::new_v4();
|
|
||||||
|
|
||||||
let alias = Alias::from_slice(uuid.to_string().as_bytes()).unwrap();
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
alias,
|
|
||||||
Alias {
|
|
||||||
id: MaybeUuid::Uuid(uuid),
|
|
||||||
extension: None,
|
|
||||||
}
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn uuid_bytes_alias_ext() {
|
|
||||||
let uuid = Uuid::new_v4();
|
|
||||||
|
|
||||||
let mut alias_bytes = uuid.as_bytes().to_vec();
|
|
||||||
alias_bytes.extend_from_slice(b".mp4");
|
|
||||||
|
|
||||||
let alias = Alias::from_slice(&alias_bytes).unwrap();
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
alias,
|
|
||||||
Alias {
|
|
||||||
id: MaybeUuid::Uuid(uuid),
|
|
||||||
extension: Some(String::from(".mp4")),
|
|
||||||
}
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn uuid_bytes_string_alias_ext() {
|
|
||||||
let uuid = Uuid::new_v4();
|
|
||||||
|
|
||||||
let alias_str = format!("{uuid}.mp4");
|
|
||||||
let alias = Alias::from_slice(alias_str.as_bytes()).unwrap();
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
alias,
|
|
||||||
Alias {
|
|
||||||
id: MaybeUuid::Uuid(uuid),
|
|
||||||
extension: Some(String::from(".mp4")),
|
|
||||||
}
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -119,3 +119,156 @@ impl std::fmt::Display for Alias {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::{Alias, MaybeUuid};
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn string_alias() {
|
||||||
|
let alias = Alias::from_existing("blah");
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
alias,
|
||||||
|
Alias {
|
||||||
|
id: MaybeUuid::Name(String::from("blah")),
|
||||||
|
extension: None
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn string_alias_ext() {
|
||||||
|
let alias = Alias::from_existing("blah.mp4");
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
alias,
|
||||||
|
Alias {
|
||||||
|
id: MaybeUuid::Name(String::from("blah")),
|
||||||
|
extension: Some(String::from(".mp4")),
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn uuid_string_alias() {
|
||||||
|
let uuid = Uuid::new_v4();
|
||||||
|
|
||||||
|
let alias = Alias::from_existing(&uuid.to_string());
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
alias,
|
||||||
|
Alias {
|
||||||
|
id: MaybeUuid::Uuid(uuid),
|
||||||
|
extension: None,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn uuid_string_alias_ext() {
|
||||||
|
let uuid = Uuid::new_v4();
|
||||||
|
|
||||||
|
let alias_str = format!("{uuid}.mp4");
|
||||||
|
let alias = Alias::from_existing(&alias_str);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
alias,
|
||||||
|
Alias {
|
||||||
|
id: MaybeUuid::Uuid(uuid),
|
||||||
|
extension: Some(String::from(".mp4")),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn bytes_alias() {
|
||||||
|
let alias = Alias::from_slice(b"blah").unwrap();
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
alias,
|
||||||
|
Alias {
|
||||||
|
id: MaybeUuid::Name(String::from("blah")),
|
||||||
|
extension: None
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn bytes_alias_ext() {
|
||||||
|
let alias = Alias::from_slice(b"blah.mp4").unwrap();
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
alias,
|
||||||
|
Alias {
|
||||||
|
id: MaybeUuid::Name(String::from("blah")),
|
||||||
|
extension: Some(String::from(".mp4")),
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn uuid_bytes_alias() {
|
||||||
|
let uuid = Uuid::new_v4();
|
||||||
|
|
||||||
|
let alias = Alias::from_slice(&uuid.as_bytes()[..]).unwrap();
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
alias,
|
||||||
|
Alias {
|
||||||
|
id: MaybeUuid::Uuid(uuid),
|
||||||
|
extension: None,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn uuid_bytes_string_alias() {
|
||||||
|
let uuid = Uuid::new_v4();
|
||||||
|
|
||||||
|
let alias = Alias::from_slice(uuid.to_string().as_bytes()).unwrap();
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
alias,
|
||||||
|
Alias {
|
||||||
|
id: MaybeUuid::Uuid(uuid),
|
||||||
|
extension: None,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn uuid_bytes_alias_ext() {
|
||||||
|
let uuid = Uuid::new_v4();
|
||||||
|
|
||||||
|
let mut alias_bytes = uuid.as_bytes().to_vec();
|
||||||
|
alias_bytes.extend_from_slice(b".mp4");
|
||||||
|
|
||||||
|
let alias = Alias::from_slice(&alias_bytes).unwrap();
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
alias,
|
||||||
|
Alias {
|
||||||
|
id: MaybeUuid::Uuid(uuid),
|
||||||
|
extension: Some(String::from(".mp4")),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn uuid_bytes_string_alias_ext() {
|
||||||
|
let uuid = Uuid::new_v4();
|
||||||
|
|
||||||
|
let alias_str = format!("{uuid}.mp4");
|
||||||
|
let alias = Alias::from_slice(alias_str.as_bytes()).unwrap();
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
alias,
|
||||||
|
Alias {
|
||||||
|
id: MaybeUuid::Uuid(uuid),
|
||||||
|
extension: Some(String::from(".mp4")),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -86,3 +86,75 @@ impl std::fmt::Display for DeleteToken {
|
||||||
write!(f, "{}", self.id)
|
write!(f, "{}", self.id)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::{DeleteToken, MaybeUuid};
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn string_delete_token() {
|
||||||
|
let delete_token = DeleteToken::from_existing("blah");
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
delete_token,
|
||||||
|
DeleteToken {
|
||||||
|
id: MaybeUuid::Name(String::from("blah"))
|
||||||
|
}
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn uuid_string_delete_token() {
|
||||||
|
let uuid = Uuid::new_v4();
|
||||||
|
|
||||||
|
let delete_token = DeleteToken::from_existing(&uuid.to_string());
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
delete_token,
|
||||||
|
DeleteToken {
|
||||||
|
id: MaybeUuid::Uuid(uuid),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn bytes_delete_token() {
|
||||||
|
let delete_token = DeleteToken::from_slice(b"blah").unwrap();
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
delete_token,
|
||||||
|
DeleteToken {
|
||||||
|
id: MaybeUuid::Name(String::from("blah"))
|
||||||
|
}
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn uuid_bytes_delete_token() {
|
||||||
|
let uuid = Uuid::new_v4();
|
||||||
|
|
||||||
|
let delete_token = DeleteToken::from_slice(&uuid.as_bytes()[..]).unwrap();
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
delete_token,
|
||||||
|
DeleteToken {
|
||||||
|
id: MaybeUuid::Uuid(uuid),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn uuid_bytes_string_delete_token() {
|
||||||
|
let uuid = Uuid::new_v4();
|
||||||
|
|
||||||
|
let delete_token = DeleteToken::from_slice(uuid.to_string().as_bytes()).unwrap();
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
delete_token,
|
||||||
|
DeleteToken {
|
||||||
|
id: MaybeUuid::Uuid(uuid),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -14,7 +14,7 @@ use dashmap::DashMap;
|
||||||
use diesel::prelude::*;
|
use diesel::prelude::*;
|
||||||
use diesel_async::{
|
use diesel_async::{
|
||||||
pooled_connection::{
|
pooled_connection::{
|
||||||
deadpool::{BuildError, Pool, PoolError},
|
deadpool::{BuildError, Object, Pool, PoolError},
|
||||||
AsyncDieselConnectionManager, ManagerConfig,
|
AsyncDieselConnectionManager, ManagerConfig,
|
||||||
},
|
},
|
||||||
AsyncConnection, AsyncPgConnection, RunQueryDsl,
|
AsyncConnection, AsyncPgConnection, RunQueryDsl,
|
||||||
|
@ -176,6 +176,17 @@ impl PostgresRepo {
|
||||||
notifications,
|
notifications,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn get_connection(&self) -> Result<Object<AsyncPgConnection>, PostgresError> {
|
||||||
|
self.inner.get_connection().await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Inner {
|
||||||
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
||||||
|
async fn get_connection(&self) -> Result<Object<AsyncPgConnection>, PostgresError> {
|
||||||
|
self.pool.get().await.map_err(PostgresError::Pool)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type BoxFuture<'a, T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;
|
type BoxFuture<'a, T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;
|
||||||
|
@ -235,7 +246,7 @@ impl HashRepo for PostgresRepo {
|
||||||
async fn size(&self) -> Result<u64, RepoError> {
|
async fn size(&self) -> Result<u64, RepoError> {
|
||||||
use schema::hashes::dsl::*;
|
use schema::hashes::dsl::*;
|
||||||
|
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
let count = hashes
|
let count = hashes
|
||||||
.count()
|
.count()
|
||||||
|
@ -250,7 +261,7 @@ impl HashRepo for PostgresRepo {
|
||||||
async fn bound(&self, input_hash: Hash) -> Result<Option<OrderedHash>, RepoError> {
|
async fn bound(&self, input_hash: Hash) -> Result<Option<OrderedHash>, RepoError> {
|
||||||
use schema::hashes::dsl::*;
|
use schema::hashes::dsl::*;
|
||||||
|
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
let timestamp = hashes
|
let timestamp = hashes
|
||||||
.select(created_at)
|
.select(created_at)
|
||||||
|
@ -275,7 +286,7 @@ impl HashRepo for PostgresRepo {
|
||||||
) -> Result<HashPage, RepoError> {
|
) -> Result<HashPage, RepoError> {
|
||||||
use schema::hashes::dsl::*;
|
use schema::hashes::dsl::*;
|
||||||
|
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
let timestamp = to_primitive(date);
|
let timestamp = to_primitive(date);
|
||||||
|
|
||||||
|
@ -303,7 +314,7 @@ impl HashRepo for PostgresRepo {
|
||||||
) -> Result<HashPage, RepoError> {
|
) -> Result<HashPage, RepoError> {
|
||||||
use schema::hashes::dsl::*;
|
use schema::hashes::dsl::*;
|
||||||
|
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
let (mut page, prev) = if let Some(OrderedHash {
|
let (mut page, prev) = if let Some(OrderedHash {
|
||||||
timestamp,
|
timestamp,
|
||||||
|
@ -368,7 +379,7 @@ impl HashRepo for PostgresRepo {
|
||||||
) -> Result<Result<(), HashAlreadyExists>, RepoError> {
|
) -> Result<Result<(), HashAlreadyExists>, RepoError> {
|
||||||
use schema::hashes::dsl::*;
|
use schema::hashes::dsl::*;
|
||||||
|
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
let timestamp = to_primitive(timestamp);
|
let timestamp = to_primitive(timestamp);
|
||||||
|
|
||||||
|
@ -399,7 +410,7 @@ impl HashRepo for PostgresRepo {
|
||||||
) -> Result<(), RepoError> {
|
) -> Result<(), RepoError> {
|
||||||
use schema::hashes::dsl::*;
|
use schema::hashes::dsl::*;
|
||||||
|
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
diesel::update(hashes)
|
diesel::update(hashes)
|
||||||
.filter(hash.eq(&input_hash))
|
.filter(hash.eq(&input_hash))
|
||||||
|
@ -415,7 +426,7 @@ impl HashRepo for PostgresRepo {
|
||||||
async fn identifier(&self, input_hash: Hash) -> Result<Option<Arc<str>>, RepoError> {
|
async fn identifier(&self, input_hash: Hash) -> Result<Option<Arc<str>>, RepoError> {
|
||||||
use schema::hashes::dsl::*;
|
use schema::hashes::dsl::*;
|
||||||
|
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
let opt = hashes
|
let opt = hashes
|
||||||
.select(identifier)
|
.select(identifier)
|
||||||
|
@ -437,7 +448,7 @@ impl HashRepo for PostgresRepo {
|
||||||
) -> Result<Result<(), VariantAlreadyExists>, RepoError> {
|
) -> Result<Result<(), VariantAlreadyExists>, RepoError> {
|
||||||
use schema::variants::dsl::*;
|
use schema::variants::dsl::*;
|
||||||
|
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
let res = diesel::insert_into(variants)
|
let res = diesel::insert_into(variants)
|
||||||
.values((
|
.values((
|
||||||
|
@ -466,7 +477,7 @@ impl HashRepo for PostgresRepo {
|
||||||
) -> Result<Option<Arc<str>>, RepoError> {
|
) -> Result<Option<Arc<str>>, RepoError> {
|
||||||
use schema::variants::dsl::*;
|
use schema::variants::dsl::*;
|
||||||
|
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
let opt = variants
|
let opt = variants
|
||||||
.select(identifier)
|
.select(identifier)
|
||||||
|
@ -485,7 +496,7 @@ impl HashRepo for PostgresRepo {
|
||||||
async fn variants(&self, input_hash: Hash) -> Result<Vec<(String, Arc<str>)>, RepoError> {
|
async fn variants(&self, input_hash: Hash) -> Result<Vec<(String, Arc<str>)>, RepoError> {
|
||||||
use schema::variants::dsl::*;
|
use schema::variants::dsl::*;
|
||||||
|
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
let vec = variants
|
let vec = variants
|
||||||
.select((variant, identifier))
|
.select((variant, identifier))
|
||||||
|
@ -508,7 +519,7 @@ impl HashRepo for PostgresRepo {
|
||||||
) -> Result<(), RepoError> {
|
) -> Result<(), RepoError> {
|
||||||
use schema::variants::dsl::*;
|
use schema::variants::dsl::*;
|
||||||
|
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
diesel::delete(variants)
|
diesel::delete(variants)
|
||||||
.filter(hash.eq(&input_hash))
|
.filter(hash.eq(&input_hash))
|
||||||
|
@ -528,7 +539,7 @@ impl HashRepo for PostgresRepo {
|
||||||
) -> Result<(), RepoError> {
|
) -> Result<(), RepoError> {
|
||||||
use schema::hashes::dsl::*;
|
use schema::hashes::dsl::*;
|
||||||
|
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
diesel::update(hashes)
|
diesel::update(hashes)
|
||||||
.filter(hash.eq(&input_hash))
|
.filter(hash.eq(&input_hash))
|
||||||
|
@ -544,7 +555,7 @@ impl HashRepo for PostgresRepo {
|
||||||
async fn motion_identifier(&self, input_hash: Hash) -> Result<Option<Arc<str>>, RepoError> {
|
async fn motion_identifier(&self, input_hash: Hash) -> Result<Option<Arc<str>>, RepoError> {
|
||||||
use schema::hashes::dsl::*;
|
use schema::hashes::dsl::*;
|
||||||
|
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
let opt = hashes
|
let opt = hashes
|
||||||
.select(motion_identifier)
|
.select(motion_identifier)
|
||||||
|
@ -561,7 +572,7 @@ impl HashRepo for PostgresRepo {
|
||||||
|
|
||||||
#[tracing::instrument(level = "DEBUG", skip(self))]
|
#[tracing::instrument(level = "DEBUG", skip(self))]
|
||||||
async fn cleanup_hash(&self, input_hash: Hash) -> Result<(), RepoError> {
|
async fn cleanup_hash(&self, input_hash: Hash) -> Result<(), RepoError> {
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
conn.transaction(|conn| {
|
conn.transaction(|conn| {
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
|
@ -594,7 +605,7 @@ impl AliasRepo for PostgresRepo {
|
||||||
) -> Result<Result<(), AliasAlreadyExists>, RepoError> {
|
) -> Result<Result<(), AliasAlreadyExists>, RepoError> {
|
||||||
use schema::aliases::dsl::*;
|
use schema::aliases::dsl::*;
|
||||||
|
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
let res = diesel::insert_into(aliases)
|
let res = diesel::insert_into(aliases)
|
||||||
.values((
|
.values((
|
||||||
|
@ -619,7 +630,7 @@ impl AliasRepo for PostgresRepo {
|
||||||
async fn delete_token(&self, input_alias: &Alias) -> Result<Option<DeleteToken>, RepoError> {
|
async fn delete_token(&self, input_alias: &Alias) -> Result<Option<DeleteToken>, RepoError> {
|
||||||
use schema::aliases::dsl::*;
|
use schema::aliases::dsl::*;
|
||||||
|
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
let opt = aliases
|
let opt = aliases
|
||||||
.select(token)
|
.select(token)
|
||||||
|
@ -636,7 +647,7 @@ impl AliasRepo for PostgresRepo {
|
||||||
async fn hash(&self, input_alias: &Alias) -> Result<Option<Hash>, RepoError> {
|
async fn hash(&self, input_alias: &Alias) -> Result<Option<Hash>, RepoError> {
|
||||||
use schema::aliases::dsl::*;
|
use schema::aliases::dsl::*;
|
||||||
|
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
let opt = aliases
|
let opt = aliases
|
||||||
.select(hash)
|
.select(hash)
|
||||||
|
@ -653,7 +664,7 @@ impl AliasRepo for PostgresRepo {
|
||||||
async fn aliases_for_hash(&self, input_hash: Hash) -> Result<Vec<Alias>, RepoError> {
|
async fn aliases_for_hash(&self, input_hash: Hash) -> Result<Vec<Alias>, RepoError> {
|
||||||
use schema::aliases::dsl::*;
|
use schema::aliases::dsl::*;
|
||||||
|
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
let vec = aliases
|
let vec = aliases
|
||||||
.select(alias)
|
.select(alias)
|
||||||
|
@ -669,7 +680,7 @@ impl AliasRepo for PostgresRepo {
|
||||||
async fn cleanup_alias(&self, input_alias: &Alias) -> Result<(), RepoError> {
|
async fn cleanup_alias(&self, input_alias: &Alias) -> Result<(), RepoError> {
|
||||||
use schema::aliases::dsl::*;
|
use schema::aliases::dsl::*;
|
||||||
|
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
diesel::delete(aliases)
|
diesel::delete(aliases)
|
||||||
.filter(alias.eq(input_alias))
|
.filter(alias.eq(input_alias))
|
||||||
|
@ -689,7 +700,7 @@ impl SettingsRepo for PostgresRepo {
|
||||||
|
|
||||||
let input_value = hex::encode(input_value);
|
let input_value = hex::encode(input_value);
|
||||||
|
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
diesel::insert_into(settings)
|
diesel::insert_into(settings)
|
||||||
.values((key.eq(input_key), value.eq(&input_value)))
|
.values((key.eq(input_key), value.eq(&input_value)))
|
||||||
|
@ -707,7 +718,7 @@ impl SettingsRepo for PostgresRepo {
|
||||||
async fn get(&self, input_key: &'static str) -> Result<Option<Arc<[u8]>>, RepoError> {
|
async fn get(&self, input_key: &'static str) -> Result<Option<Arc<[u8]>>, RepoError> {
|
||||||
use schema::settings::dsl::*;
|
use schema::settings::dsl::*;
|
||||||
|
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
let opt = settings
|
let opt = settings
|
||||||
.select(value)
|
.select(value)
|
||||||
|
@ -728,7 +739,7 @@ impl SettingsRepo for PostgresRepo {
|
||||||
async fn remove(&self, input_key: &'static str) -> Result<(), RepoError> {
|
async fn remove(&self, input_key: &'static str) -> Result<(), RepoError> {
|
||||||
use schema::settings::dsl::*;
|
use schema::settings::dsl::*;
|
||||||
|
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
diesel::delete(settings)
|
diesel::delete(settings)
|
||||||
.filter(key.eq(input_key))
|
.filter(key.eq(input_key))
|
||||||
|
@ -750,7 +761,7 @@ impl DetailsRepo for PostgresRepo {
|
||||||
) -> Result<(), RepoError> {
|
) -> Result<(), RepoError> {
|
||||||
use schema::details::dsl::*;
|
use schema::details::dsl::*;
|
||||||
|
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
let value =
|
let value =
|
||||||
serde_json::to_value(&input_details.inner).map_err(PostgresError::SerializeDetails)?;
|
serde_json::to_value(&input_details.inner).map_err(PostgresError::SerializeDetails)?;
|
||||||
|
@ -768,7 +779,7 @@ impl DetailsRepo for PostgresRepo {
|
||||||
async fn details(&self, input_identifier: &Arc<str>) -> Result<Option<Details>, RepoError> {
|
async fn details(&self, input_identifier: &Arc<str>) -> Result<Option<Details>, RepoError> {
|
||||||
use schema::details::dsl::*;
|
use schema::details::dsl::*;
|
||||||
|
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
let opt = details
|
let opt = details
|
||||||
.select(json)
|
.select(json)
|
||||||
|
@ -789,7 +800,7 @@ impl DetailsRepo for PostgresRepo {
|
||||||
async fn cleanup_details(&self, input_identifier: &Arc<str>) -> Result<(), RepoError> {
|
async fn cleanup_details(&self, input_identifier: &Arc<str>) -> Result<(), RepoError> {
|
||||||
use schema::details::dsl::*;
|
use schema::details::dsl::*;
|
||||||
|
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
diesel::delete(details)
|
diesel::delete(details)
|
||||||
.filter(identifier.eq(input_identifier.as_ref()))
|
.filter(identifier.eq(input_identifier.as_ref()))
|
||||||
|
@ -811,7 +822,7 @@ impl QueueRepo for PostgresRepo {
|
||||||
) -> Result<JobId, RepoError> {
|
) -> Result<JobId, RepoError> {
|
||||||
use schema::job_queue::dsl::*;
|
use schema::job_queue::dsl::*;
|
||||||
|
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
let job_id = diesel::insert_into(job_queue)
|
let job_id = diesel::insert_into(job_queue)
|
||||||
.values((queue.eq(queue_name), job.eq(job_json)))
|
.values((queue.eq(queue_name), job.eq(job_json)))
|
||||||
|
@ -832,7 +843,7 @@ impl QueueRepo for PostgresRepo {
|
||||||
use schema::job_queue::dsl::*;
|
use schema::job_queue::dsl::*;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
let notifier: Arc<Notify> = self
|
let notifier: Arc<Notify> = self
|
||||||
.inner
|
.inner
|
||||||
|
@ -848,7 +859,7 @@ impl QueueRepo for PostgresRepo {
|
||||||
|
|
||||||
let timestamp = to_primitive(time::OffsetDateTime::now_utc());
|
let timestamp = to_primitive(time::OffsetDateTime::now_utc());
|
||||||
|
|
||||||
diesel::update(job_queue)
|
let count = diesel::update(job_queue)
|
||||||
.filter(heartbeat.le(timestamp.saturating_sub(time::Duration::minutes(2))))
|
.filter(heartbeat.le(timestamp.saturating_sub(time::Duration::minutes(2))))
|
||||||
.set((
|
.set((
|
||||||
heartbeat.eq(Option::<time::PrimitiveDateTime>::None),
|
heartbeat.eq(Option::<time::PrimitiveDateTime>::None),
|
||||||
|
@ -858,44 +869,58 @@ impl QueueRepo for PostgresRepo {
|
||||||
.await
|
.await
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
|
||||||
// TODO: for_update().skip_locked()
|
if count > 0 {
|
||||||
let id_query = job_queue
|
tracing::info!("Reset {count} jobs");
|
||||||
.select(id)
|
}
|
||||||
.filter(status.eq(JobStatus::New).and(queue.eq(queue_name)))
|
|
||||||
.order(queue_time)
|
|
||||||
.into_boxed()
|
|
||||||
.single_value();
|
|
||||||
|
|
||||||
let opt = diesel::update(job_queue)
|
// TODO: combine into 1 query
|
||||||
.filter(id.nullable().eq(id_query))
|
let opt = loop {
|
||||||
.set((
|
let id_opt = job_queue
|
||||||
heartbeat.eq(timestamp),
|
.select(id)
|
||||||
status.eq(JobStatus::Running),
|
.filter(status.eq(JobStatus::New).and(queue.eq(queue_name)))
|
||||||
worker.eq(worker_id),
|
.order(queue_time)
|
||||||
))
|
.limit(1)
|
||||||
.returning((id, job))
|
.get_result::<Uuid>(&mut conn)
|
||||||
.get_result(&mut conn)
|
|
||||||
.await
|
|
||||||
.optional()
|
|
||||||
.map_err(PostgresError::Diesel)?;
|
|
||||||
|
|
||||||
if let Some((job_id, job_json)) = opt {
|
|
||||||
diesel::sql_query("UNLISTEN queue_status_channel;")
|
|
||||||
.execute(&mut conn)
|
|
||||||
.await
|
.await
|
||||||
|
.optional()
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
|
||||||
|
let Some(id_val) = id_opt else {
|
||||||
|
break None;
|
||||||
|
};
|
||||||
|
|
||||||
|
let opt = diesel::update(job_queue)
|
||||||
|
.filter(id.eq(id_val))
|
||||||
|
.filter(status.eq(JobStatus::New))
|
||||||
|
.set((
|
||||||
|
heartbeat.eq(timestamp),
|
||||||
|
status.eq(JobStatus::Running),
|
||||||
|
worker.eq(worker_id),
|
||||||
|
))
|
||||||
|
.returning((id, job))
|
||||||
|
.get_result(&mut conn)
|
||||||
|
.await
|
||||||
|
.optional()
|
||||||
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
|
||||||
|
if let Some(tup) = opt {
|
||||||
|
break Some(tup);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some((job_id, job_json)) = opt {
|
||||||
return Ok((JobId(job_id), job_json));
|
return Ok((JobId(job_id), job_json));
|
||||||
}
|
}
|
||||||
|
|
||||||
let _ = actix_rt::time::timeout(Duration::from_secs(5), notifier.notified()).await;
|
|
||||||
|
|
||||||
diesel::sql_query("UNLISTEN queue_status_channel;")
|
|
||||||
.execute(&mut conn)
|
|
||||||
.await
|
|
||||||
.map_err(PostgresError::Diesel)?;
|
|
||||||
|
|
||||||
drop(conn);
|
drop(conn);
|
||||||
|
if actix_rt::time::timeout(Duration::from_secs(5), notifier.notified())
|
||||||
|
.await
|
||||||
|
.is_ok()
|
||||||
|
{
|
||||||
|
tracing::debug!("Notified");
|
||||||
|
} else {
|
||||||
|
tracing::debug!("Timed out");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -908,7 +933,7 @@ impl QueueRepo for PostgresRepo {
|
||||||
) -> Result<(), RepoError> {
|
) -> Result<(), RepoError> {
|
||||||
use schema::job_queue::dsl::*;
|
use schema::job_queue::dsl::*;
|
||||||
|
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
let timestamp = to_primitive(time::OffsetDateTime::now_utc());
|
let timestamp = to_primitive(time::OffsetDateTime::now_utc());
|
||||||
|
|
||||||
|
@ -935,7 +960,7 @@ impl QueueRepo for PostgresRepo {
|
||||||
) -> Result<(), RepoError> {
|
) -> Result<(), RepoError> {
|
||||||
use schema::job_queue::dsl::*;
|
use schema::job_queue::dsl::*;
|
||||||
|
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
diesel::delete(job_queue)
|
diesel::delete(job_queue)
|
||||||
.filter(
|
.filter(
|
||||||
|
@ -957,7 +982,7 @@ impl StoreMigrationRepo for PostgresRepo {
|
||||||
async fn is_continuing_migration(&self) -> Result<bool, RepoError> {
|
async fn is_continuing_migration(&self) -> Result<bool, RepoError> {
|
||||||
use schema::store_migrations::dsl::*;
|
use schema::store_migrations::dsl::*;
|
||||||
|
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
let count = store_migrations
|
let count = store_migrations
|
||||||
.count()
|
.count()
|
||||||
|
@ -976,7 +1001,7 @@ impl StoreMigrationRepo for PostgresRepo {
|
||||||
) -> Result<(), RepoError> {
|
) -> Result<(), RepoError> {
|
||||||
use schema::store_migrations::dsl::*;
|
use schema::store_migrations::dsl::*;
|
||||||
|
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
diesel::insert_into(store_migrations)
|
diesel::insert_into(store_migrations)
|
||||||
.values((
|
.values((
|
||||||
|
@ -996,7 +1021,7 @@ impl StoreMigrationRepo for PostgresRepo {
|
||||||
async fn is_migrated(&self, input_old_identifier: &Arc<str>) -> Result<bool, RepoError> {
|
async fn is_migrated(&self, input_old_identifier: &Arc<str>) -> Result<bool, RepoError> {
|
||||||
use schema::store_migrations::dsl::*;
|
use schema::store_migrations::dsl::*;
|
||||||
|
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
let b = diesel::select(diesel::dsl::exists(
|
let b = diesel::select(diesel::dsl::exists(
|
||||||
store_migrations.filter(old_identifier.eq(input_old_identifier.as_ref())),
|
store_migrations.filter(old_identifier.eq(input_old_identifier.as_ref())),
|
||||||
|
@ -1012,7 +1037,7 @@ impl StoreMigrationRepo for PostgresRepo {
|
||||||
async fn clear(&self) -> Result<(), RepoError> {
|
async fn clear(&self) -> Result<(), RepoError> {
|
||||||
use schema::store_migrations::dsl::*;
|
use schema::store_migrations::dsl::*;
|
||||||
|
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
diesel::delete(store_migrations)
|
diesel::delete(store_migrations)
|
||||||
.execute(&mut conn)
|
.execute(&mut conn)
|
||||||
|
@ -1029,7 +1054,7 @@ impl ProxyRepo for PostgresRepo {
|
||||||
async fn relate_url(&self, input_url: Url, input_alias: Alias) -> Result<(), RepoError> {
|
async fn relate_url(&self, input_url: Url, input_alias: Alias) -> Result<(), RepoError> {
|
||||||
use schema::proxies::dsl::*;
|
use schema::proxies::dsl::*;
|
||||||
|
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
diesel::insert_into(proxies)
|
diesel::insert_into(proxies)
|
||||||
.values((url.eq(input_url.as_str()), alias.eq(&input_alias)))
|
.values((url.eq(input_url.as_str()), alias.eq(&input_alias)))
|
||||||
|
@ -1044,7 +1069,7 @@ impl ProxyRepo for PostgresRepo {
|
||||||
async fn related(&self, input_url: Url) -> Result<Option<Alias>, RepoError> {
|
async fn related(&self, input_url: Url) -> Result<Option<Alias>, RepoError> {
|
||||||
use schema::proxies::dsl::*;
|
use schema::proxies::dsl::*;
|
||||||
|
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
let opt = proxies
|
let opt = proxies
|
||||||
.select(alias)
|
.select(alias)
|
||||||
|
@ -1061,7 +1086,7 @@ impl ProxyRepo for PostgresRepo {
|
||||||
async fn remove_relation(&self, input_alias: Alias) -> Result<(), RepoError> {
|
async fn remove_relation(&self, input_alias: Alias) -> Result<(), RepoError> {
|
||||||
use schema::proxies::dsl::*;
|
use schema::proxies::dsl::*;
|
||||||
|
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
diesel::delete(proxies)
|
diesel::delete(proxies)
|
||||||
.filter(alias.eq(&input_alias))
|
.filter(alias.eq(&input_alias))
|
||||||
|
@ -1083,7 +1108,7 @@ impl AliasAccessRepo for PostgresRepo {
|
||||||
) -> Result<(), RepoError> {
|
) -> Result<(), RepoError> {
|
||||||
use schema::proxies::dsl::*;
|
use schema::proxies::dsl::*;
|
||||||
|
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
let timestamp = to_primitive(timestamp);
|
let timestamp = to_primitive(timestamp);
|
||||||
|
|
||||||
|
@ -1104,7 +1129,7 @@ impl AliasAccessRepo for PostgresRepo {
|
||||||
) -> Result<Option<time::OffsetDateTime>, RepoError> {
|
) -> Result<Option<time::OffsetDateTime>, RepoError> {
|
||||||
use schema::proxies::dsl::*;
|
use schema::proxies::dsl::*;
|
||||||
|
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
let opt = proxies
|
let opt = proxies
|
||||||
.select(accessed)
|
.select(accessed)
|
||||||
|
@ -1132,7 +1157,7 @@ impl AliasAccessRepo for PostgresRepo {
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
use schema::proxies::dsl::*;
|
use schema::proxies::dsl::*;
|
||||||
|
|
||||||
let mut conn = inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = inner.get_connection().await?;
|
||||||
|
|
||||||
let vec = proxies
|
let vec = proxies
|
||||||
.select((accessed, alias))
|
.select((accessed, alias))
|
||||||
|
@ -1166,7 +1191,7 @@ impl VariantAccessRepo for PostgresRepo {
|
||||||
) -> Result<(), RepoError> {
|
) -> Result<(), RepoError> {
|
||||||
use schema::variants::dsl::*;
|
use schema::variants::dsl::*;
|
||||||
|
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
let timestamp = to_primitive(input_accessed);
|
let timestamp = to_primitive(input_accessed);
|
||||||
|
|
||||||
|
@ -1188,7 +1213,7 @@ impl VariantAccessRepo for PostgresRepo {
|
||||||
) -> Result<Option<time::OffsetDateTime>, RepoError> {
|
) -> Result<Option<time::OffsetDateTime>, RepoError> {
|
||||||
use schema::variants::dsl::*;
|
use schema::variants::dsl::*;
|
||||||
|
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
let opt = variants
|
let opt = variants
|
||||||
.select(accessed)
|
.select(accessed)
|
||||||
|
@ -1216,7 +1241,7 @@ impl VariantAccessRepo for PostgresRepo {
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
use schema::variants::dsl::*;
|
use schema::variants::dsl::*;
|
||||||
|
|
||||||
let mut conn = inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = inner.get_connection().await?;
|
||||||
|
|
||||||
let vec = variants
|
let vec = variants
|
||||||
.select((accessed, (hash, variant)))
|
.select((accessed, (hash, variant)))
|
||||||
|
@ -1281,7 +1306,7 @@ impl UploadRepo for PostgresRepo {
|
||||||
async fn create_upload(&self) -> Result<UploadId, RepoError> {
|
async fn create_upload(&self) -> Result<UploadId, RepoError> {
|
||||||
use schema::uploads::dsl::*;
|
use schema::uploads::dsl::*;
|
||||||
|
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
let uuid = diesel::insert_into(uploads)
|
let uuid = diesel::insert_into(uploads)
|
||||||
.default_values()
|
.default_values()
|
||||||
|
@ -1298,7 +1323,7 @@ impl UploadRepo for PostgresRepo {
|
||||||
use schema::uploads::dsl::*;
|
use schema::uploads::dsl::*;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
diesel::sql_query("LISTEN upload_completion_channel;")
|
diesel::sql_query("LISTEN upload_completion_channel;")
|
||||||
.execute(&mut conn)
|
.execute(&mut conn)
|
||||||
|
@ -1315,28 +1340,25 @@ impl UploadRepo for PostgresRepo {
|
||||||
.flatten();
|
.flatten();
|
||||||
|
|
||||||
if let Some(upload_result) = opt {
|
if let Some(upload_result) = opt {
|
||||||
diesel::sql_query("UNLISTEN upload_completion_channel;")
|
|
||||||
.execute(&mut conn)
|
|
||||||
.await
|
|
||||||
.map_err(PostgresError::Diesel)?;
|
|
||||||
|
|
||||||
let upload_result: InnerUploadResult = serde_json::from_value(upload_result)
|
let upload_result: InnerUploadResult = serde_json::from_value(upload_result)
|
||||||
.map_err(PostgresError::DeserializeUploadResult)?;
|
.map_err(PostgresError::DeserializeUploadResult)?;
|
||||||
|
|
||||||
return Ok(upload_result.into());
|
return Ok(upload_result.into());
|
||||||
}
|
}
|
||||||
|
|
||||||
let _ = actix_rt::time::timeout(
|
drop(conn);
|
||||||
Duration::from_secs(2),
|
|
||||||
|
if actix_rt::time::timeout(
|
||||||
|
Duration::from_secs(5),
|
||||||
self.inner.upload_notifier.notified(),
|
self.inner.upload_notifier.notified(),
|
||||||
)
|
)
|
||||||
.await;
|
.await
|
||||||
|
.is_ok()
|
||||||
diesel::sql_query("UNLISTEN upload_completion_channel;")
|
{
|
||||||
.execute(&mut conn)
|
tracing::debug!("Notified");
|
||||||
.await
|
} else {
|
||||||
.map_err(PostgresError::Diesel)?;
|
tracing::debug!("Timed out");
|
||||||
drop(conn);
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1344,7 +1366,7 @@ impl UploadRepo for PostgresRepo {
|
||||||
async fn claim(&self, upload_id: UploadId) -> Result<(), RepoError> {
|
async fn claim(&self, upload_id: UploadId) -> Result<(), RepoError> {
|
||||||
use schema::uploads::dsl::*;
|
use schema::uploads::dsl::*;
|
||||||
|
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
diesel::delete(uploads)
|
diesel::delete(uploads)
|
||||||
.filter(id.eq(upload_id.id))
|
.filter(id.eq(upload_id.id))
|
||||||
|
@ -1363,7 +1385,7 @@ impl UploadRepo for PostgresRepo {
|
||||||
) -> Result<(), RepoError> {
|
) -> Result<(), RepoError> {
|
||||||
use schema::uploads::dsl::*;
|
use schema::uploads::dsl::*;
|
||||||
|
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.get_connection().await?;
|
||||||
|
|
||||||
let upload_result: InnerUploadResult = upload_result.into();
|
let upload_result: InnerUploadResult = upload_result.into();
|
||||||
let upload_result =
|
let upload_result =
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
// @generated automatically by Diesel CLI.
|
// @generated automatically by Diesel CLI.
|
||||||
|
|
||||||
pub mod sql_types {
|
pub mod sql_types {
|
||||||
#[derive(diesel::sql_types::SqlType)]
|
#[derive(diesel::query_builder::QueryId, diesel::sql_types::SqlType)]
|
||||||
#[diesel(postgres_type(name = "job_status"))]
|
#[diesel(postgres_type(name = "job_status"))]
|
||||||
pub struct JobStatus;
|
pub struct JobStatus;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue