From 443d327edfe171b618db46f723467e35e97f4c17 Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 3 Sep 2023 12:47:06 -0500 Subject: [PATCH] Implement a couple more repo traits --- Cargo.lock | 13 + Cargo.toml | 1 + docs/postgres-planning.md | 8 +- src/error_code.rs | 3 + src/queue.rs | 79 +-- src/queue/cleanup.rs | 4 +- src/queue/process.rs | 4 +- src/repo.rs | 155 +----- src/repo/alias.rs | 121 ++++ src/repo/delete_token.rs | 88 +++ src/repo/postgres.rs | 517 +++++++++++++++++- src/repo/postgres/job_status.rs | 6 + .../migrations/V0006__create_queue.rs | 5 +- .../V0007__create_store_migrations.rs | 3 +- src/repo/postgres/schema.rs | 8 +- src/repo/sled.rs | 25 +- 16 files changed, 800 insertions(+), 240 deletions(-) create mode 100644 src/repo/alias.rs create mode 100644 src/repo/delete_token.rs create mode 100644 src/repo/postgres/job_status.rs diff --git a/Cargo.lock b/Cargo.lock index 1a31c37..fae5c89 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -750,6 +750,18 @@ dependencies = [ "tokio-postgres", ] +[[package]] +name = "diesel-derive-enum" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81c5131a2895ef64741dad1d483f358c2a229a3a2d1b256778cdc5e146db64d4" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.29", +] + [[package]] name = "diesel_derives" version = "2.1.1" @@ -1807,6 +1819,7 @@ dependencies = [ "deadpool", "diesel", "diesel-async", + "diesel-derive-enum", "flume", "futures-core", "hex", diff --git a/Cargo.toml b/Cargo.toml index 449d844..abf9469 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ dashmap = "5.1.0" deadpool = { version = "0.9.5", features = ["rt_tokio_1"] } diesel = { version = "2.1.1", features = ["postgres_backend", "serde_json", "time", "uuid"] } diesel-async = { version = "0.4.1", features = ["postgres", "deadpool"] } +diesel-derive-enum = { version = "2.1.0", features = ["postgres"] } flume = "0.11.0" futures-core = "0.3" hex = "0.4.3" diff --git a/docs/postgres-planning.md b/docs/postgres-planning.md index 36801d0..d39d5cb 100644 --- a/docs/postgres-planning.md +++ b/docs/postgres-planning.md @@ -155,7 +155,7 @@ methods: CREATE TYPE job_status AS ENUM ('new', 'running'); -CREATE TABLE queue ( +CREATE TABLE job_queue ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), queue VARCHAR(30) NOT NULL, job JSONB NOT NULL, @@ -171,14 +171,14 @@ CREATE INDEX heartbeat_index ON queue INCLUDE heartbeat; claiming a job can be ```sql -UPDATE queue SET status = 'new', heartbeat = NULL +UPDATE job_queue SET status = 'new', heartbeat = NULL WHERE heartbeat IS NOT NULL AND heartbeat < NOW - INTERVAL '2 MINUTES'; -UPDATE queue SET status = 'running', heartbeat = CURRENT_TIMESTAMP +UPDATE job_queue SET status = 'running', heartbeat = CURRENT_TIMESTAMP WHERE id = ( SELECT id - FROM queue + FROM job_queue WHERE status = 'new' AND queue = '$QUEUE' ORDER BY queue_time ASC FOR UPDATE SKIP LOCKED diff --git a/src/error_code.rs b/src/error_code.rs index 67eaef2..365b261 100644 --- a/src/error_code.rs +++ b/src/error_code.rs @@ -62,6 +62,9 @@ impl ErrorCode { pub(crate) const EXTRACT_UPLOAD_RESULT: ErrorCode = ErrorCode { code: "extract-upload-result", }; + pub(crate) const EXTRACT_JOB: ErrorCode = ErrorCode { + code: "extract-job", + }; pub(crate) const CONFLICTED_RECORD: ErrorCode = ErrorCode { code: "conflicted-record", }; diff --git a/src/queue.rs b/src/queue.rs index a2c9deb..3f84fae 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -7,7 +7,6 @@ use crate::{ serde_str::Serde, store::Store, }; -use 
base64::{prelude::BASE64_STANDARD, Engine}; use std::{ future::Future, path::PathBuf, @@ -20,32 +19,6 @@ use tracing::Instrument; mod cleanup; mod process; -#[derive(Debug)] -struct Base64Bytes(Vec); - -impl serde::Serialize for Base64Bytes { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - let s = BASE64_STANDARD.encode(&self.0); - s.serialize(serializer) - } -} - -impl<'de> serde::Deserialize<'de> for Base64Bytes { - fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'de>, - { - let s: String = serde::Deserialize::deserialize(deserializer)?; - BASE64_STANDARD - .decode(s) - .map(Base64Bytes) - .map_err(|e| serde::de::Error::custom(e.to_string())) - } -} - const CLEANUP_QUEUE: &str = "cleanup"; const PROCESS_QUEUE: &str = "process"; @@ -91,18 +64,18 @@ pub(crate) async fn cleanup_alias( alias: Alias, token: DeleteToken, ) -> Result<(), Error> { - let job = serde_json::to_string(&Cleanup::Alias { + let job = serde_json::to_value(&Cleanup::Alias { alias: Serde::new(alias), token: Serde::new(token), }) .map_err(UploadError::PushJob)?; - repo.push(CLEANUP_QUEUE, job.into()).await?; + repo.push(CLEANUP_QUEUE, job).await?; Ok(()) } pub(crate) async fn cleanup_hash(repo: &Arc, hash: Hash) -> Result<(), Error> { - let job = serde_json::to_string(&Cleanup::Hash { hash }).map_err(UploadError::PushJob)?; - repo.push(CLEANUP_QUEUE, job.into()).await?; + let job = serde_json::to_value(&Cleanup::Hash { hash }).map_err(UploadError::PushJob)?; + repo.push(CLEANUP_QUEUE, job).await?; Ok(()) } @@ -110,11 +83,11 @@ pub(crate) async fn cleanup_identifier( repo: &Arc, identifier: &Arc, ) -> Result<(), Error> { - let job = serde_json::to_string(&Cleanup::Identifier { + let job = serde_json::to_value(&Cleanup::Identifier { identifier: identifier.to_string(), }) .map_err(UploadError::PushJob)?; - repo.push(CLEANUP_QUEUE, job.into()).await?; + repo.push(CLEANUP_QUEUE, job).await?; Ok(()) } @@ -124,26 +97,26 @@ async fn cleanup_variants( variant: Option, ) -> Result<(), Error> { let job = - serde_json::to_string(&Cleanup::Variant { hash, variant }).map_err(UploadError::PushJob)?; - repo.push(CLEANUP_QUEUE, job.into()).await?; + serde_json::to_value(&Cleanup::Variant { hash, variant }).map_err(UploadError::PushJob)?; + repo.push(CLEANUP_QUEUE, job).await?; Ok(()) } pub(crate) async fn cleanup_outdated_proxies(repo: &Arc) -> Result<(), Error> { - let job = serde_json::to_string(&Cleanup::OutdatedProxies).map_err(UploadError::PushJob)?; - repo.push(CLEANUP_QUEUE, job.into()).await?; + let job = serde_json::to_value(&Cleanup::OutdatedProxies).map_err(UploadError::PushJob)?; + repo.push(CLEANUP_QUEUE, job).await?; Ok(()) } pub(crate) async fn cleanup_outdated_variants(repo: &Arc) -> Result<(), Error> { - let job = serde_json::to_string(&Cleanup::OutdatedVariants).map_err(UploadError::PushJob)?; - repo.push(CLEANUP_QUEUE, job.into()).await?; + let job = serde_json::to_value(&Cleanup::OutdatedVariants).map_err(UploadError::PushJob)?; + repo.push(CLEANUP_QUEUE, job).await?; Ok(()) } pub(crate) async fn cleanup_all_variants(repo: &Arc) -> Result<(), Error> { - let job = serde_json::to_string(&Cleanup::AllVariants).map_err(UploadError::PushJob)?; - repo.push(CLEANUP_QUEUE, job.into()).await?; + let job = serde_json::to_value(&Cleanup::AllVariants).map_err(UploadError::PushJob)?; + repo.push(CLEANUP_QUEUE, job).await?; Ok(()) } @@ -153,13 +126,13 @@ pub(crate) async fn queue_ingest( upload_id: UploadId, declared_alias: Option, ) -> Result<(), Error> { - let job = 
serde_json::to_string(&Process::Ingest { + let job = serde_json::to_value(&Process::Ingest { identifier: identifier.to_string(), declared_alias: declared_alias.map(Serde::new), upload_id: Serde::new(upload_id), }) .map_err(UploadError::PushJob)?; - repo.push(PROCESS_QUEUE, job.into()).await?; + repo.push(PROCESS_QUEUE, job).await?; Ok(()) } @@ -170,14 +143,14 @@ pub(crate) async fn queue_generate( process_path: PathBuf, process_args: Vec, ) -> Result<(), Error> { - let job = serde_json::to_string(&Process::Generate { + let job = serde_json::to_value(&Process::Generate { target_format, source: Serde::new(source), process_path, process_args, }) .map_err(UploadError::PushJob)?; - repo.push(PROCESS_QUEUE, job.into()).await?; + repo.push(PROCESS_QUEUE, job).await?; Ok(()) } @@ -220,7 +193,7 @@ async fn process_jobs( &'a Arc, &'a S, &'a Configuration, - &'a str, + serde_json::Value, ) -> LocalBoxFuture<'a, Result<(), Error>> + Copy, { @@ -284,13 +257,13 @@ where &'a Arc, &'a S, &'a Configuration, - &'a str, + serde_json::Value, ) -> LocalBoxFuture<'a, Result<(), Error>> + Copy, { loop { let fut = async { - let (job_id, string) = repo.pop(queue, worker_id).await?; + let (job_id, job) = repo.pop(queue, worker_id).await?; let span = tracing::info_span!("Running Job"); @@ -303,7 +276,7 @@ where queue, worker_id, job_id, - (callback)(repo, store, config, string.as_ref()), + (callback)(repo, store, config, job), ) }) .instrument(span) @@ -337,7 +310,7 @@ async fn process_image_jobs( &'a S, &'a ProcessMap, &'a Configuration, - &'a str, + serde_json::Value, ) -> LocalBoxFuture<'a, Result<(), Error>> + Copy, { @@ -373,13 +346,13 @@ where &'a S, &'a ProcessMap, &'a Configuration, - &'a str, + serde_json::Value, ) -> LocalBoxFuture<'a, Result<(), Error>> + Copy, { loop { let fut = async { - let (job_id, string) = repo.pop(queue, worker_id).await?; + let (job_id, job) = repo.pop(queue, worker_id).await?; let span = tracing::info_span!("Running Job"); @@ -392,7 +365,7 @@ where queue, worker_id, job_id, - (callback)(repo, store, process_map, config, string.as_ref()), + (callback)(repo, store, process_map, config, job), ) }) .instrument(span) diff --git a/src/queue/cleanup.rs b/src/queue/cleanup.rs index a6b76fd..aa31814 100644 --- a/src/queue/cleanup.rs +++ b/src/queue/cleanup.rs @@ -14,13 +14,13 @@ pub(super) fn perform<'a, S>( repo: &'a ArcRepo, store: &'a S, configuration: &'a Configuration, - job: &'a str, + job: serde_json::Value, ) -> LocalBoxFuture<'a, Result<(), Error>> where S: Store, { Box::pin(async move { - match serde_json::from_str(job) { + match serde_json::from_value(job) { Ok(job) => match job { Cleanup::Hash { hash: in_hash } => hash(repo, in_hash).await?, Cleanup::Identifier { diff --git a/src/queue/process.rs b/src/queue/process.rs index a5f8e43..e257f10 100644 --- a/src/queue/process.rs +++ b/src/queue/process.rs @@ -17,13 +17,13 @@ pub(super) fn perform<'a, S>( store: &'a S, process_map: &'a ProcessMap, config: &'a Configuration, - job: &'a str, + job: serde_json::Value, ) -> LocalBoxFuture<'a, Result<(), Error>> where S: Store + 'static, { Box::pin(async move { - match serde_json::from_str(job) { + match serde_json::from_value(job) { Ok(job) => match job { Process::Ingest { identifier, diff --git a/src/repo.rs b/src/repo.rs index 22318d4..e49f84a 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -9,11 +9,15 @@ use std::{fmt::Debug, sync::Arc}; use url::Url; use uuid::Uuid; +mod alias; +mod delete_token; mod hash; mod migrate; pub(crate) mod postgres; pub(crate) mod sled; +pub(crate) use 
alias::Alias; +pub(crate) use delete_token::DeleteToken; pub(crate) use hash::Hash; pub(crate) use migrate::{migrate_04, migrate_repo}; @@ -31,17 +35,6 @@ enum MaybeUuid { Name(String), } -#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub(crate) struct Alias { - id: MaybeUuid, - extension: Option, -} - -#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub(crate) struct DeleteToken { - id: MaybeUuid, -} - #[derive(Debug)] pub(crate) struct HashAlreadyExists; #[derive(Debug)] @@ -372,13 +365,13 @@ impl JobId { #[async_trait::async_trait(?Send)] pub(crate) trait QueueRepo: BaseRepo { - async fn push(&self, queue: &'static str, job: Arc) -> Result; + async fn push(&self, queue: &'static str, job: serde_json::Value) -> Result; async fn pop( &self, queue: &'static str, worker_id: Uuid, - ) -> Result<(JobId, Arc), RepoError>; + ) -> Result<(JobId, serde_json::Value), RepoError>; async fn heartbeat( &self, @@ -400,7 +393,7 @@ impl QueueRepo for Arc where T: QueueRepo, { - async fn push(&self, queue: &'static str, job: Arc) -> Result { + async fn push(&self, queue: &'static str, job: serde_json::Value) -> Result { T::push(self, queue, job).await } @@ -408,7 +401,7 @@ where &self, queue: &'static str, worker_id: Uuid, - ) -> Result<(JobId, Arc), RepoError> { + ) -> Result<(JobId, serde_json::Value), RepoError> { T::pop(self, queue, worker_id).await } @@ -903,106 +896,6 @@ impl MaybeUuid { } } -fn split_at_dot(s: &str) -> Option<(&str, &str)> { - let index = s.find('.')?; - - Some(s.split_at(index)) -} - -impl Alias { - pub(crate) fn generate(extension: String) -> Self { - Alias { - id: MaybeUuid::Uuid(Uuid::new_v4()), - extension: Some(extension), - } - } - - pub(crate) fn from_existing(alias: &str) -> Self { - if let Some((start, end)) = split_at_dot(alias) { - Alias { - id: MaybeUuid::from_str(start), - extension: Some(end.into()), - } - } else { - Alias { - id: MaybeUuid::from_str(alias), - extension: None, - } - } - } - - pub(crate) fn extension(&self) -> Option<&str> { - self.extension.as_deref() - } - - pub(crate) fn to_bytes(&self) -> Vec { - let mut v = self.id.as_bytes().to_vec(); - - if let Some(ext) = self.extension() { - v.extend_from_slice(ext.as_bytes()); - } - - v - } - - pub(crate) fn from_slice(bytes: &[u8]) -> Option { - if let Ok(s) = std::str::from_utf8(bytes) { - Some(Self::from_existing(s)) - } else if bytes.len() >= 16 { - let id = Uuid::from_slice(&bytes[0..16]).expect("Already checked length"); - - let extension = if bytes.len() > 16 { - Some(String::from_utf8_lossy(&bytes[16..]).to_string()) - } else { - None - }; - - Some(Self { - id: MaybeUuid::Uuid(id), - extension, - }) - } else { - None - } - } -} - -impl DeleteToken { - pub(crate) fn from_existing(existing: &str) -> Self { - if let Ok(uuid) = Uuid::parse_str(existing) { - DeleteToken { - id: MaybeUuid::Uuid(uuid), - } - } else { - DeleteToken { - id: MaybeUuid::Name(existing.into()), - } - } - } - - pub(crate) fn generate() -> Self { - Self { - id: MaybeUuid::Uuid(Uuid::new_v4()), - } - } - - pub(crate) fn to_bytes(&self) -> Vec { - self.id.as_bytes().to_vec() - } - - pub(crate) fn from_slice(bytes: &[u8]) -> Option { - if let Ok(s) = std::str::from_utf8(bytes) { - Some(DeleteToken::from_existing(s)) - } else if bytes.len() == 16 { - Some(DeleteToken { - id: MaybeUuid::Uuid(Uuid::from_slice(bytes).ok()?), - }) - } else { - None - } - } -} - impl UploadId { pub(crate) fn generate() -> Self { Self { id: Uuid::new_v4() } @@ -1036,38 +929,6 @@ impl std::fmt::Display for MaybeUuid { } } -impl 
std::str::FromStr for DeleteToken { - type Err = std::convert::Infallible; - - fn from_str(s: &str) -> Result { - Ok(DeleteToken::from_existing(s)) - } -} - -impl std::fmt::Display for DeleteToken { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.id) - } -} - -impl std::str::FromStr for Alias { - type Err = std::convert::Infallible; - - fn from_str(s: &str) -> Result { - Ok(Alias::from_existing(s)) - } -} - -impl std::fmt::Display for Alias { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - if let Some(ext) = self.extension() { - write!(f, "{}{ext}", self.id) - } else { - write!(f, "{}", self.id) - } - } -} - #[cfg(test)] mod tests { use super::{Alias, DeleteToken, MaybeUuid, Uuid}; diff --git a/src/repo/alias.rs b/src/repo/alias.rs new file mode 100644 index 0000000..48156e1 --- /dev/null +++ b/src/repo/alias.rs @@ -0,0 +1,121 @@ +use diesel::{backend::Backend, sql_types::VarChar, AsExpression, FromSqlRow}; +use uuid::Uuid; + +use super::MaybeUuid; + +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, AsExpression, FromSqlRow)] +#[diesel(sql_type = VarChar)] +pub(crate) struct Alias { + id: MaybeUuid, + extension: Option, +} + +impl diesel::serialize::ToSql for Alias { + fn to_sql<'b>( + &'b self, + out: &mut diesel::serialize::Output<'b, '_, diesel::pg::Pg>, + ) -> diesel::serialize::Result { + let s = self.to_string(); + + >::to_sql( + &s, + &mut out.reborrow(), + ) + } +} + +impl diesel::deserialize::FromSql for Alias +where + B: Backend, + String: diesel::deserialize::FromSql, +{ + fn from_sql( + bytes: ::RawValue<'_>, + ) -> diesel::deserialize::Result { + let s = String::from_sql(bytes)?; + + s.parse().map_err(From::from) + } +} + +impl Alias { + pub(crate) fn generate(extension: String) -> Self { + Alias { + id: MaybeUuid::Uuid(Uuid::new_v4()), + extension: Some(extension), + } + } + + pub(crate) fn from_existing(alias: &str) -> Self { + if let Some((start, end)) = split_at_dot(alias) { + Alias { + id: MaybeUuid::from_str(start), + extension: Some(end.into()), + } + } else { + Alias { + id: MaybeUuid::from_str(alias), + extension: None, + } + } + } + + pub(crate) fn extension(&self) -> Option<&str> { + self.extension.as_deref() + } + + pub(crate) fn to_bytes(&self) -> Vec { + let mut v = self.id.as_bytes().to_vec(); + + if let Some(ext) = self.extension() { + v.extend_from_slice(ext.as_bytes()); + } + + v + } + + pub(crate) fn from_slice(bytes: &[u8]) -> Option { + if let Ok(s) = std::str::from_utf8(bytes) { + Some(Self::from_existing(s)) + } else if bytes.len() >= 16 { + let id = Uuid::from_slice(&bytes[0..16]).expect("Already checked length"); + + let extension = if bytes.len() > 16 { + Some(String::from_utf8_lossy(&bytes[16..]).to_string()) + } else { + None + }; + + Some(Self { + id: MaybeUuid::Uuid(id), + extension, + }) + } else { + None + } + } +} + +fn split_at_dot(s: &str) -> Option<(&str, &str)> { + let index = s.find('.')?; + + Some(s.split_at(index)) +} + +impl std::str::FromStr for Alias { + type Err = std::convert::Infallible; + + fn from_str(s: &str) -> Result { + Ok(Alias::from_existing(s)) + } +} + +impl std::fmt::Display for Alias { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if let Some(ext) = self.extension() { + write!(f, "{}{ext}", self.id) + } else { + write!(f, "{}", self.id) + } + } +} diff --git a/src/repo/delete_token.rs b/src/repo/delete_token.rs new file mode 100644 index 0000000..7c06b09 --- /dev/null +++ b/src/repo/delete_token.rs @@ -0,0 +1,88 
@@ +use diesel::{backend::Backend, sql_types::VarChar, AsExpression, FromSqlRow}; +use uuid::Uuid; + +use super::MaybeUuid; + +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, AsExpression, FromSqlRow)] +#[diesel(sql_type = VarChar)] +pub(crate) struct DeleteToken { + id: MaybeUuid, +} + +impl diesel::serialize::ToSql for DeleteToken { + fn to_sql<'b>( + &'b self, + out: &mut diesel::serialize::Output<'b, '_, diesel::pg::Pg>, + ) -> diesel::serialize::Result { + let s = self.to_string(); + + >::to_sql( + &s, + &mut out.reborrow(), + ) + } +} + +impl diesel::deserialize::FromSql for DeleteToken +where + B: Backend, + String: diesel::deserialize::FromSql, +{ + fn from_sql( + bytes: ::RawValue<'_>, + ) -> diesel::deserialize::Result { + let s = String::from_sql(bytes)?; + + s.parse().map_err(From::from) + } +} + +impl DeleteToken { + pub(crate) fn from_existing(existing: &str) -> Self { + if let Ok(uuid) = Uuid::parse_str(existing) { + DeleteToken { + id: MaybeUuid::Uuid(uuid), + } + } else { + DeleteToken { + id: MaybeUuid::Name(existing.into()), + } + } + } + + pub(crate) fn generate() -> Self { + Self { + id: MaybeUuid::Uuid(Uuid::new_v4()), + } + } + + pub(crate) fn to_bytes(&self) -> Vec { + self.id.as_bytes().to_vec() + } + + pub(crate) fn from_slice(bytes: &[u8]) -> Option { + if let Ok(s) = std::str::from_utf8(bytes) { + Some(DeleteToken::from_existing(s)) + } else if bytes.len() == 16 { + Some(DeleteToken { + id: MaybeUuid::Uuid(Uuid::from_slice(bytes).ok()?), + }) + } else { + None + } + } +} + +impl std::str::FromStr for DeleteToken { + type Err = std::convert::Infallible; + + fn from_str(s: &str) -> Result { + Ok(DeleteToken::from_existing(s)) + } +} + +impl std::fmt::Display for DeleteToken { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.id) + } +} diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs index a3720d5..eeea24d 100644 --- a/src/repo/postgres.rs +++ b/src/repo/postgres.rs @@ -1,8 +1,10 @@ mod embedded; +mod job_status; mod schema; use std::sync::Arc; +use dashmap::{DashMap, DashSet}; use diesel::prelude::*; use diesel_async::{ pooled_connection::{ @@ -11,20 +13,58 @@ use diesel_async::{ }, AsyncConnection, AsyncPgConnection, RunQueryDsl, }; +use tokio::sync::Notify; use tokio_postgres::{AsyncMessage, Notification}; use url::Url; +use uuid::Uuid; -use crate::error_code::ErrorCode; +use crate::{details::Details, error_code::ErrorCode}; + +use self::job_status::JobStatus; use super::{ - BaseRepo, Hash, HashAlreadyExists, HashPage, HashRepo, OrderedHash, RepoError, - VariantAlreadyExists, + Alias, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, DetailsRepo, Hash, + HashAlreadyExists, HashPage, HashRepo, JobId, OrderedHash, QueueRepo, RepoError, SettingsRepo, + StoreMigrationRepo, UploadId, VariantAlreadyExists, }; #[derive(Clone)] pub(crate) struct PostgresRepo { + inner: Arc, + notifications: Arc>, +} + +struct Inner { pool: Pool, - notifications: flume::Receiver, + queue_notifications: DashMap>, + completed_uploads: DashSet, + upload_notifier: Notify, +} + +async fn delegate_notifications(receiver: flume::Receiver, inner: Arc) { + while let Ok(notification) = receiver.recv_async().await { + match notification.channel() { + "queue_status_channel" => { + // new job inserted for queue + let queue_name = notification.payload().to_string(); + + inner + .queue_notifications + .entry(queue_name) + .or_insert_with(|| Arc::new(Notify::new())) + .notify_waiters(); + } + channel => { + tracing::info!( + 
"Unhandled postgres notification: {channel}: {}", + notification.payload() + ); + } + } + todo!() + } + + tracing::warn!("Notification delegator shutting down"); } #[derive(Debug, thiserror::Error)] @@ -46,6 +86,15 @@ pub(crate) enum PostgresError { #[error("Error in database")] Diesel(#[source] diesel::result::Error), + + #[error("Error deserializing hex value")] + Hex(#[source] hex::FromHexError), + + #[error("Error serializing details")] + SerializeDetails(#[source] serde_json::Error), + + #[error("Error deserializing details")] + DeserializeDetails(#[source] serde_json::Error), } impl PostgresError { @@ -71,7 +120,7 @@ impl PostgresRepo { handle.abort(); let _ = handle.await; - let (tx, notifications) = flume::bounded(10); + let (tx, rx) = flume::bounded(10); let mut config = ManagerConfig::default(); config.custom_setup = build_handler(tx); @@ -84,8 +133,17 @@ impl PostgresRepo { .build() .map_err(ConnectPostgresError::BuildPool)?; - Ok(PostgresRepo { + let inner = Arc::new(Inner { pool, + queue_notifications: DashMap::new(), + completed_uploads: DashSet::new(), + upload_notifier: Notify::new(), + }); + + let notifications = Arc::new(actix_rt::spawn(delegate_notifications(rx, inner.clone()))); + + Ok(PostgresRepo { + inner, notifications, }) } @@ -147,7 +205,7 @@ impl HashRepo for PostgresRepo { async fn size(&self) -> Result { use schema::hashes::dsl::*; - let mut conn = self.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; let count = hashes .count() @@ -161,7 +219,7 @@ impl HashRepo for PostgresRepo { async fn bound(&self, input_hash: Hash) -> Result, RepoError> { use schema::hashes::dsl::*; - let mut conn = self.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; let timestamp = hashes .select(created_at) @@ -185,7 +243,7 @@ impl HashRepo for PostgresRepo { ) -> Result { use schema::hashes::dsl::*; - let mut conn = self.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; let timestamp = to_primitive(date); @@ -212,7 +270,7 @@ impl HashRepo for PostgresRepo { ) -> Result { use schema::hashes::dsl::*; - let mut conn = self.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; let (mut page, prev) = if let Some(OrderedHash { timestamp, @@ -276,7 +334,7 @@ impl HashRepo for PostgresRepo { ) -> Result, RepoError> { use schema::hashes::dsl::*; - let mut conn = self.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; let timestamp = to_primitive(timestamp); @@ -306,7 +364,7 @@ impl HashRepo for PostgresRepo { ) -> Result<(), RepoError> { use schema::hashes::dsl::*; - let mut conn = self.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; diesel::update(hashes) .filter(hash.eq(&input_hash)) @@ -321,7 +379,7 @@ impl HashRepo for PostgresRepo { async fn identifier(&self, input_hash: Hash) -> Result>, RepoError> { use schema::hashes::dsl::*; - let mut conn = self.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; let opt = hashes .select(identifier) @@ -342,7 +400,7 @@ impl HashRepo for PostgresRepo { ) -> Result, RepoError> { use schema::variants::dsl::*; - let mut conn = 
self.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; let res = diesel::insert_into(variants) .values(( @@ -370,7 +428,7 @@ impl HashRepo for PostgresRepo { ) -> Result>, RepoError> { use schema::variants::dsl::*; - let mut conn = self.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; let opt = variants .select(identifier) @@ -388,7 +446,7 @@ impl HashRepo for PostgresRepo { async fn variants(&self, input_hash: Hash) -> Result)>, RepoError> { use schema::variants::dsl::*; - let mut conn = self.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; let vec = variants .select((variant, identifier)) @@ -410,7 +468,7 @@ impl HashRepo for PostgresRepo { ) -> Result<(), RepoError> { use schema::variants::dsl::*; - let mut conn = self.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; diesel::delete(variants) .filter(hash.eq(&input_hash)) @@ -429,7 +487,7 @@ impl HashRepo for PostgresRepo { ) -> Result<(), RepoError> { use schema::hashes::dsl::*; - let mut conn = self.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; diesel::update(hashes) .filter(hash.eq(&input_hash)) @@ -444,7 +502,7 @@ impl HashRepo for PostgresRepo { async fn motion_identifier(&self, input_hash: Hash) -> Result>, RepoError> { use schema::hashes::dsl::*; - let mut conn = self.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; let opt = hashes .select(motion_identifier) @@ -460,7 +518,7 @@ impl HashRepo for PostgresRepo { } async fn cleanup_hash(&self, input_hash: Hash) -> Result<(), RepoError> { - let mut conn = self.pool.get().await.map_err(PostgresError::Pool)?; + let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; conn.transaction(|conn| { Box::pin(async move { @@ -482,6 +540,425 @@ impl HashRepo for PostgresRepo { } } +#[async_trait::async_trait(?Send)] +impl AliasRepo for PostgresRepo { + async fn create_alias( + &self, + input_alias: &Alias, + delete_token: &DeleteToken, + input_hash: Hash, + ) -> Result, RepoError> { + use schema::aliases::dsl::*; + + let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + + let res = diesel::insert_into(aliases) + .values(( + alias.eq(input_alias), + hash.eq(&input_hash), + token.eq(delete_token), + )) + .execute(&mut conn) + .await; + + match res { + Ok(_) => Ok(Ok(())), + Err(diesel::result::Error::DatabaseError( + diesel::result::DatabaseErrorKind::UniqueViolation, + _, + )) => Ok(Err(AliasAlreadyExists)), + Err(e) => Err(PostgresError::Diesel(e).into()), + } + } + + async fn delete_token(&self, input_alias: &Alias) -> Result, RepoError> { + use schema::aliases::dsl::*; + + let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + + let opt = aliases + .select(token) + .filter(alias.eq(input_alias)) + .get_result(&mut conn) + .await + .optional() + .map_err(PostgresError::Diesel)?; + + Ok(opt) + } + + async fn hash(&self, input_alias: &Alias) -> Result, RepoError> { + use schema::aliases::dsl::*; + + let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + + let opt = aliases + .select(hash) + .filter(alias.eq(input_alias)) + .get_result(&mut conn) + .await + .optional() + 
.map_err(PostgresError::Diesel)?; + + Ok(opt) + } + + async fn aliases_for_hash(&self, input_hash: Hash) -> Result, RepoError> { + use schema::aliases::dsl::*; + + let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + + let vec = aliases + .select(alias) + .filter(hash.eq(&input_hash)) + .get_results(&mut conn) + .await + .map_err(PostgresError::Diesel)?; + + Ok(vec) + } + + async fn cleanup_alias(&self, input_alias: &Alias) -> Result<(), RepoError> { + use schema::aliases::dsl::*; + + let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + + diesel::delete(aliases) + .filter(alias.eq(input_alias)) + .execute(&mut conn) + .await + .map_err(PostgresError::Diesel)?; + + Ok(()) + } +} + +#[async_trait::async_trait(?Send)] +impl SettingsRepo for PostgresRepo { + async fn set(&self, input_key: &'static str, input_value: Arc<[u8]>) -> Result<(), RepoError> { + use schema::settings::dsl::*; + + let input_value = hex::encode(input_value); + + let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + + diesel::insert_into(settings) + .values((key.eq(input_key), value.eq(input_value))) + .execute(&mut conn) + .await + .map_err(PostgresError::Diesel)?; + + Ok(()) + } + + async fn get(&self, input_key: &'static str) -> Result>, RepoError> { + use schema::settings::dsl::*; + + let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + + let opt = settings + .select(value) + .filter(key.eq(input_key)) + .get_result::(&mut conn) + .await + .optional() + .map_err(PostgresError::Diesel)? + .map(hex::decode) + .transpose() + .map_err(PostgresError::Hex)? + .map(Arc::from); + + Ok(opt) + } + + async fn remove(&self, input_key: &'static str) -> Result<(), RepoError> { + use schema::settings::dsl::*; + + let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + + diesel::delete(settings) + .filter(key.eq(input_key)) + .execute(&mut conn) + .await + .map_err(PostgresError::Diesel)?; + + Ok(()) + } +} + +#[async_trait::async_trait(?Send)] +impl DetailsRepo for PostgresRepo { + async fn relate_details( + &self, + input_identifier: &Arc, + input_details: &Details, + ) -> Result<(), RepoError> { + use schema::details::dsl::*; + + let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + + let value = + serde_json::to_value(&input_details.inner).map_err(PostgresError::SerializeDetails)?; + + diesel::insert_into(details) + .values((identifier.eq(input_identifier.as_ref()), json.eq(&value))) + .execute(&mut conn) + .await + .map_err(PostgresError::Diesel)?; + + Ok(()) + } + + async fn details(&self, input_identifier: &Arc) -> Result, RepoError> { + use schema::details::dsl::*; + + let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + + let opt = details + .select(json) + .filter(identifier.eq(input_identifier.as_ref())) + .get_result::(&mut conn) + .await + .optional() + .map_err(PostgresError::Diesel)? + .map(serde_json::from_value) + .transpose() + .map_err(PostgresError::DeserializeDetails)? 
+ .map(|inner| Details { inner }); + + Ok(opt) + } + + async fn cleanup_details(&self, input_identifier: &Arc) -> Result<(), RepoError> { + use schema::details::dsl::*; + + let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + + diesel::delete(details) + .filter(identifier.eq(input_identifier.as_ref())) + .execute(&mut conn) + .await + .map_err(PostgresError::Diesel)?; + + Ok(()) + } +} + +#[async_trait::async_trait(?Send)] +impl QueueRepo for PostgresRepo { + async fn push( + &self, + queue_name: &'static str, + job_json: serde_json::Value, + ) -> Result { + use schema::job_queue::dsl::*; + + let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + + let job_id = diesel::insert_into(job_queue) + .values((queue.eq(queue_name), job.eq(job_json))) + .returning(id) + .get_result::(&mut conn) + .await + .map_err(PostgresError::Diesel)?; + + Ok(JobId(job_id)) + } + + async fn pop( + &self, + queue_name: &'static str, + worker_id: Uuid, + ) -> Result<(JobId, serde_json::Value), RepoError> { + use schema::job_queue::dsl::*; + + loop { + let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + + let notifier: Arc = self + .inner + .queue_notifications + .entry(String::from(queue_name)) + .or_insert_with(|| Arc::new(Notify::new())) + .clone(); + + diesel::sql_query("LISTEN queue_status_channel;") + .execute(&mut conn) + .await + .map_err(PostgresError::Diesel)?; + + let timestamp = to_primitive(time::OffsetDateTime::now_utc()); + + diesel::update(job_queue) + .filter(heartbeat.le(timestamp.saturating_sub(time::Duration::minutes(2)))) + .set(( + heartbeat.eq(Option::::None), + status.eq(JobStatus::New), + )) + .execute(&mut conn) + .await + .map_err(PostgresError::Diesel)?; + + // TODO: for_update().skip_locked() + let id_query = job_queue + .select(id) + .filter(status.eq(JobStatus::New).and(queue.eq(queue_name))) + .order(queue_time) + .into_boxed() + .single_value(); + + let opt = diesel::update(job_queue) + .filter(id.nullable().eq(id_query)) + .set(( + heartbeat.eq(timestamp), + status.eq(JobStatus::Running), + worker.eq(worker_id), + )) + .returning((id, job)) + .get_result(&mut conn) + .await + .optional() + .map_err(PostgresError::Diesel)?; + + if let Some((job_id, job_json)) = opt { + diesel::sql_query("UNLISTEN queue_status_channel;") + .execute(&mut conn) + .await + .map_err(PostgresError::Diesel)?; + + return Ok((JobId(job_id), job_json)); + } + + let _ = actix_rt::time::timeout(std::time::Duration::from_secs(5), notifier.notified()) + .await; + + diesel::sql_query("UNLISTEN queue_status_channel;") + .execute(&mut conn) + .await + .map_err(PostgresError::Diesel)?; + + drop(conn); + } + } + + async fn heartbeat( + &self, + queue_name: &'static str, + worker_id: Uuid, + job_id: JobId, + ) -> Result<(), RepoError> { + use schema::job_queue::dsl::*; + + let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + + let timestamp = to_primitive(time::OffsetDateTime::now_utc()); + + diesel::update(job_queue) + .filter( + id.eq(job_id.0) + .and(queue.eq(queue_name)) + .and(worker.eq(worker_id)), + ) + .set(heartbeat.eq(timestamp)) + .execute(&mut conn) + .await + .map_err(PostgresError::Diesel)?; + + Ok(()) + } + + async fn complete_job( + &self, + queue_name: &'static str, + worker_id: Uuid, + job_id: JobId, + ) -> Result<(), RepoError> { + use schema::job_queue::dsl::*; + + let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + + diesel::delete(job_queue) + .filter( + id.eq(job_id.0) + 
.and(queue.eq(queue_name)) + .and(worker.eq(worker_id)), + ) + .execute(&mut conn) + .await + .map_err(PostgresError::Diesel)?; + + Ok(()) + } +} + +#[async_trait::async_trait(?Send)] +impl StoreMigrationRepo for PostgresRepo { + async fn is_continuing_migration(&self) -> Result { + use schema::store_migrations::dsl::*; + + let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + + let count = store_migrations + .count() + .get_result::(&mut conn) + .await + .map_err(PostgresError::Diesel)?; + + Ok(count > 0) + } + + async fn mark_migrated( + &self, + input_old_identifier: &Arc, + input_new_identifier: &Arc, + ) -> Result<(), RepoError> { + use schema::store_migrations::dsl::*; + + let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + + diesel::insert_into(store_migrations) + .values(( + old_identifier.eq(input_old_identifier.as_ref()), + new_identifier.eq(input_new_identifier.as_ref()), + )) + .on_conflict((old_identifier, new_identifier)) + .do_nothing() + .execute(&mut conn) + .await + .map_err(PostgresError::Diesel)?; + + Ok(()) + } + + async fn is_migrated(&self, input_old_identifier: &Arc) -> Result { + use schema::store_migrations::dsl::*; + + let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + + let b = diesel::select(diesel::dsl::exists( + store_migrations.filter(old_identifier.eq(input_old_identifier.as_ref())), + )) + .get_result(&mut conn) + .await + .map_err(PostgresError::Diesel)?; + + Ok(b) + } + + async fn clear(&self) -> Result<(), RepoError> { + use schema::store_migrations::dsl::*; + + let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; + + diesel::delete(store_migrations) + .execute(&mut conn) + .await + .map_err(PostgresError::Diesel)?; + + Ok(()) + } +} + impl std::fmt::Debug for PostgresRepo { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("PostgresRepo") diff --git a/src/repo/postgres/job_status.rs b/src/repo/postgres/job_status.rs new file mode 100644 index 0000000..f9cee02 --- /dev/null +++ b/src/repo/postgres/job_status.rs @@ -0,0 +1,6 @@ +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, diesel_derive_enum::DbEnum)] +#[ExistingTypePath = "crate::repo::postgres::schema::sql_types::JobStatus"] +pub(super) enum JobStatus { + New, + Running, +} diff --git a/src/repo/postgres/migrations/V0006__create_queue.rs b/src/repo/postgres/migrations/V0006__create_queue.rs index d5161bf..ace111f 100644 --- a/src/repo/postgres/migrations/V0006__create_queue.rs +++ b/src/repo/postgres/migrations/V0006__create_queue.rs @@ -11,6 +11,7 @@ pub(crate) fn migration() -> String { t.inject_custom(r#""id" UUID PRIMARY KEY DEFAULT gen_random_uuid() NOT NULL UNIQUE"#); t.add_column("queue", types::text().size(50).nullable(false)); t.add_column("job", types::custom("jsonb").nullable(false)); + t.add_column("worker", types::uuid().nullable(true)); t.add_column("status", types::custom("job_status").nullable(false)); t.add_column( "queue_time", @@ -18,7 +19,7 @@ pub(crate) fn migration() -> String { .nullable(false) .default(AutogenFunction::CurrentTimestamp), ); - t.add_column("heartbeat", types::datetime()); + t.add_column("heartbeat", types::datetime().nullable(true)); t.add_index("queue_status_index", types::index(["queue", "status"])); t.add_index("heartbeat_index", types::index(["heartbeat"])); @@ -30,7 +31,7 @@ CREATE OR REPLACE FUNCTION queue_status_notify() RETURNS trigger AS $$ BEGIN - PERFORM pg_notify('queue_status_channel', NEW.id::text); + 
PERFORM pg_notify('queue_status_channel', NEW.queue::text); RETURN NEW; END; $$ LANGUAGE plpgsql; diff --git a/src/repo/postgres/migrations/V0007__create_store_migrations.rs b/src/repo/postgres/migrations/V0007__create_store_migrations.rs index fe957b0..db106fc 100644 --- a/src/repo/postgres/migrations/V0007__create_store_migrations.rs +++ b/src/repo/postgres/migrations/V0007__create_store_migrations.rs @@ -7,9 +7,10 @@ pub(crate) fn migration() -> String { m.create_table("store_migrations", |t| { t.add_column( - "identifier", + "old_identifier", types::text().primary(true).nullable(false).unique(true), ); + t.add_column("new_identifier", types::text().nullable(false).unique(true)); }); m.make::().to_string() diff --git a/src/repo/postgres/schema.rs b/src/repo/postgres/schema.rs index 1b6aba4..da98a1c 100644 --- a/src/repo/postgres/schema.rs +++ b/src/repo/postgres/schema.rs @@ -38,9 +38,10 @@ diesel::table! { id -> Uuid, queue -> Text, job -> Jsonb, + worker -> Nullable, status -> JobStatus, queue_time -> Timestamp, - heartbeat -> Timestamp, + heartbeat -> Nullable, } } @@ -72,8 +73,9 @@ diesel::table! { } diesel::table! { - store_migrations (identifier) { - identifier -> Text, + store_migrations (old_identifier) { + old_identifier -> Text, + new_identifier -> Text, } } diff --git a/src/repo/sled.rs b/src/repo/sled.rs index 33cf611..671e186 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -46,10 +46,10 @@ pub(crate) enum SledError { Sled(#[from] sled::Error), #[error("Invalid details json")] - Details(serde_json::Error), + Details(#[source] serde_json::Error), #[error("Invalid upload result json")] - UploadResult(serde_json::Error), + UploadResult(#[source] serde_json::Error), #[error("Error parsing variant key")] VariantKey(#[from] VariantKeyError), @@ -57,6 +57,9 @@ pub(crate) enum SledError { #[error("Invalid string data in db")] Utf8(#[source] std::str::Utf8Error), + #[error("Invalid job json")] + Job(#[source] serde_json::Error), + #[error("Operation panicked")] Panic, @@ -70,6 +73,7 @@ impl SledError { Self::Sled(_) | Self::VariantKey(_) | Self::Utf8(_) => ErrorCode::SLED_ERROR, Self::Details(_) => ErrorCode::EXTRACT_DETAILS, Self::UploadResult(_) => ErrorCode::EXTRACT_UPLOAD_RESULT, + Self::Job(_) => ErrorCode::EXTRACT_JOB, Self::Panic => ErrorCode::PANIC, Self::Conflict => ErrorCode::CONFLICTED_RECORD, } @@ -660,11 +664,16 @@ fn try_into_arc_str(ivec: IVec) -> Result, SledError> { #[async_trait::async_trait(?Send)] impl QueueRepo for SledRepo { #[tracing::instrument(skip(self))] - async fn push(&self, queue_name: &'static str, job: Arc) -> Result { + async fn push( + &self, + queue_name: &'static str, + job: serde_json::Value, + ) -> Result { let metrics_guard = PushMetricsGuard::guard(queue_name); let id = JobId::gen(); let key = job_key(queue_name, id); + let job = serde_json::to_vec(&job).map_err(SledError::Job)?; let queue = self.queue.clone(); let job_state = self.job_state.clone(); @@ -709,7 +718,7 @@ impl QueueRepo for SledRepo { &self, queue_name: &'static str, worker_id: Uuid, - ) -> Result<(JobId, Arc), RepoError> { + ) -> Result<(JobId, serde_json::Value), RepoError> { let metrics_guard = PopMetricsGuard::guard(queue_name); let now = time::OffsetDateTime::now_utc(); @@ -762,10 +771,14 @@ impl QueueRepo for SledRepo { tracing::Span::current().record("job_id", &format!("{job_id:?}")); - let opt = queue.get(&key)?.map(try_into_arc_str).transpose()?; + let opt = queue + .get(&key)? 
+                .map(|ivec| serde_json::from_slice(&ivec[..]))
+                .transpose()
+                .map_err(SledError::Job)?;

             return Ok(opt.map(|job| (job_id, job)))
-                as Result<Option<(JobId, Arc<str>)>, SledError>;
+                as Result<Option<(JobId, serde_json::Value)>, SledError>;
         }

         Ok(None
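
As background for the `// TODO: for_update().skip_locked()` comment in the Postgres `pop` implementation above: with row locking, the claim step could be expressed as a single SQL statement along these lines. This is an illustrative sketch only, assuming the `job_queue` columns created by this patch (`id`, `queue`, `job`, `worker`, `status`, `queue_time`, `heartbeat`) and following the claiming pattern sketched in `docs/postgres-planning.md`; `$QUEUE` and `$WORKER_ID` are placeholders, and the `LIMIT 1` and `RETURNING` details are added for illustration rather than taken from the patch.

```sql
-- Illustrative sketch: claim one 'new' job for this worker, skipping rows
-- already locked by other workers. Follows docs/postgres-planning.md,
-- extended with the `worker` column added in this patch.
UPDATE job_queue
SET status = 'running', heartbeat = CURRENT_TIMESTAMP, worker = '$WORKER_ID'
WHERE id = (
    SELECT id
    FROM job_queue
    WHERE status = 'new' AND queue = '$QUEUE'
    ORDER BY queue_time ASC
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
RETURNING id, job;
```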