2022-03-29 17:51:16 +00:00
|
|
|
use crate::{
|
2023-07-22 16:15:30 +00:00
|
|
|
concurrent_processor::ProcessMap,
|
2023-09-02 01:50:10 +00:00
|
|
|
error::{Error, UploadError},
|
2023-07-13 18:48:59 +00:00
|
|
|
formats::InputProcessableFormat,
|
2023-09-05 02:51:27 +00:00
|
|
|
future::LocalBoxFuture,
|
2023-10-07 00:42:24 +00:00
|
|
|
repo::{Alias, ArcRepo, DeleteToken, Hash, JobId, UploadId},
|
2022-04-01 21:51:12 +00:00
|
|
|
serde_str::Serde,
|
2024-02-03 19:32:20 +00:00
|
|
|
state::State,
|
2023-09-02 23:30:45 +00:00
|
|
|
store::Store,
|
2022-03-29 17:51:16 +00:00
|
|
|
};
|
2024-02-04 00:19:14 +00:00
|
|
|
|
2023-08-13 19:12:38 +00:00
|
|
|
use std::{
|
2024-03-09 18:15:23 +00:00
|
|
|
ops::Deref,
|
2023-08-13 19:12:38 +00:00
|
|
|
path::PathBuf,
|
2023-08-16 00:19:03 +00:00
|
|
|
sync::Arc,
|
2023-08-13 19:12:38 +00:00
|
|
|
time::{Duration, Instant},
|
|
|
|
};
|
2022-04-02 23:53:03 +00:00
|
|
|
use tracing::Instrument;
|
2022-04-01 16:51:46 +00:00
|
|
|
|
2023-10-04 17:11:29 +00:00
|
|
|
pub(crate) mod cleanup;
|
2022-04-01 21:51:12 +00:00
|
|
|
mod process;
|
2022-04-01 16:51:46 +00:00
|
|
|
|
|
|
|
const CLEANUP_QUEUE: &str = "cleanup";
|
2022-04-01 21:51:12 +00:00
|
|
|
const PROCESS_QUEUE: &str = "process";
|
2024-01-24 23:14:31 +00:00
|
|
|
const OUTDATED_PROXIES_UNIQUE_KEY: &str = "outdated-proxies";
|
|
|
|
const OUTDATED_VARIANTS_UNIQUE_KEY: &str = "outdated-variants";
|
|
|
|
const ALL_VARIANTS_UNIQUE_KEY: &str = "all-variants";
|
|
|
|
const PRUNE_MISSING_UNIQUE_KEY: &str = "prune-missing";
|
2022-03-29 17:51:16 +00:00
|
|
|
|
|
|
|
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
2022-04-01 21:51:12 +00:00
|
|
|
enum Cleanup {
|
2022-04-02 21:44:03 +00:00
|
|
|
Hash {
|
2023-08-14 19:25:19 +00:00
|
|
|
hash: Hash,
|
2022-04-02 21:44:03 +00:00
|
|
|
},
|
|
|
|
Identifier {
|
2023-09-02 23:30:45 +00:00
|
|
|
identifier: String,
|
2022-04-02 21:44:03 +00:00
|
|
|
},
|
|
|
|
Alias {
|
|
|
|
alias: Serde<Alias>,
|
|
|
|
token: Serde<DeleteToken>,
|
|
|
|
},
|
2022-04-11 21:56:39 +00:00
|
|
|
Variant {
|
2023-08-14 19:25:19 +00:00
|
|
|
hash: Hash,
|
2023-07-23 00:41:50 +00:00
|
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
|
|
variant: Option<String>,
|
2022-04-11 21:56:39 +00:00
|
|
|
},
|
|
|
|
AllVariants,
|
2023-07-23 00:41:50 +00:00
|
|
|
OutdatedVariants,
|
2023-07-23 20:45:52 +00:00
|
|
|
OutdatedProxies,
|
2023-12-12 22:54:41 +00:00
|
|
|
Prune,
|
2022-03-29 17:51:16 +00:00
|
|
|
}
|
|
|
|
|
2022-04-01 21:51:12 +00:00
|
|
|
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
|
|
|
enum Process {
|
|
|
|
Ingest {
|
2023-09-02 23:30:45 +00:00
|
|
|
identifier: String,
|
2022-04-03 01:56:29 +00:00
|
|
|
upload_id: Serde<UploadId>,
|
2022-04-01 21:51:12 +00:00
|
|
|
declared_alias: Option<Serde<Alias>>,
|
|
|
|
},
|
|
|
|
Generate {
|
2023-07-13 18:48:59 +00:00
|
|
|
target_format: InputProcessableFormat,
|
2022-04-01 21:51:12 +00:00
|
|
|
source: Serde<Alias>,
|
|
|
|
process_path: PathBuf,
|
|
|
|
process_args: Vec<String>,
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
2023-08-16 00:19:03 +00:00
|
|
|
pub(crate) async fn cleanup_alias(
|
2023-10-07 00:42:24 +00:00
|
|
|
repo: &ArcRepo,
|
2022-04-02 21:44:03 +00:00
|
|
|
alias: Alias,
|
|
|
|
token: DeleteToken,
|
|
|
|
) -> Result<(), Error> {
|
2023-09-03 21:59:41 +00:00
|
|
|
let job = serde_json::to_value(Cleanup::Alias {
|
2022-04-02 21:44:03 +00:00
|
|
|
alias: Serde::new(alias),
|
|
|
|
token: Serde::new(token),
|
2023-09-02 01:50:10 +00:00
|
|
|
})
|
|
|
|
.map_err(UploadError::PushJob)?;
|
2024-01-24 23:14:31 +00:00
|
|
|
repo.push(CLEANUP_QUEUE, job, None).await?;
|
2022-04-02 21:44:03 +00:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2023-10-07 00:42:24 +00:00
|
|
|
pub(crate) async fn cleanup_hash(repo: &ArcRepo, hash: Hash) -> Result<(), Error> {
|
2023-09-03 21:59:41 +00:00
|
|
|
let job = serde_json::to_value(Cleanup::Hash { hash }).map_err(UploadError::PushJob)?;
|
2024-01-24 23:14:31 +00:00
|
|
|
repo.push(CLEANUP_QUEUE, job, None).await?;
|
2022-03-29 17:51:16 +00:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2023-10-07 00:42:24 +00:00
|
|
|
pub(crate) async fn cleanup_identifier(repo: &ArcRepo, identifier: &Arc<str>) -> Result<(), Error> {
|
2023-09-03 21:59:41 +00:00
|
|
|
let job = serde_json::to_value(Cleanup::Identifier {
|
2023-09-02 23:30:45 +00:00
|
|
|
identifier: identifier.to_string(),
|
2023-09-02 01:50:10 +00:00
|
|
|
})
|
|
|
|
.map_err(UploadError::PushJob)?;
|
2024-01-24 23:14:31 +00:00
|
|
|
repo.push(CLEANUP_QUEUE, job, None).await?;
|
2022-04-02 21:44:03 +00:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2023-08-16 00:19:03 +00:00
|
|
|
async fn cleanup_variants(
|
2023-10-07 00:42:24 +00:00
|
|
|
repo: &ArcRepo,
|
2023-08-14 19:25:19 +00:00
|
|
|
hash: Hash,
|
2023-07-23 00:41:50 +00:00
|
|
|
variant: Option<String>,
|
|
|
|
) -> Result<(), Error> {
|
2023-09-02 01:50:10 +00:00
|
|
|
let job =
|
2023-09-03 21:59:41 +00:00
|
|
|
serde_json::to_value(Cleanup::Variant { hash, variant }).map_err(UploadError::PushJob)?;
|
2024-01-24 23:14:31 +00:00
|
|
|
repo.push(CLEANUP_QUEUE, job, None).await?;
|
2022-04-11 21:56:39 +00:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2023-10-07 00:42:24 +00:00
|
|
|
pub(crate) async fn cleanup_outdated_proxies(repo: &ArcRepo) -> Result<(), Error> {
|
2023-09-03 21:59:41 +00:00
|
|
|
let job = serde_json::to_value(Cleanup::OutdatedProxies).map_err(UploadError::PushJob)?;
|
2024-01-24 23:14:31 +00:00
|
|
|
if repo
|
|
|
|
.push(CLEANUP_QUEUE, job, Some(OUTDATED_PROXIES_UNIQUE_KEY))
|
|
|
|
.await?
|
|
|
|
.is_none()
|
|
|
|
{
|
|
|
|
tracing::debug!("outdated proxies conflict");
|
|
|
|
}
|
2023-07-23 20:45:52 +00:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2023-10-07 00:42:24 +00:00
|
|
|
pub(crate) async fn cleanup_outdated_variants(repo: &ArcRepo) -> Result<(), Error> {
|
2023-09-03 21:59:41 +00:00
|
|
|
let job = serde_json::to_value(Cleanup::OutdatedVariants).map_err(UploadError::PushJob)?;
|
2024-01-24 23:14:31 +00:00
|
|
|
if repo
|
|
|
|
.push(CLEANUP_QUEUE, job, Some(OUTDATED_VARIANTS_UNIQUE_KEY))
|
|
|
|
.await?
|
|
|
|
.is_none()
|
|
|
|
{
|
|
|
|
tracing::debug!("outdated variants conflict");
|
|
|
|
}
|
2023-07-23 00:41:50 +00:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2023-10-07 00:42:24 +00:00
|
|
|
pub(crate) async fn cleanup_all_variants(repo: &ArcRepo) -> Result<(), Error> {
|
2023-09-03 21:59:41 +00:00
|
|
|
let job = serde_json::to_value(Cleanup::AllVariants).map_err(UploadError::PushJob)?;
|
2024-01-24 23:14:31 +00:00
|
|
|
if repo
|
|
|
|
.push(CLEANUP_QUEUE, job, Some(ALL_VARIANTS_UNIQUE_KEY))
|
|
|
|
.await?
|
|
|
|
.is_none()
|
|
|
|
{
|
|
|
|
tracing::debug!("all variants conflict");
|
|
|
|
}
|
2022-04-11 21:56:39 +00:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2023-12-12 22:54:41 +00:00
|
|
|
pub(crate) async fn prune_missing(repo: &ArcRepo) -> Result<(), Error> {
|
|
|
|
let job = serde_json::to_value(Cleanup::Prune).map_err(UploadError::PushJob)?;
|
2024-01-24 23:14:31 +00:00
|
|
|
if repo
|
|
|
|
.push(CLEANUP_QUEUE, job, Some(PRUNE_MISSING_UNIQUE_KEY))
|
|
|
|
.await?
|
|
|
|
.is_none()
|
|
|
|
{
|
|
|
|
tracing::debug!("prune missing conflict");
|
|
|
|
}
|
2023-12-12 22:54:41 +00:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2023-08-16 00:19:03 +00:00
|
|
|
pub(crate) async fn queue_ingest(
|
2023-10-07 00:42:24 +00:00
|
|
|
repo: &ArcRepo,
|
2023-09-02 23:30:45 +00:00
|
|
|
identifier: &Arc<str>,
|
2022-04-03 01:56:29 +00:00
|
|
|
upload_id: UploadId,
|
2022-04-01 21:51:12 +00:00
|
|
|
declared_alias: Option<Alias>,
|
|
|
|
) -> Result<(), Error> {
|
2023-09-03 21:59:41 +00:00
|
|
|
let job = serde_json::to_value(Process::Ingest {
|
2023-09-02 23:30:45 +00:00
|
|
|
identifier: identifier.to_string(),
|
2022-04-01 21:51:12 +00:00
|
|
|
declared_alias: declared_alias.map(Serde::new),
|
2022-04-03 01:56:29 +00:00
|
|
|
upload_id: Serde::new(upload_id),
|
2023-09-02 01:50:10 +00:00
|
|
|
})
|
|
|
|
.map_err(UploadError::PushJob)?;
|
2024-01-24 23:14:31 +00:00
|
|
|
repo.push(PROCESS_QUEUE, job, None).await?;
|
2022-04-01 21:51:12 +00:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2023-08-16 00:19:03 +00:00
|
|
|
pub(crate) async fn queue_generate(
|
2023-10-07 00:42:24 +00:00
|
|
|
repo: &ArcRepo,
|
2023-07-13 18:48:59 +00:00
|
|
|
target_format: InputProcessableFormat,
|
2022-04-01 21:51:12 +00:00
|
|
|
source: Alias,
|
|
|
|
process_path: PathBuf,
|
|
|
|
process_args: Vec<String>,
|
|
|
|
) -> Result<(), Error> {
|
2023-09-03 21:59:41 +00:00
|
|
|
let job = serde_json::to_value(Process::Generate {
|
2022-04-01 21:51:12 +00:00
|
|
|
target_format,
|
|
|
|
source: Serde::new(source),
|
|
|
|
process_path,
|
|
|
|
process_args,
|
2023-09-02 01:50:10 +00:00
|
|
|
})
|
|
|
|
.map_err(UploadError::PushJob)?;
|
2024-01-24 23:14:31 +00:00
|
|
|
repo.push(PROCESS_QUEUE, job, None).await?;
|
2022-04-01 21:51:12 +00:00
|
|
|
Ok(())
|
|
|
|
}
|
2022-03-29 18:18:47 +00:00
|
|
|
|
2024-02-03 19:31:54 +00:00
|
|
|
pub(crate) async fn process_cleanup<S: Store + 'static>(state: State<S>) {
|
|
|
|
process_jobs(state, CLEANUP_QUEUE, cleanup::perform).await
|
2022-04-01 21:51:12 +00:00
|
|
|
}
|
2022-03-29 17:51:16 +00:00
|
|
|
|
2024-02-03 19:31:54 +00:00
|
|
|
pub(crate) async fn process_images<S: Store + 'static>(state: State<S>, process_map: ProcessMap) {
|
|
|
|
process_image_jobs(state, process_map, PROCESS_QUEUE, process::perform).await
|
2022-03-29 17:51:16 +00:00
|
|
|
}
|
|
|
|
|
2024-01-25 22:59:46 +00:00
|
|
|
struct MetricsGuard {
|
|
|
|
worker_id: uuid::Uuid,
|
|
|
|
queue: &'static str,
|
|
|
|
start: Instant,
|
|
|
|
armed: bool,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl MetricsGuard {
|
|
|
|
fn guard(worker_id: uuid::Uuid, queue: &'static str) -> Self {
|
2024-02-04 21:45:47 +00:00
|
|
|
metrics::counter!(crate::init_metrics::JOB_START, "queue" => queue, "worker-id" => worker_id.to_string()).increment(1);
|
2024-01-25 22:59:46 +00:00
|
|
|
|
|
|
|
Self {
|
|
|
|
worker_id,
|
|
|
|
queue,
|
|
|
|
start: Instant::now(),
|
|
|
|
armed: true,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn disarm(mut self) {
|
|
|
|
self.armed = false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Drop for MetricsGuard {
|
|
|
|
fn drop(&mut self) {
|
2024-02-04 21:45:47 +00:00
|
|
|
metrics::histogram!(crate::init_metrics::JOB_DURAION, "queue" => self.queue, "worker-id" => self.worker_id.to_string(), "completed" => (!self.armed).to_string()).record(self.start.elapsed().as_secs_f64());
|
|
|
|
metrics::counter!(crate::init_metrics::JOB_END, "queue" => self.queue, "worker-id" => self.worker_id.to_string(), "completed" => (!self.armed).to_string()).increment(1);
|
2024-01-25 22:59:46 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-03-09 18:15:23 +00:00
|
|
|
pub(super) enum JobError {
|
|
|
|
Abort(Error),
|
|
|
|
Retry(Error),
|
|
|
|
}
|
|
|
|
|
|
|
|
impl AsRef<Error> for JobError {
|
|
|
|
fn as_ref(&self) -> &Error {
|
|
|
|
match self {
|
|
|
|
Self::Abort(e) | Self::Retry(e) => e,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Deref for JobError {
|
|
|
|
type Target = Error;
|
|
|
|
|
|
|
|
fn deref(&self) -> &Self::Target {
|
|
|
|
match self {
|
|
|
|
Self::Abort(e) | Self::Retry(e) => e,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl From<JobError> for Error {
|
|
|
|
fn from(value: JobError) -> Self {
|
|
|
|
match value {
|
|
|
|
JobError::Abort(e) | JobError::Retry(e) => e,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
type JobResult<T = ()> = Result<T, JobError>;
|
|
|
|
|
|
|
|
type JobFuture<'a> = LocalBoxFuture<'a, JobResult>;
|
|
|
|
|
|
|
|
trait JobContext {
|
|
|
|
type Item;
|
|
|
|
|
|
|
|
fn abort(self) -> JobResult<Self::Item>
|
|
|
|
where
|
|
|
|
Self: Sized;
|
|
|
|
|
|
|
|
fn retry(self) -> JobResult<Self::Item>
|
|
|
|
where
|
|
|
|
Self: Sized;
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<T, E> JobContext for Result<T, E>
|
|
|
|
where
|
|
|
|
E: Into<Error>,
|
|
|
|
{
|
|
|
|
type Item = T;
|
|
|
|
|
|
|
|
fn abort(self) -> JobResult<Self::Item>
|
|
|
|
where
|
|
|
|
Self: Sized,
|
|
|
|
{
|
|
|
|
self.map_err(Into::into).map_err(JobError::Abort)
|
|
|
|
}
|
|
|
|
|
|
|
|
fn retry(self) -> JobResult<Self::Item>
|
|
|
|
where
|
|
|
|
Self: Sized,
|
|
|
|
{
|
|
|
|
self.map_err(Into::into).map_err(JobError::Retry)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn job_result(result: &JobResult) -> crate::repo::JobResult {
|
|
|
|
match result {
|
|
|
|
Ok(()) => crate::repo::JobResult::Success,
|
|
|
|
Err(JobError::Retry(_)) => crate::repo::JobResult::Failure,
|
|
|
|
Err(JobError::Abort(_)) => crate::repo::JobResult::Aborted,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-02-03 19:31:54 +00:00
|
|
|
async fn process_jobs<S, F>(state: State<S>, queue: &'static str, callback: F)
|
|
|
|
where
|
2022-03-29 17:51:16 +00:00
|
|
|
S: Store,
|
2024-03-09 18:15:23 +00:00
|
|
|
for<'a> F: Fn(&'a State<S>, serde_json::Value) -> JobFuture<'a> + Copy,
|
2022-03-29 17:51:16 +00:00
|
|
|
{
|
2023-08-14 00:47:20 +00:00
|
|
|
let worker_id = uuid::Uuid::new_v4();
|
|
|
|
|
2022-03-29 17:51:16 +00:00
|
|
|
loop {
|
2023-12-28 17:58:38 +00:00
|
|
|
tracing::trace!("process_jobs: looping");
|
|
|
|
|
2024-01-30 20:18:07 +00:00
|
|
|
tokio::task::yield_now().await;
|
2024-01-25 22:59:46 +00:00
|
|
|
|
2024-02-03 19:31:54 +00:00
|
|
|
let res = job_loop(&state, worker_id, queue, callback).await;
|
2022-04-01 21:51:12 +00:00
|
|
|
|
|
|
|
if let Err(e) = res {
|
2023-01-29 17:57:59 +00:00
|
|
|
tracing::warn!("Error processing jobs: {}", format!("{e}"));
|
|
|
|
tracing::warn!("{}", format!("{e:?}"));
|
2023-09-05 02:51:27 +00:00
|
|
|
|
|
|
|
if e.is_disconnected() {
|
2023-10-21 00:08:11 +00:00
|
|
|
tokio::time::sleep(Duration::from_secs(10)).await;
|
2023-09-05 02:51:27 +00:00
|
|
|
}
|
|
|
|
|
2022-04-01 21:51:12 +00:00
|
|
|
continue;
|
|
|
|
}
|
2022-03-29 18:18:47 +00:00
|
|
|
|
2022-04-01 21:51:12 +00:00
|
|
|
break;
|
2022-03-29 18:18:47 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-08-16 00:19:03 +00:00
|
|
|
async fn job_loop<S, F>(
|
2024-02-03 19:31:54 +00:00
|
|
|
state: &State<S>,
|
2023-08-14 00:47:20 +00:00
|
|
|
worker_id: uuid::Uuid,
|
2022-04-02 23:53:03 +00:00
|
|
|
queue: &'static str,
|
|
|
|
callback: F,
|
|
|
|
) -> Result<(), Error>
|
2022-03-29 18:18:47 +00:00
|
|
|
where
|
|
|
|
S: Store,
|
2024-03-09 18:15:23 +00:00
|
|
|
for<'a> F: Fn(&'a State<S>, serde_json::Value) -> JobFuture<'a> + Copy,
|
2022-03-29 18:18:47 +00:00
|
|
|
{
|
2022-04-01 21:51:12 +00:00
|
|
|
loop {
|
2023-12-28 17:58:38 +00:00
|
|
|
tracing::trace!("job_loop: looping");
|
|
|
|
|
2024-01-30 20:18:07 +00:00
|
|
|
tokio::task::yield_now().await;
|
2024-01-25 22:59:46 +00:00
|
|
|
|
2024-01-30 20:18:07 +00:00
|
|
|
async {
|
2024-02-03 19:31:54 +00:00
|
|
|
let (job_id, job) = state.repo.pop(queue, worker_id).await?;
|
2022-03-29 18:18:47 +00:00
|
|
|
|
2023-08-15 02:17:57 +00:00
|
|
|
let guard = MetricsGuard::guard(worker_id, queue);
|
2023-07-23 02:11:28 +00:00
|
|
|
|
2024-01-30 20:18:07 +00:00
|
|
|
let res = heartbeat(
|
2024-02-03 19:31:54 +00:00
|
|
|
&state.repo,
|
2024-01-30 20:18:07 +00:00
|
|
|
queue,
|
|
|
|
worker_id,
|
|
|
|
job_id,
|
2024-02-03 19:31:54 +00:00
|
|
|
(callback)(state, job),
|
2024-01-30 20:18:07 +00:00
|
|
|
)
|
|
|
|
.await;
|
2023-08-14 00:47:20 +00:00
|
|
|
|
2024-03-09 18:15:23 +00:00
|
|
|
state
|
|
|
|
.repo
|
|
|
|
.complete_job(queue, worker_id, job_id, job_result(&res))
|
|
|
|
.await?;
|
2023-08-14 00:47:20 +00:00
|
|
|
|
2023-08-15 02:17:57 +00:00
|
|
|
res?;
|
2023-07-23 02:11:28 +00:00
|
|
|
|
2023-08-15 02:17:57 +00:00
|
|
|
guard.disarm();
|
|
|
|
|
|
|
|
Ok(()) as Result<(), Error>
|
2024-01-30 20:18:07 +00:00
|
|
|
}
|
|
|
|
.instrument(tracing::info_span!("tick", %queue, %worker_id))
|
|
|
|
.await?;
|
2022-04-01 21:51:12 +00:00
|
|
|
}
|
2022-03-29 17:51:16 +00:00
|
|
|
}
|
2023-07-22 16:15:30 +00:00
|
|
|
|
2023-08-16 00:19:03 +00:00
|
|
|
async fn process_image_jobs<S, F>(
|
2024-02-03 19:31:54 +00:00
|
|
|
state: State<S>,
|
|
|
|
process_map: ProcessMap,
|
2023-07-22 16:15:30 +00:00
|
|
|
queue: &'static str,
|
|
|
|
callback: F,
|
|
|
|
) where
|
|
|
|
S: Store,
|
2024-03-09 18:15:23 +00:00
|
|
|
for<'a> F: Fn(&'a State<S>, &'a ProcessMap, serde_json::Value) -> JobFuture<'a> + Copy,
|
2023-07-22 16:15:30 +00:00
|
|
|
{
|
2023-08-14 00:47:20 +00:00
|
|
|
let worker_id = uuid::Uuid::new_v4();
|
|
|
|
|
2023-07-22 16:15:30 +00:00
|
|
|
loop {
|
2023-12-28 17:58:38 +00:00
|
|
|
tracing::trace!("process_image_jobs: looping");
|
|
|
|
|
2024-01-30 20:18:07 +00:00
|
|
|
tokio::task::yield_now().await;
|
2024-01-25 22:59:46 +00:00
|
|
|
|
2024-02-03 19:31:54 +00:00
|
|
|
let res = image_job_loop(&state, &process_map, worker_id, queue, callback).await;
|
2023-07-22 16:15:30 +00:00
|
|
|
|
|
|
|
if let Err(e) = res {
|
|
|
|
tracing::warn!("Error processing jobs: {}", format!("{e}"));
|
|
|
|
tracing::warn!("{}", format!("{e:?}"));
|
2023-09-05 02:51:27 +00:00
|
|
|
|
|
|
|
if e.is_disconnected() {
|
2024-01-06 01:35:52 +00:00
|
|
|
tokio::time::sleep(Duration::from_secs(10)).await;
|
2023-09-05 02:51:27 +00:00
|
|
|
}
|
|
|
|
|
2023-07-22 16:15:30 +00:00
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-08-16 00:19:03 +00:00
|
|
|
async fn image_job_loop<S, F>(
|
2024-02-03 19:31:54 +00:00
|
|
|
state: &State<S>,
|
2023-07-22 16:15:30 +00:00
|
|
|
process_map: &ProcessMap,
|
2023-08-14 00:47:20 +00:00
|
|
|
worker_id: uuid::Uuid,
|
2023-07-22 16:15:30 +00:00
|
|
|
queue: &'static str,
|
|
|
|
callback: F,
|
|
|
|
) -> Result<(), Error>
|
|
|
|
where
|
|
|
|
S: Store,
|
2024-03-09 18:15:23 +00:00
|
|
|
for<'a> F: Fn(&'a State<S>, &'a ProcessMap, serde_json::Value) -> JobFuture<'a> + Copy,
|
2023-07-22 16:15:30 +00:00
|
|
|
{
|
|
|
|
loop {
|
2023-12-28 17:58:38 +00:00
|
|
|
tracing::trace!("image_job_loop: looping");
|
|
|
|
|
2024-01-30 20:18:07 +00:00
|
|
|
tokio::task::yield_now().await;
|
2024-01-25 22:59:46 +00:00
|
|
|
|
2024-01-30 20:18:07 +00:00
|
|
|
async {
|
2024-02-03 19:31:54 +00:00
|
|
|
let (job_id, job) = state.repo.pop(queue, worker_id).await?;
|
2023-07-22 16:15:30 +00:00
|
|
|
|
2023-08-15 02:17:57 +00:00
|
|
|
let guard = MetricsGuard::guard(worker_id, queue);
|
2023-08-14 00:47:20 +00:00
|
|
|
|
2024-01-30 20:18:07 +00:00
|
|
|
let res = heartbeat(
|
2024-02-03 19:31:54 +00:00
|
|
|
&state.repo,
|
2024-01-30 20:18:07 +00:00
|
|
|
queue,
|
|
|
|
worker_id,
|
|
|
|
job_id,
|
2024-02-03 19:31:54 +00:00
|
|
|
(callback)(state, process_map, job),
|
2024-01-30 20:18:07 +00:00
|
|
|
)
|
|
|
|
.await;
|
2023-08-14 00:47:20 +00:00
|
|
|
|
2024-03-09 18:15:23 +00:00
|
|
|
state
|
|
|
|
.repo
|
|
|
|
.complete_job(queue, worker_id, job_id, job_result(&res))
|
|
|
|
.await?;
|
2023-07-23 02:11:28 +00:00
|
|
|
|
2023-08-15 02:17:57 +00:00
|
|
|
res?;
|
2023-07-23 02:11:28 +00:00
|
|
|
|
2023-08-15 02:17:57 +00:00
|
|
|
guard.disarm();
|
|
|
|
|
2024-01-30 20:18:07 +00:00
|
|
|
Ok(()) as Result<(), Error>
|
|
|
|
}
|
|
|
|
.instrument(tracing::info_span!("tick", %queue, %worker_id))
|
|
|
|
.await?;
|
2023-07-22 16:15:30 +00:00
|
|
|
}
|
|
|
|
}
|
2023-08-13 19:12:38 +00:00
|
|
|
|
2024-01-30 20:18:07 +00:00
|
|
|
#[tracing::instrument("running-job", skip(repo, queue, worker_id, fut))]
|
2023-08-16 00:19:03 +00:00
|
|
|
async fn heartbeat<Fut>(
|
2023-10-07 00:42:24 +00:00
|
|
|
repo: &ArcRepo,
|
2023-08-15 02:17:57 +00:00
|
|
|
queue: &'static str,
|
|
|
|
worker_id: uuid::Uuid,
|
|
|
|
job_id: JobId,
|
|
|
|
fut: Fut,
|
|
|
|
) -> Fut::Output
|
2023-08-13 19:12:38 +00:00
|
|
|
where
|
|
|
|
Fut: std::future::Future,
|
|
|
|
{
|
2024-01-30 20:18:07 +00:00
|
|
|
let mut fut = std::pin::pin!(fut.instrument(tracing::info_span!("job-future")));
|
2023-08-13 19:12:38 +00:00
|
|
|
|
2023-10-21 00:08:11 +00:00
|
|
|
let mut interval = tokio::time::interval(Duration::from_secs(5));
|
2023-08-13 19:12:38 +00:00
|
|
|
|
|
|
|
let mut hb = None;
|
|
|
|
|
|
|
|
loop {
|
2023-12-28 17:58:38 +00:00
|
|
|
tracing::trace!("heartbeat: looping");
|
2024-01-06 23:47:44 +00:00
|
|
|
|
2024-01-30 20:18:07 +00:00
|
|
|
tokio::task::yield_now().await;
|
2023-12-28 17:58:38 +00:00
|
|
|
|
2023-08-13 19:12:38 +00:00
|
|
|
tokio::select! {
|
2024-01-30 20:18:07 +00:00
|
|
|
biased;
|
2023-08-13 19:12:38 +00:00
|
|
|
output = &mut fut => {
|
|
|
|
return output;
|
|
|
|
}
|
|
|
|
_ = interval.tick() => {
|
|
|
|
if hb.is_none() {
|
2023-08-15 02:17:57 +00:00
|
|
|
hb = Some(repo.heartbeat(queue, worker_id, job_id));
|
2023-08-13 19:12:38 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
opt = poll_opt(hb.as_mut()), if hb.is_some() => {
|
2023-08-15 02:17:57 +00:00
|
|
|
hb.take();
|
|
|
|
|
2023-08-13 19:12:38 +00:00
|
|
|
if let Some(Err(e)) = opt {
|
|
|
|
tracing::warn!("Failed heartbeat\n{}", format!("{e:?}"));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn poll_opt<Fut>(opt: Option<&mut Fut>) -> Option<Fut::Output>
|
|
|
|
where
|
|
|
|
Fut: std::future::Future + Unpin,
|
|
|
|
{
|
|
|
|
match opt {
|
|
|
|
None => None,
|
2023-08-15 02:17:57 +00:00
|
|
|
Some(fut) => Some(fut.await),
|
2023-08-13 19:12:38 +00:00
|
|
|
}
|
|
|
|
}
|