2
0
Fork 0
mirror of https://git.asonix.dog/asonix/pict-rs synced 2024-12-22 19:31:35 +00:00

Add panic boundaries for background jobs

This commit is contained in:
asonix 2024-04-05 11:58:55 -05:00
parent d41fca5b6c
commit eca3697410

View file

@ -11,9 +11,11 @@ use crate::{
use std::{ use std::{
ops::Deref, ops::Deref,
rc::Rc,
sync::Arc, sync::Arc,
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use tokio::task::JoinError;
use tracing::Instrument; use tracing::Instrument;
pub(crate) mod cleanup; pub(crate) mod cleanup;
@ -297,54 +299,66 @@ where
} }
} }
fn job_result(result: &JobResult) -> crate::repo::JobResult { fn job_result(result: &Result<JobResult, JoinError>) -> crate::repo::JobResult {
match result { match result {
Ok(()) => crate::repo::JobResult::Success, Ok(Ok(())) => crate::repo::JobResult::Success,
Err(JobError::Retry(_)) => crate::repo::JobResult::Failure, Ok(Err(JobError::Retry(_))) => crate::repo::JobResult::Failure,
Err(JobError::Abort(_)) => crate::repo::JobResult::Aborted, Ok(Err(JobError::Abort(_))) => crate::repo::JobResult::Aborted,
Err(_) => crate::repo::JobResult::Aborted,
} }
} }
async fn process_jobs<S, F>(state: State<S>, queue: &'static str, callback: F) async fn process_jobs<S, F>(state: State<S>, queue: &'static str, callback: F)
where where
S: Store, S: Store + 'static,
for<'a> F: Fn(&'a State<S>, serde_json::Value) -> JobFuture<'a> + Copy, for<'a> F: Fn(&'a State<S>, serde_json::Value) -> JobFuture<'a> + Copy + 'static,
{ {
let worker_id = uuid::Uuid::new_v4(); let worker_id = uuid::Uuid::new_v4();
let state = Rc::new(state);
loop { loop {
tracing::trace!("process_jobs: looping"); tracing::trace!("process_jobs: looping");
crate::sync::cooperate().await; crate::sync::cooperate().await;
let res = job_loop(&state, worker_id, queue, callback) // add a panic boundary by spawning a task
.with_poll_timer("job-loop") let res = crate::sync::spawn(
"job-loop",
job_loop(state.clone(), worker_id, queue, callback),
)
.await; .await;
if let Err(e) = res { match res {
// clean exit
Ok(Ok(())) => break,
// job error
Ok(Err(e)) => {
tracing::warn!("Error processing jobs: {}", format!("{e}")); tracing::warn!("Error processing jobs: {}", format!("{e}"));
tracing::warn!("{}", format!("{e:?}")); tracing::warn!("{}", format!("{e:?}"));
if e.is_disconnected() { if e.is_disconnected() {
tokio::time::sleep(Duration::from_secs(10)).await; tokio::time::sleep(Duration::from_secs(10)).await;
} }
continue;
} }
break; // job panic
Err(_) => {
tracing::warn!("Panic while processing jobs");
}
}
} }
} }
async fn job_loop<S, F>( async fn job_loop<S, F>(
state: &State<S>, state: Rc<State<S>>,
worker_id: uuid::Uuid, worker_id: uuid::Uuid,
queue: &'static str, queue: &'static str,
callback: F, callback: F,
) -> Result<(), Error> ) -> Result<(), Error>
where where
S: Store, S: Store + 'static,
for<'a> F: Fn(&'a State<S>, serde_json::Value) -> JobFuture<'a> + Copy, for<'a> F: Fn(&'a State<S>, serde_json::Value) -> JobFuture<'a> + Copy + 'static,
{ {
loop { loop {
tracing::trace!("job_loop: looping"); tracing::trace!("job_loop: looping");
@ -360,14 +374,18 @@ where
let guard = MetricsGuard::guard(worker_id, queue); let guard = MetricsGuard::guard(worker_id, queue);
let res = heartbeat( let state2 = state.clone();
let res = crate::sync::spawn("job-and-heartbeat", async move {
let state = state2;
heartbeat(
&state.repo, &state.repo,
queue, queue,
worker_id, worker_id,
job_id, job_id,
(callback)(state, job), (callback)(&state, job),
) )
.with_poll_timer("job-and-heartbeat") .await
})
.await; .await;
state state
@ -376,7 +394,7 @@ where
.with_poll_timer("job-complete") .with_poll_timer("job-complete")
.await?; .await?;
res?; res.map_err(|_| UploadError::Canceled)??;
guard.disarm(); guard.disarm();