Yield more often in queue, simplify traces, log job_id in higher span

This commit is contained in:
asonix 2024-01-30 14:18:07 -06:00
parent dc12f6f79d
commit ee00fafee4
1 changed files with 33 additions and 89 deletions

View File

@ -265,19 +265,10 @@ async fn process_jobs<S, F>(
{ {
let worker_id = uuid::Uuid::new_v4(); let worker_id = uuid::Uuid::new_v4();
let yield_limit = 8;
let mut count = 0;
loop { loop {
tracing::trace!("process_jobs: looping"); tracing::trace!("process_jobs: looping");
count += 1;
count %= yield_limit;
// yield every 8 iterations to be kind to other tasks
if count == 0 {
tokio::task::yield_now().await; tokio::task::yield_now().await;
}
let res = job_loop(repo, store, config, worker_id, queue, callback).await; let res = job_loop(repo, store, config, worker_id, queue, callback).await;
@ -314,38 +305,23 @@ where
) -> LocalBoxFuture<'a, Result<(), Error>> ) -> LocalBoxFuture<'a, Result<(), Error>>
+ Copy, + Copy,
{ {
let yield_limit = 8;
let mut count = 0;
loop { loop {
tracing::trace!("job_loop: looping"); tracing::trace!("job_loop: looping");
count += 1;
count %= yield_limit;
// yield every 8 iterations to be kind to other tasks
if count == 0 {
tokio::task::yield_now().await; tokio::task::yield_now().await;
}
let fut = async { async {
let (job_id, job) = repo.pop(queue, worker_id).await?; let (job_id, job) = repo.pop(queue, worker_id).await?;
let span = tracing::info_span!("Running Job");
let guard = MetricsGuard::guard(worker_id, queue); let guard = MetricsGuard::guard(worker_id, queue);
let res = span let res = heartbeat(
.in_scope(|| {
heartbeat(
repo, repo,
queue, queue,
worker_id, worker_id,
job_id, job_id,
(callback)(repo, store, config, job), (callback)(repo, store, config, job),
) )
})
.instrument(span)
.await; .await;
repo.complete_job(queue, worker_id, job_id).await?; repo.complete_job(queue, worker_id, job_id).await?;
@ -355,9 +331,8 @@ where
guard.disarm(); guard.disarm();
Ok(()) as Result<(), Error> Ok(()) as Result<(), Error>
}; }
.instrument(tracing::info_span!("tick", %queue, %worker_id))
fut.instrument(tracing::info_span!("tick", worker_id = %worker_id))
.await?; .await?;
} }
} }
@ -387,19 +362,10 @@ async fn process_image_jobs<S, F>(
{ {
let worker_id = uuid::Uuid::new_v4(); let worker_id = uuid::Uuid::new_v4();
let yield_limit = 8;
let mut count = 0;
loop { loop {
tracing::trace!("process_image_jobs: looping"); tracing::trace!("process_image_jobs: looping");
count += 1;
count %= yield_limit;
// yield every 8 iterations to be kind to other tasks
if count == 0 {
tokio::task::yield_now().await; tokio::task::yield_now().await;
}
let res = image_job_loop( let res = image_job_loop(
tmp_dir, tmp_dir,
@ -454,38 +420,23 @@ where
) -> LocalBoxFuture<'a, Result<(), Error>> ) -> LocalBoxFuture<'a, Result<(), Error>>
+ Copy, + Copy,
{ {
let yield_limit = 8;
let mut count = 0;
loop { loop {
tracing::trace!("image_job_loop: looping"); tracing::trace!("image_job_loop: looping");
count += 1;
count %= yield_limit;
// yield every 8 iterations to be kind to other tasks
if count == 0 {
tokio::task::yield_now().await; tokio::task::yield_now().await;
}
let fut = async { async {
let (job_id, job) = repo.pop(queue, worker_id).await?; let (job_id, job) = repo.pop(queue, worker_id).await?;
let span = tracing::info_span!("Running Job");
let guard = MetricsGuard::guard(worker_id, queue); let guard = MetricsGuard::guard(worker_id, queue);
let res = span let res = heartbeat(
.in_scope(|| {
heartbeat(
repo, repo,
queue, queue,
worker_id, worker_id,
job_id, job_id,
(callback)(tmp_dir, repo, store, client, process_map, config, job), (callback)(tmp_dir, repo, store, client, process_map, config, job),
) )
})
.instrument(span)
.await; .await;
repo.complete_job(queue, worker_id, job_id).await?; repo.complete_job(queue, worker_id, job_id).await?;
@ -493,14 +444,15 @@ where
res?; res?;
guard.disarm(); guard.disarm();
Ok(()) as Result<(), Error>
};
fut.instrument(tracing::info_span!("tick", worker_id = %worker_id)) Ok(()) as Result<(), Error>
}
.instrument(tracing::info_span!("tick", %queue, %worker_id))
.await?; .await?;
} }
} }
#[tracing::instrument("running-job", skip(repo, queue, worker_id, fut))]
async fn heartbeat<Fut>( async fn heartbeat<Fut>(
repo: &ArcRepo, repo: &ArcRepo,
queue: &'static str, queue: &'static str,
@ -511,27 +463,19 @@ async fn heartbeat<Fut>(
where where
Fut: std::future::Future, Fut: std::future::Future,
{ {
let mut fut = let mut fut = std::pin::pin!(fut.instrument(tracing::info_span!("job-future")));
std::pin::pin!(fut.instrument(tracing::info_span!("job-future", job_id = ?job_id)));
let mut interval = tokio::time::interval(Duration::from_secs(5)); let mut interval = tokio::time::interval(Duration::from_secs(5));
let mut hb = None; let mut hb = None;
let yield_limit = 8;
let mut count = 0;
loop { loop {
tracing::trace!("heartbeat: looping"); tracing::trace!("heartbeat: looping");
count += 1;
count %= yield_limit;
// yield every 8 iterations to be kind to other tasks
if count == 0 {
tokio::task::yield_now().await; tokio::task::yield_now().await;
}
tokio::select! { tokio::select! {
biased;
output = &mut fut => { output = &mut fut => {
return output; return output;
} }