diff --git a/src/queue.rs b/src/queue.rs index 509c191..e92b374 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -213,7 +213,6 @@ async fn process_jobs( callback: F, ) where R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo, - R::Bytes: Clone, S: Store, for<'a> F: Fn(&'a R, &'a S, &'a Configuration, &'a [u8]) -> LocalBoxFuture<'a, Result<(), Error>> + Copy, @@ -274,7 +273,6 @@ async fn job_loop( ) -> Result<(), Error> where R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo, - R::Bytes: Clone, S: Store, for<'a> F: Fn(&'a R, &'a S, &'a Configuration, &'a [u8]) -> LocalBoxFuture<'a, Result<(), Error>> + Copy, @@ -315,7 +313,6 @@ async fn process_image_jobs( callback: F, ) where R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo, - R::Bytes: Clone, S: Store, for<'a> F: Fn( &'a R, @@ -353,7 +350,6 @@ async fn image_job_loop( ) -> Result<(), Error> where R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo, - R::Bytes: Clone, S: Store, for<'a> F: Fn( &'a R, diff --git a/src/repo.rs b/src/repo.rs index 8ef7c14..dd70082 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -4,7 +4,7 @@ use crate::{ store::{Identifier, StoreError}, }; use futures_util::Stream; -use std::fmt::Debug; +use std::{fmt::Debug, sync::Arc}; use url::Url; use uuid::Uuid; @@ -131,16 +131,9 @@ where } } -pub(crate) trait BaseRepo { - type Bytes: AsRef<[u8]> + From> + Clone; -} +pub(crate) trait BaseRepo {} -impl BaseRepo for actix_web::web::Data -where - T: BaseRepo, -{ - type Bytes = T::Bytes; -} +impl BaseRepo for actix_web::web::Data where T: BaseRepo {} #[async_trait::async_trait(?Send)] pub(crate) trait ProxyRepo: BaseRepo { @@ -301,9 +294,9 @@ impl JobId { #[async_trait::async_trait(?Send)] pub(crate) trait QueueRepo: BaseRepo { - async fn push(&self, queue: &'static str, job: Self::Bytes) -> Result; + async fn push(&self, queue: &'static str, job: Arc<[u8]>) -> Result; - async fn pop(&self, queue: &'static str) -> Result<(JobId, Self::Bytes), RepoError>; + async fn pop(&self, queue: &'static str) -> Result<(JobId, Arc<[u8]>), RepoError>; async fn heartbeat(&self, queue: &'static str, job_id: JobId) -> Result<(), RepoError>; @@ -315,11 +308,11 @@ impl QueueRepo for actix_web::web::Data where T: QueueRepo, { - async fn push(&self, queue: &'static str, job: Self::Bytes) -> Result { + async fn push(&self, queue: &'static str, job: Arc<[u8]>) -> Result { T::push(self, queue, job).await } - async fn pop(&self, queue: &'static str) -> Result<(JobId, Self::Bytes), RepoError> { + async fn pop(&self, queue: &'static str) -> Result<(JobId, Arc<[u8]>), RepoError> { T::pop(self, queue).await } @@ -334,8 +327,8 @@ where #[async_trait::async_trait(?Send)] pub(crate) trait SettingsRepo: BaseRepo { - async fn set(&self, key: &'static str, value: Self::Bytes) -> Result<(), RepoError>; - async fn get(&self, key: &'static str) -> Result, RepoError>; + async fn set(&self, key: &'static str, value: Arc<[u8]>) -> Result<(), RepoError>; + async fn get(&self, key: &'static str) -> Result>, RepoError>; async fn remove(&self, key: &'static str) -> Result<(), RepoError>; } @@ -344,11 +337,11 @@ impl SettingsRepo for actix_web::web::Data where T: SettingsRepo, { - async fn set(&self, key: &'static str, value: Self::Bytes) -> Result<(), RepoError> { + async fn set(&self, key: &'static str, value: Arc<[u8]>) -> Result<(), RepoError> { T::set(self, key, value).await } - async fn get(&self, key: &'static str) -> Result, RepoError> { + async fn get(&self, key: &'static str) -> Result>, RepoError> { T::get(self, key).await } diff --git a/src/repo/sled.rs b/src/repo/sled.rs index 636105d..822620c 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -181,9 +181,7 @@ impl SledRepo { } } -impl BaseRepo for SledRepo { - type Bytes = IVec; -} +impl BaseRepo for SledRepo {} #[async_trait::async_trait(?Send)] impl FullRepo for SledRepo { @@ -661,7 +659,7 @@ fn job_key(queue: &'static str, job_id: JobId) -> Arc<[u8]> { #[async_trait::async_trait(?Send)] impl QueueRepo for SledRepo { #[tracing::instrument(skip(self, job), fields(job = %String::from_utf8_lossy(&job)))] - async fn push(&self, queue_name: &'static str, job: Self::Bytes) -> Result { + async fn push(&self, queue_name: &'static str, job: Arc<[u8]>) -> Result { let metrics_guard = PushMetricsGuard::guard(queue_name); let id = JobId::gen(); @@ -674,7 +672,7 @@ impl QueueRepo for SledRepo { (&queue, &job_state).transaction(|(queue, job_state)| { let state = JobState::pending(); - queue.insert(&key[..], &job)?; + queue.insert(&key[..], &job[..])?; job_state.insert(&key[..], state.as_bytes())?; Ok(()) @@ -706,7 +704,7 @@ impl QueueRepo for SledRepo { } #[tracing::instrument(skip(self))] - async fn pop(&self, queue_name: &'static str) -> Result<(JobId, Self::Bytes), RepoError> { + async fn pop(&self, queue_name: &'static str) -> Result<(JobId, Arc<[u8]>), RepoError> { let metrics_guard = PopMetricsGuard::guard(queue_name); let now = time::OffsetDateTime::now_utc(); @@ -754,9 +752,11 @@ impl QueueRepo for SledRepo { let job_id = JobId::from_bytes(id_bytes); - let opt = queue.get(&key)?.map(|job_bytes| (job_id, job_bytes)); + let opt = queue + .get(&key)? + .map(|job_bytes| (job_id, Arc::from(job_bytes.to_vec()))); - return Ok(opt) as Result, SledError>; + return Ok(opt) as Result)>, SledError>; } Ok(None) @@ -842,17 +842,17 @@ impl QueueRepo for SledRepo { #[async_trait::async_trait(?Send)] impl SettingsRepo for SledRepo { #[tracing::instrument(level = "trace", skip(value))] - async fn set(&self, key: &'static str, value: Self::Bytes) -> Result<(), RepoError> { - b!(self.settings, settings.insert(key, value)); + async fn set(&self, key: &'static str, value: Arc<[u8]>) -> Result<(), RepoError> { + b!(self.settings, settings.insert(key, &value[..])); Ok(()) } #[tracing::instrument(level = "trace", skip(self))] - async fn get(&self, key: &'static str) -> Result, RepoError> { + async fn get(&self, key: &'static str) -> Result>, RepoError> { let opt = b!(self.settings, settings.get(key)); - Ok(opt) + Ok(opt.map(|ivec| Arc::from(ivec.to_vec()))) } #[tracing::instrument(level = "trace", skip(self))]