diff --git a/src/main.rs b/src/main.rs
index ee3fd54..a022a46 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -127,6 +127,7 @@ async fn upload(
     for mut image in images {
         image.result.disarm();
     }
+
     Ok(HttpResponse::Created().json(&serde_json::json!({
         "msg": "ok",
         "files": files
@@ -539,6 +540,8 @@ async fn launch(
     repo: R,
     store: S,
 ) -> color_eyre::Result<()> {
+    repo.requeue_in_progress(CONFIG.server.worker_id.as_bytes().to_vec())
+        .await?;
     // Create a new Multipart Form validator
     //
     // This form is expecting a single array field, 'images' with at most 10 files in it
diff --git a/src/queue.rs b/src/queue.rs
index 6c404eb..fbfc1ae 100644
--- a/src/queue.rs
+++ b/src/queue.rs
@@ -6,6 +6,7 @@ use crate::{
     store::{Identifier, Store},
 };
 use std::{future::Future, path::PathBuf, pin::Pin};
+use tracing::Instrument;
 use uuid::Uuid;
 
 mod cleanup;
@@ -111,7 +112,7 @@ pub(crate) async fn queue_generate(
 }
 
 pub(crate) async fn process_cleanup<R: QueueRepo, S: Store>(repo: R, store: S, worker_id: String) {
-    process_jobs(&repo, &store, worker_id, cleanup::perform).await
+    process_jobs(&repo, &store, worker_id, CLEANUP_QUEUE, cleanup::perform).await
 }
 
 pub(crate) async fn process_images<R: QueueRepo + HashRepo + IdentifierRepo, S: Store>(
@@ -119,26 +120,25 @@ pub(crate) async fn process_images<R: QueueRepo + HashRepo + IdentifierRepo, S:
     store: S,
     worker_id: String,
 ) {
-    process_jobs(&repo, &store, worker_id, process::perform).await
+    process_jobs(&repo, &store, worker_id, PROCESS_QUEUE, process::perform).await
 }
 
 type LocalBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;
 
-async fn process_jobs<R, S, F>(repo: &R, store: &S, worker_id: String, callback: F)
-where
+async fn process_jobs<R, S, F>(
+    repo: &R,
+    store: &S,
+    worker_id: String,
+    queue: &'static str,
+    callback: F,
+) where
     R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
     R::Bytes: Clone,
     S: Store,
     for<'a> F: Fn(&'a R, &'a S, &'a [u8]) -> LocalBoxFuture<'a, Result<(), Error>> + Copy,
 {
-    if let Ok(Some(job)) = repo.in_progress(worker_id.as_bytes().to_vec()).await {
-        if let Err(e) = (callback)(repo, store, job.as_ref()).await {
-            tracing::warn!("Failed to run previously dropped job: {}", e);
-            tracing::warn!("{:?}", e);
-        }
-    }
     loop {
-        let res = job_loop(repo, store, worker_id.clone(), callback).await;
+        let res = job_loop(repo, store, worker_id.clone(), queue, callback).await;
 
         if let Err(e) = res {
             tracing::warn!("Error processing jobs: {}", e);
@@ -150,7 +150,13 @@ where
     }
 }
 
-async fn job_loop<R, S, F>(repo: &R, store: &S, worker_id: String, callback: F) -> Result<(), Error>
+async fn job_loop<R, S, F>(
+    repo: &R,
+    store: &S,
+    worker_id: String,
+    queue: &'static str,
+    callback: F,
+) -> Result<(), Error>
 where
     R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
     R::Bytes: Clone,
@@ -158,10 +164,12 @@ where
     for<'a> F: Fn(&'a R, &'a S, &'a [u8]) -> LocalBoxFuture<'a, Result<(), Error>> + Copy,
 {
     loop {
-        let bytes = repo
-            .pop(CLEANUP_QUEUE, worker_id.as_bytes().to_vec())
-            .await?;
+        let bytes = repo.pop(queue, worker_id.as_bytes().to_vec()).await?;
 
-        (callback)(repo, store, bytes.as_ref()).await?;
+        let span = tracing::info_span!("Running Job", worker_id = ?worker_id);
+
+        span.in_scope(|| (callback)(repo, store, bytes.as_ref()))
+            .instrument(span)
+            .await?;
     }
 }
diff --git a/src/repo.rs b/src/repo.rs
index 7137b65..e823bcf 100644
--- a/src/repo.rs
+++ b/src/repo.rs
@@ -97,7 +97,7 @@ pub(crate) trait UploadRepo: BaseRepo {
 
 #[async_trait::async_trait(?Send)]
 pub(crate) trait QueueRepo: BaseRepo {
-    async fn in_progress(&self, worker_id: Vec<u8>) -> Result<Option<Self::Bytes>, Error>;
+    async fn requeue_in_progress(&self, worker_prefix: Vec<u8>) -> Result<(), Error>;
 
     async fn push(&self, queue: &'static str, job: Self::Bytes) -> Result<(), Error>;
 
diff --git a/src/repo/sled.rs b/src/repo/sled.rs
index 99850b8..ab568b0 100644
--- a/src/repo/sled.rs
+++ b/src/repo/sled.rs
@@ -155,20 +155,60 @@ impl UploadRepo for SledRepo {
 
 #[async_trait::async_trait(?Send)]
 impl QueueRepo for SledRepo {
-    async fn in_progress(&self, worker_id: Vec<u8>) -> Result<Option<Self::Bytes>, Error> {
-        let opt = b!(self.in_progress_queue, in_progress_queue.get(worker_id));
+    #[tracing::instrument(skip_all, fields(worker_id = %String::from_utf8_lossy(&worker_prefix)))]
+    async fn requeue_in_progress(&self, worker_prefix: Vec<u8>) -> Result<(), Error> {
+        let vec: Vec<(String, IVec)> = b!(self.in_progress_queue, {
+            let vec = in_progress_queue
+                .scan_prefix(worker_prefix)
+                .values()
+                .filter_map(Result::ok)
+                .filter_map(|ivec| {
+                    let index = ivec.as_ref().iter().enumerate().find_map(|(index, byte)| {
+                        if *byte == 0 {
+                            Some(index)
+                        } else {
+                            None
+                        }
+                    })?;
 
-        Ok(opt)
+                    let (queue, job) = ivec.split_at(index);
+                    if queue.is_empty() || job.len() <= 1 {
+                        return None;
+                    }
+                    let job = &job[1..];
+
+                    Some((String::from_utf8_lossy(queue).to_string(), IVec::from(job)))
+                })
+                .collect::<Vec<_>>();
+
+            Ok(vec) as Result<_, Error>
+        });
+
+        let db = self.db.clone();
+        b!(self.queue, {
+            for (queue_name, job) in vec {
+                let id = db.generate_id()?;
+                let mut key = queue_name.as_bytes().to_vec();
+                key.extend(id.to_be_bytes());
+
+                queue.insert(key, job)?;
+            }
+
+            Ok(()) as Result<(), Error>
+        });
+
+        Ok(())
     }
 
-    async fn push(&self, queue: &'static str, job: Self::Bytes) -> Result<(), Error> {
+    #[tracing::instrument(skip(self, job), fields(worker_id = %String::from_utf8_lossy(&job)))]
+    async fn push(&self, queue_name: &'static str, job: Self::Bytes) -> Result<(), Error> {
         let id = self.db.generate_id()?;
-        let mut key = queue.as_bytes().to_vec();
+        let mut key = queue_name.as_bytes().to_vec();
         key.extend(id.to_be_bytes());
 
         b!(self.queue, queue.insert(key, job));
 
-        if let Some(notifier) = self.queue_notifier.read().unwrap().get(&queue) {
+        if let Some(notifier) = self.queue_notifier.read().unwrap().get(&queue_name) {
             notifier.notify_one();
             return Ok(());
         }
@@ -176,13 +216,14 @@ impl QueueRepo for SledRepo {
         self.queue_notifier
             .write()
             .unwrap()
-            .entry(queue)
+            .entry(queue_name)
             .or_insert_with(|| Arc::new(Notify::new()))
             .notify_one();
 
         Ok(())
     }
 
+    #[tracing::instrument(skip(self, worker_id), fields(worker_id = %String::from_utf8_lossy(&worker_id)))]
     async fn pop(
         &self,
         queue_name: &'static str,
@@ -199,7 +240,11 @@ impl QueueRepo for SledRepo {
                 .scan_prefix(queue_name.as_bytes())
                 .find_map(Result::ok)
             {
-                in_progress_queue.insert(&worker_id, &job)?;
+                let mut in_progress_value = queue_name.as_bytes().to_vec();
+                in_progress_value.push(0);
+                in_progress_value.extend_from_slice(&job);
+
+                in_progress_queue.insert(&worker_id, in_progress_value)?;
 
                 if queue.remove(key)?.is_some() {
                     return Ok(Some(job));
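
Reviewer note: the core of this patch is the encoding written by the new `pop`: each in-progress entry is stored as the queue name, a 0x00 separator, then the raw job bytes, so `requeue_in_progress` can later recover both the originating queue and the payload for any worker that died mid-job. Below is a minimal standalone sketch of that round trip, using only std; the helper name `split_in_progress_value` and the sample bytes are illustrative and not part of the patch (the patch itself does this parsing inline with `find_map` and `split_at`).

    /// Illustrative helper mirroring the parsing inside `requeue_in_progress`:
    /// an in-progress value is `<queue name> 0x00 <job payload>`.
    fn split_in_progress_value(value: &[u8]) -> Option<(String, Vec<u8>)> {
        // Locate the first NUL separating the queue name from the job payload.
        let index = value.iter().position(|byte| *byte == 0)?;

        let (queue, job) = value.split_at(index);
        // Reject entries with an empty queue name or an empty payload;
        // `job` still begins with the separator byte, hence `<= 1`.
        if queue.is_empty() || job.len() <= 1 {
            return None;
        }

        Some((String::from_utf8_lossy(queue).to_string(), job[1..].to_vec()))
    }

    fn main() {
        // Encode the way the new `pop` does: queue name, NUL, then job bytes.
        let mut value = b"cleanup".to_vec();
        value.push(0);
        value.extend_from_slice(b"job-payload");

        assert_eq!(
            split_in_progress_value(&value),
            Some(("cleanup".to_string(), b"job-payload".to_vec()))
        );
    }

A single NUL works as a separator here because the queue names are the `&'static str` constants (`CLEANUP_QUEUE`, `PROCESS_QUEUE`), which presumably never contain a zero byte; the job payload after the first NUL passes through untouched, so it may contain anything.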