diff --git a/src/queue.rs b/src/queue.rs index 5c87aa5..b139743 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -265,19 +265,10 @@ async fn process_jobs( { let worker_id = uuid::Uuid::new_v4(); - let yield_limit = 8; - let mut count = 0; - loop { tracing::trace!("process_jobs: looping"); - count += 1; - count %= yield_limit; - - // yield every 8 iterations to be kind to other tasks - if count == 0 { - tokio::task::yield_now().await; - } + tokio::task::yield_now().await; let res = job_loop(repo, store, config, worker_id, queue, callback).await; @@ -314,39 +305,24 @@ where ) -> LocalBoxFuture<'a, Result<(), Error>> + Copy, { - let yield_limit = 8; - let mut count = 0; - loop { tracing::trace!("job_loop: looping"); - count += 1; - count %= yield_limit; + tokio::task::yield_now().await; - // yield every 8 iterations to be kind to other tasks - if count == 0 { - tokio::task::yield_now().await; - } - - let fut = async { + async { let (job_id, job) = repo.pop(queue, worker_id).await?; - let span = tracing::info_span!("Running Job"); - let guard = MetricsGuard::guard(worker_id, queue); - let res = span - .in_scope(|| { - heartbeat( - repo, - queue, - worker_id, - job_id, - (callback)(repo, store, config, job), - ) - }) - .instrument(span) - .await; + let res = heartbeat( + repo, + queue, + worker_id, + job_id, + (callback)(repo, store, config, job), + ) + .await; repo.complete_job(queue, worker_id, job_id).await?; @@ -355,10 +331,9 @@ where guard.disarm(); Ok(()) as Result<(), Error> - }; - - fut.instrument(tracing::info_span!("tick", worker_id = %worker_id)) - .await?; + } + .instrument(tracing::info_span!("tick", %queue, %worker_id)) + .await?; } } @@ -387,19 +362,10 @@ async fn process_image_jobs( { let worker_id = uuid::Uuid::new_v4(); - let yield_limit = 8; - let mut count = 0; - loop { tracing::trace!("process_image_jobs: looping"); - count += 1; - count %= yield_limit; - - // yield every 8 iterations to be kind to other tasks - if count == 0 { - tokio::task::yield_now().await; - } + tokio::task::yield_now().await; let res = image_job_loop( tmp_dir, @@ -454,53 +420,39 @@ where ) -> LocalBoxFuture<'a, Result<(), Error>> + Copy, { - let yield_limit = 8; - let mut count = 0; - loop { tracing::trace!("image_job_loop: looping"); - count += 1; - count %= yield_limit; + tokio::task::yield_now().await; - // yield every 8 iterations to be kind to other tasks - if count == 0 { - tokio::task::yield_now().await; - } - - let fut = async { + async { let (job_id, job) = repo.pop(queue, worker_id).await?; - let span = tracing::info_span!("Running Job"); - let guard = MetricsGuard::guard(worker_id, queue); - let res = span - .in_scope(|| { - heartbeat( - repo, - queue, - worker_id, - job_id, - (callback)(tmp_dir, repo, store, client, process_map, config, job), - ) - }) - .instrument(span) - .await; + let res = heartbeat( + repo, + queue, + worker_id, + job_id, + (callback)(tmp_dir, repo, store, client, process_map, config, job), + ) + .await; repo.complete_job(queue, worker_id, job_id).await?; res?; guard.disarm(); - Ok(()) as Result<(), Error> - }; - fut.instrument(tracing::info_span!("tick", worker_id = %worker_id)) - .await?; + Ok(()) as Result<(), Error> + } + .instrument(tracing::info_span!("tick", %queue, %worker_id)) + .await?; } } +#[tracing::instrument("running-job", skip(repo, queue, worker_id, fut))] async fn heartbeat( repo: &ArcRepo, queue: &'static str, @@ -511,27 +463,19 @@ async fn heartbeat( where Fut: std::future::Future, { - let mut fut = - std::pin::pin!(fut.instrument(tracing::info_span!("job-future", job_id = ?job_id))); + let mut fut = std::pin::pin!(fut.instrument(tracing::info_span!("job-future"))); let mut interval = tokio::time::interval(Duration::from_secs(5)); let mut hb = None; - let yield_limit = 8; - let mut count = 0; - loop { tracing::trace!("heartbeat: looping"); - count += 1; - count %= yield_limit; - // yield every 8 iterations to be kind to other tasks - if count == 0 { - tokio::task::yield_now().await; - } + tokio::task::yield_now().await; tokio::select! { + biased; output = &mut fut => { return output; }