Implement retries for jobs, start warning on long polls

asonix 2024-03-09 12:15:23 -06:00
parent 40bb58d603
commit 9fe586b9dd
16 changed files with 558 additions and 229 deletions

View File

@ -108,6 +108,9 @@ pub(crate) enum UploadError {
#[error("Error in request response")] #[error("Error in request response")]
Request(#[from] reqwest::Error), Request(#[from] reqwest::Error),
#[error("Invalid job popped from job queue: {1}")]
InvalidJob(#[source] serde_json::Error, String),
#[error("pict-rs is in read-only mode")] #[error("pict-rs is in read-only mode")]
ReadOnly, ReadOnly,
@ -201,6 +204,7 @@ impl UploadError {
Self::Timeout(_) | Self::AggregateTimeout => ErrorCode::STREAM_TOO_SLOW, Self::Timeout(_) | Self::AggregateTimeout => ErrorCode::STREAM_TOO_SLOW,
Self::ProcessTimeout => ErrorCode::COMMAND_TIMEOUT, Self::ProcessTimeout => ErrorCode::COMMAND_TIMEOUT,
Self::FailedExternalValidation => ErrorCode::FAILED_EXTERNAL_VALIDATION, Self::FailedExternalValidation => ErrorCode::FAILED_EXTERNAL_VALIDATION,
Self::InvalidJob(_, _) => ErrorCode::INVALID_JOB,
} }
} }

View File

@ -144,4 +144,7 @@ impl ErrorCode {
pub(crate) const FAILED_EXTERNAL_VALIDATION: ErrorCode = ErrorCode { pub(crate) const FAILED_EXTERNAL_VALIDATION: ErrorCode = ErrorCode {
code: "failed-external-validation", code: "failed-external-validation",
}; };
pub(crate) const INVALID_JOB: ErrorCode = ErrorCode {
code: "invalid-job",
};
} }

View File

@ -59,9 +59,19 @@ pub(crate) trait WithMetrics: Future {
} }
} }
pub(crate) trait WithPollTimer: Future {
fn with_poll_timer(self, name: &'static str) -> PollTimer<Self>
where
Self: Sized,
{
PollTimer { name, inner: self }
}
}
impl<F> NowOrNever for F where F: Future {} impl<F> NowOrNever for F where F: Future {}
impl<F> WithMetrics for F where F: Future {} impl<F> WithMetrics for F where F: Future {}
impl<F> WithTimeout for F where F: Future {} impl<F> WithTimeout for F where F: Future {}
impl<F> WithPollTimer for F where F: Future {}
pin_project_lite::pin_project! { pin_project_lite::pin_project! {
pub(crate) struct MetricsFuture<F> { pub(crate) struct MetricsFuture<F> {
@ -104,3 +114,59 @@ impl Drop for Metrics {
.record(self.start.elapsed().as_secs_f64()); .record(self.start.elapsed().as_secs_f64());
} }
} }
pin_project_lite::pin_project! {
pub(crate) struct PollTimer<F> {
name: &'static str,
#[pin]
inner: F,
}
}
impl<F> Future for PollTimer<F>
where
F: Future,
{
type Output = F::Output;
fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let start = Instant::now();
let this = self.project();
let out = this.inner.poll(cx);
let elapsed = start.elapsed();
if elapsed > Duration::from_micros(10) {
            metrics::counter!(crate::init_metrics::FUTURE_POLL_TIMER_EXCEEDED, "timer" => this.name.to_string()).increment(1);
}
if elapsed > Duration::from_secs(1) {
tracing::warn!(
"Future {} polled for {} seconds",
this.name,
elapsed.as_secs()
);
} else if elapsed > Duration::from_millis(1) {
tracing::warn!("Future {} polled for {} ms", this.name, elapsed.as_millis());
} else if elapsed > Duration::from_micros(200) {
tracing::debug!(
"Future {} polled for {} microseconds",
this.name,
elapsed.as_micros(),
);
} else if elapsed > Duration::from_micros(1) {
tracing::trace!(
"Future {} polled for {} microseconds",
this.name,
elapsed.as_micros()
);
}
out
}
}
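The poll timer is opt-in: any future can be wrapped via the `WithPollTimer` extension trait, and each individual call to `poll` is then timed, incrementing `FUTURE_POLL_TIMER_EXCEEDED` and logging when a single poll runs long (which usually means blocking work is happening on the async executor). A minimal sketch of how a caller might use it, assuming access to the crate-internal trait; the wrapped future and label are made up for illustration:

```rust
use std::time::Duration;

use crate::future::WithPollTimer; // crate-internal trait defined above

// Hypothetical job body used purely for illustration.
async fn compact_database() {
    tokio::time::sleep(Duration::from_millis(5)).await;
}

async fn run() {
    // Every poll of the wrapped future is timed: polls over ~1ms log at
    // warn level, shorter ones at debug/trace, and anything over 10µs
    // increments the exceeded counter with the "compact-database" label.
    compact_database()
        .with_poll_timer("compact-database")
        .await;
}
```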

View File

@ -6,7 +6,7 @@ use crate::{
details::Details, details::Details,
error::{Error, UploadError}, error::{Error, UploadError},
formats::{ImageFormat, InputProcessableFormat, InternalVideoFormat, ProcessableFormat}, formats::{ImageFormat, InputProcessableFormat, InternalVideoFormat, ProcessableFormat},
future::{WithMetrics, WithTimeout}, future::{WithMetrics, WithPollTimer, WithTimeout},
repo::{Hash, VariantAlreadyExists}, repo::{Hash, VariantAlreadyExists},
state::State, state::State,
store::Store, store::Store,
@ -48,7 +48,7 @@ impl Drop for MetricsGuard {
} }
} }
#[tracing::instrument(skip(state, process_map, hash))] #[tracing::instrument(skip(state, process_map, original_details, hash))]
pub(crate) async fn generate<S: Store + 'static>( pub(crate) async fn generate<S: Store + 'static>(
state: &State<S>, state: &State<S>,
process_map: &ProcessMap, process_map: &ProcessMap,
@ -78,6 +78,7 @@ pub(crate) async fn generate<S: Store + 'static>(
let (details, identifier) = process_map let (details, identifier) = process_map
.process(hash, thumbnail_path, process_fut) .process(hash, thumbnail_path, process_fut)
.with_poll_timer("process-future")
.with_timeout(Duration::from_secs(state.config.media.process_timeout * 4)) .with_timeout(Duration::from_secs(state.config.media.process_timeout * 4))
.with_metrics(crate::init_metrics::GENERATE_PROCESS) .with_metrics(crate::init_metrics::GENERATE_PROCESS)
.await .await

View File

@ -5,7 +5,7 @@ use crate::{
details::Details, details::Details,
error::{Error, UploadError}, error::{Error, UploadError},
formats::InternalFormat, formats::InternalFormat,
future::WithMetrics, future::{WithMetrics, WithPollTimer},
repo::{Alias, ArcRepo, DeleteToken, Hash}, repo::{Alias, ArcRepo, DeleteToken, Hash},
state::State, state::State,
store::Store, store::Store,
@ -159,7 +159,9 @@ where
let (input_type, identifier, details, hash_state) = if state.config.server.danger_dummy_mode { let (input_type, identifier, details, hash_state) = if state.config.server.danger_dummy_mode {
dummy_ingest(state, stream).await? dummy_ingest(state, stream).await?
} else { } else {
process_ingest(state, stream).await? process_ingest(state, stream)
.with_poll_timer("ingest-future")
.await?
}; };
let mut session = Session { let mut session = Session {

View File

@ -1,5 +1,6 @@
pub(super) fn init_metrics() { pub(super) fn init_metrics() {
describe_toplevel(); describe_toplevel();
describe_future();
describe_queue_cleanup(); describe_queue_cleanup();
describe_payload(); describe_payload();
describe_job(); describe_job();
@ -26,6 +27,15 @@ fn describe_toplevel() {
pub(crate) const FILES: &str = "pict-rs.files"; pub(crate) const FILES: &str = "pict-rs.files";
pub(crate) const BACKGROUND_UPLOAD_CLAIM: &str = "pict-rs.background.upload.claim"; pub(crate) const BACKGROUND_UPLOAD_CLAIM: &str = "pict-rs.background.upload.claim";
fn describe_future() {
metrics::describe_counter!(
FUTURE_POLL_TIMER_EXCEEDED,
"How many times a given poll operation has lasted longer than 10 microseconds"
);
}
pub(crate) const FUTURE_POLL_TIMER_EXCEEDED: &str = "pict-rs.future.poll-timer.exceeded";
fn describe_queue_cleanup() { fn describe_queue_cleanup() {
metrics::describe_counter!( metrics::describe_counter!(
CLEANUP_OUTDATED_PROXY, CLEANUP_OUTDATED_PROXY,
@ -344,6 +354,14 @@ fn describe_postgres() {
POSTGRES_QUEUE_HEARTBEAT, POSTGRES_QUEUE_HEARTBEAT,
"Timings for updating the provided job's keepalive heartbeat" "Timings for updating the provided job's keepalive heartbeat"
); );
metrics::describe_histogram!(
POSTGRES_QUEUE_RETRY,
"Timings for updating retry count for a job"
);
metrics::describe_histogram!(
POSTGRES_QUEUE_CLEANUP,
"Timings for removing jobs with no more retries"
);
metrics::describe_histogram!( metrics::describe_histogram!(
POSTGRES_QUEUE_COMPLETE, POSTGRES_QUEUE_COMPLETE,
"Timings for removing a completed job from the queue" "Timings for removing a completed job from the queue"
@ -471,6 +489,8 @@ pub(crate) const POSTGRES_QUEUE_LISTEN: &str = "pict-rs.postgres.queue.listen";
pub(crate) const POSTGRES_QUEUE_REQUEUE: &str = "pict-rs.postgres.queue.requeue"; pub(crate) const POSTGRES_QUEUE_REQUEUE: &str = "pict-rs.postgres.queue.requeue";
pub(crate) const POSTGRES_QUEUE_CLAIM: &str = "pict-rs.postgres.queue.claim"; pub(crate) const POSTGRES_QUEUE_CLAIM: &str = "pict-rs.postgres.queue.claim";
pub(crate) const POSTGRES_QUEUE_HEARTBEAT: &str = "pict-rs.postgres.queue.heartbeat"; pub(crate) const POSTGRES_QUEUE_HEARTBEAT: &str = "pict-rs.postgres.queue.heartbeat";
pub(crate) const POSTGRES_QUEUE_RETRY: &str = "pict-rs.postgres.queue.retry";
pub(crate) const POSTGRES_QUEUE_CLEANUP: &str = "pict-rs.postgres.queue.cleanup";
pub(crate) const POSTGRES_QUEUE_COMPLETE: &str = "pict-rs.postgres.queue.complete"; pub(crate) const POSTGRES_QUEUE_COMPLETE: &str = "pict-rs.postgres.queue.complete";
pub(crate) const POSTGRES_STORE_MIGRATION_COUNT: &str = "pict-rs.postgres.store-migration.count"; pub(crate) const POSTGRES_STORE_MIGRATION_COUNT: &str = "pict-rs.postgres.store-migration.count";
pub(crate) const POSTGRES_STORE_MIGRATION_MARK_MIGRATED: &str = pub(crate) const POSTGRES_STORE_MIGRATION_MARK_MIGRATED: &str =

View File

@ -44,7 +44,7 @@ use actix_web::{
web, App, HttpRequest, HttpResponse, HttpResponseBuilder, HttpServer, web, App, HttpRequest, HttpResponse, HttpResponseBuilder, HttpServer,
}; };
use details::{ApiDetails, HumanDate}; use details::{ApiDetails, HumanDate};
use future::WithTimeout; use future::{WithPollTimer, WithTimeout};
use futures_core::Stream; use futures_core::Stream;
use magick::ArcPolicyDir; use magick::ArcPolicyDir;
use metrics_exporter_prometheus::PrometheusBuilder; use metrics_exporter_prometheus::PrometheusBuilder;
@ -186,6 +186,7 @@ impl<S: Store + 'static> FormData for Upload<S> {
ingest::ingest(&state, stream, None).await ingest::ingest(&state, stream, None).await
} }
.with_poll_timer("file-upload")
.instrument(span), .instrument(span),
) )
})), })),
@ -237,6 +238,7 @@ impl<S: Store + 'static> FormData for Import<S> {
ingest::ingest(&state, stream, Some(Alias::from_existing(&filename))) ingest::ingest(&state, stream, Some(Alias::from_existing(&filename)))
.await .await
} }
.with_poll_timer("file-import")
.instrument(span), .instrument(span),
) )
})), })),
@ -351,6 +353,7 @@ impl<S: Store + 'static> FormData for BackgroundedUpload<S> {
Backgrounded::proxy(&state, stream).await Backgrounded::proxy(&state, stream).await
} }
.with_poll_timer("file-proxy")
.instrument(span), .instrument(span),
) )
})), })),

View File

@ -10,6 +10,7 @@ use crate::{
}; };
use std::{ use std::{
ops::Deref,
path::PathBuf, path::PathBuf,
sync::Arc, sync::Arc,
time::{Duration, Instant}, time::{Duration, Instant},
@ -225,10 +226,86 @@ impl Drop for MetricsGuard {
} }
} }
pub(super) enum JobError {
Abort(Error),
Retry(Error),
}
impl AsRef<Error> for JobError {
fn as_ref(&self) -> &Error {
match self {
Self::Abort(e) | Self::Retry(e) => e,
}
}
}
impl Deref for JobError {
type Target = Error;
fn deref(&self) -> &Self::Target {
match self {
Self::Abort(e) | Self::Retry(e) => e,
}
}
}
impl From<JobError> for Error {
fn from(value: JobError) -> Self {
match value {
JobError::Abort(e) | JobError::Retry(e) => e,
}
}
}
type JobResult<T = ()> = Result<T, JobError>;
type JobFuture<'a> = LocalBoxFuture<'a, JobResult>;
trait JobContext {
type Item;
fn abort(self) -> JobResult<Self::Item>
where
Self: Sized;
fn retry(self) -> JobResult<Self::Item>
where
Self: Sized;
}
impl<T, E> JobContext for Result<T, E>
where
E: Into<Error>,
{
type Item = T;
fn abort(self) -> JobResult<Self::Item>
where
Self: Sized,
{
self.map_err(Into::into).map_err(JobError::Abort)
}
fn retry(self) -> JobResult<Self::Item>
where
Self: Sized,
{
self.map_err(Into::into).map_err(JobError::Retry)
}
}
fn job_result(result: &JobResult) -> crate::repo::JobResult {
match result {
Ok(()) => crate::repo::JobResult::Success,
Err(JobError::Retry(_)) => crate::repo::JobResult::Failure,
Err(JobError::Abort(_)) => crate::repo::JobResult::Aborted,
}
}
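Taken together, `JobError`, `JobContext`, and `job_result` let each job body classify every fallible step: `.retry()` marks an error as transient (the job is reported as a `Failure` and its retry counter is decremented), while `.abort()` marks it as permanent (the job is reported as `Aborted` and dropped from the queue). A rough sketch of the intended usage, with a hypothetical job body standing in for the real handlers:

```rust
// Hypothetical job body illustrating the .retry()/.abort() split.
async fn cleanup_example(repo: &ArcRepo, alias: Alias) -> JobResult {
    // Repo lookups can fail transiently (e.g. connection loss): retry the job.
    let token = repo
        .delete_token(&alias)
        .await
        .retry()?
        // A missing token will never appear later: abort instead of retrying.
        .ok_or(UploadError::MissingAlias)
        .abort()?;

    super::cleanup_alias(repo, alias, token).await.retry()?;

    Ok(())
}
```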
async fn process_jobs<S, F>(state: State<S>, queue: &'static str, callback: F) async fn process_jobs<S, F>(state: State<S>, queue: &'static str, callback: F)
where where
S: Store, S: Store,
for<'a> F: Fn(&'a State<S>, serde_json::Value) -> LocalBoxFuture<'a, Result<(), Error>> + Copy, for<'a> F: Fn(&'a State<S>, serde_json::Value) -> JobFuture<'a> + Copy,
{ {
let worker_id = uuid::Uuid::new_v4(); let worker_id = uuid::Uuid::new_v4();
@ -262,7 +339,7 @@ async fn job_loop<S, F>(
) -> Result<(), Error> ) -> Result<(), Error>
where where
S: Store, S: Store,
for<'a> F: Fn(&'a State<S>, serde_json::Value) -> LocalBoxFuture<'a, Result<(), Error>> + Copy, for<'a> F: Fn(&'a State<S>, serde_json::Value) -> JobFuture<'a> + Copy,
{ {
loop { loop {
tracing::trace!("job_loop: looping"); tracing::trace!("job_loop: looping");
@ -283,7 +360,10 @@ where
) )
.await; .await;
state.repo.complete_job(queue, worker_id, job_id).await?; state
.repo
.complete_job(queue, worker_id, job_id, job_result(&res))
.await?;
res?; res?;
@ -303,8 +383,7 @@ async fn process_image_jobs<S, F>(
callback: F, callback: F,
) where ) where
S: Store, S: Store,
for<'a> F: Fn(&'a State<S>, &'a ProcessMap, serde_json::Value) -> LocalBoxFuture<'a, Result<(), Error>> for<'a> F: Fn(&'a State<S>, &'a ProcessMap, serde_json::Value) -> JobFuture<'a> + Copy,
+ Copy,
{ {
let worker_id = uuid::Uuid::new_v4(); let worker_id = uuid::Uuid::new_v4();
@ -339,8 +418,7 @@ async fn image_job_loop<S, F>(
) -> Result<(), Error> ) -> Result<(), Error>
where where
S: Store, S: Store,
for<'a> F: Fn(&'a State<S>, &'a ProcessMap, serde_json::Value) -> LocalBoxFuture<'a, Result<(), Error>> for<'a> F: Fn(&'a State<S>, &'a ProcessMap, serde_json::Value) -> JobFuture<'a> + Copy,
+ Copy,
{ {
loop { loop {
tracing::trace!("image_job_loop: looping"); tracing::trace!("image_job_loop: looping");
@ -361,7 +439,10 @@ where
) )
.await; .await;
state.repo.complete_job(queue, worker_id, job_id).await?; state
.repo
.complete_job(queue, worker_id, job_id, job_result(&res))
.await?;
res?; res?;

View File

@ -6,7 +6,7 @@ use tracing::{Instrument, Span};
use crate::{ use crate::{
config::Configuration, config::Configuration,
error::{Error, UploadError}, error::{Error, UploadError},
future::LocalBoxFuture, future::WithPollTimer,
queue::Cleanup, queue::Cleanup,
repo::{Alias, ArcRepo, DeleteToken, Hash}, repo::{Alias, ArcRepo, DeleteToken, Hash},
serde_str::Serde, serde_str::Serde,
@ -14,41 +14,67 @@ use crate::{
store::Store, store::Store,
}; };
pub(super) fn perform<S>( use super::{JobContext, JobFuture, JobResult};
state: &State<S>,
job: serde_json::Value, pub(super) fn perform<S>(state: &State<S>, job: serde_json::Value) -> JobFuture<'_>
) -> LocalBoxFuture<'_, Result<(), Error>>
where where
S: Store + 'static, S: Store + 'static,
{ {
Box::pin(async move { Box::pin(async move {
match serde_json::from_value(job) { let job_text = format!("{job}");
Ok(job) => match job {
Cleanup::Hash { hash: in_hash } => hash(&state.repo, in_hash).await?, let job = serde_json::from_value(job)
Cleanup::Identifier { .map_err(|e| UploadError::InvalidJob(e, job_text))
identifier: in_identifier, .abort()?;
} => identifier(&state.repo, &state.store, Arc::from(in_identifier)).await?,
Cleanup::Alias { match job {
alias: stored_alias, Cleanup::Hash { hash: in_hash } => {
token, hash(&state.repo, in_hash)
} => { .with_poll_timer("cleanup-hash")
alias( .await?
&state.repo, }
Serde::into_inner(stored_alias), Cleanup::Identifier {
Serde::into_inner(token), identifier: in_identifier,
) } => {
identifier(&state.repo, &state.store, Arc::from(in_identifier))
.with_poll_timer("cleanup-identifier")
.await?
}
Cleanup::Alias {
alias: stored_alias,
token,
} => {
alias(
&state.repo,
Serde::into_inner(stored_alias),
Serde::into_inner(token),
)
.await?
}
Cleanup::Variant { hash, variant } => {
hash_variant(&state.repo, hash, variant)
.with_poll_timer("cleanup-hash-variant")
.await?
}
Cleanup::AllVariants => {
all_variants(&state.repo)
.with_poll_timer("cleanup-all-variants")
.await?
}
Cleanup::OutdatedVariants => {
outdated_variants(&state.repo, &state.config)
.with_poll_timer("cleanup-outdated-variants")
.await?
}
Cleanup::OutdatedProxies => {
outdated_proxies(&state.repo, &state.config)
.with_poll_timer("cleanup-outdated-proxies")
.await?
}
Cleanup::Prune => {
prune(&state.repo, &state.store)
.with_poll_timer("cleanup-prune")
.await? .await?
}
Cleanup::Variant { hash, variant } => {
hash_variant(&state.repo, hash, variant).await?
}
Cleanup::AllVariants => all_variants(&state.repo).await?,
Cleanup::OutdatedVariants => outdated_variants(&state.repo, &state.config).await?,
Cleanup::OutdatedProxies => outdated_proxies(&state.repo, &state.config).await?,
Cleanup::Prune => prune(&state.repo, &state.store).await?,
},
Err(e) => {
tracing::warn!("Invalid job: {}", format!("{e}"));
} }
} }
@ -57,36 +83,30 @@ where
} }
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
async fn identifier<S>(repo: &ArcRepo, store: &S, identifier: Arc<str>) -> Result<(), Error> async fn identifier<S>(repo: &ArcRepo, store: &S, identifier: Arc<str>) -> JobResult
where where
S: Store, S: Store,
{ {
let mut errors = Vec::new(); match store.remove(&identifier).await {
Ok(_) => {}
if let Err(e) = store.remove(&identifier).await { Err(e) if e.is_not_found() => {}
errors.push(UploadError::from(e)); Err(e) => return Err(e).retry(),
} }
if let Err(e) = repo.cleanup_details(&identifier).await { repo.cleanup_details(&identifier).await.retry()?;
errors.push(UploadError::from(e));
}
for error in errors {
tracing::error!("{}", format!("{error:?}"));
}
Ok(()) Ok(())
} }
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
async fn hash(repo: &ArcRepo, hash: Hash) -> Result<(), Error> { async fn hash(repo: &ArcRepo, hash: Hash) -> JobResult {
let aliases = repo.aliases_for_hash(hash.clone()).await?; let aliases = repo.aliases_for_hash(hash.clone()).await.retry()?;
if !aliases.is_empty() { if !aliases.is_empty() {
for alias in aliases { for alias in aliases {
// TODO: decide if it is okay to skip aliases without tokens // TODO: decide if it is okay to skip aliases without tokens
if let Some(token) = repo.delete_token(&alias).await? { if let Some(token) = repo.delete_token(&alias).await.retry()? {
super::cleanup_alias(repo, alias, token).await?; super::cleanup_alias(repo, alias, token).await.retry()?;
} else { } else {
tracing::warn!("Not cleaning alias!"); tracing::warn!("Not cleaning alias!");
} }
@ -97,145 +117,152 @@ async fn hash(repo: &ArcRepo, hash: Hash) -> Result<(), Error> {
let mut idents = repo let mut idents = repo
.variants(hash.clone()) .variants(hash.clone())
.await? .await
.retry()?
.into_iter() .into_iter()
.map(|(_, v)| v) .map(|(_, v)| v)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
idents.extend(repo.identifier(hash.clone()).await?); idents.extend(repo.identifier(hash.clone()).await.retry()?);
idents.extend(repo.motion_identifier(hash.clone()).await?); idents.extend(repo.motion_identifier(hash.clone()).await.retry()?);
for identifier in idents { for identifier in idents {
let _ = super::cleanup_identifier(repo, &identifier).await; super::cleanup_identifier(repo, &identifier).await.retry()?;
} }
repo.cleanup_hash(hash).await?; repo.cleanup_hash(hash).await.retry()?;
Ok(()) Ok(())
} }
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
pub(crate) async fn alias(repo: &ArcRepo, alias: Alias, token: DeleteToken) -> Result<(), Error> { pub(crate) async fn alias(repo: &ArcRepo, alias: Alias, token: DeleteToken) -> JobResult {
let saved_delete_token = repo.delete_token(&alias).await?; let saved_delete_token = repo.delete_token(&alias).await.retry()?;
if !saved_delete_token.is_some_and(|t| t.ct_eq(&token)) { if !saved_delete_token.is_some_and(|t| t.ct_eq(&token)) {
return Err(UploadError::InvalidToken.into()); return Err(UploadError::InvalidToken).abort();
} }
let hash = repo.hash(&alias).await?; let hash = repo.hash(&alias).await.retry()?;
repo.cleanup_alias(&alias).await?; repo.cleanup_alias(&alias).await.retry()?;
repo.remove_relation(alias.clone()).await?; repo.remove_relation(alias.clone()).await.retry()?;
repo.remove_alias_access(alias.clone()).await?; repo.remove_alias_access(alias.clone()).await.retry()?;
let Some(hash) = hash else { let hash = hash.ok_or(UploadError::MissingAlias).abort()?;
// hash doesn't exist, nothing to do
return Ok(());
};
if repo.aliases_for_hash(hash.clone()).await?.is_empty() { if repo
super::cleanup_hash(repo, hash).await?; .aliases_for_hash(hash.clone())
.await
.retry()?
.is_empty()
{
super::cleanup_hash(repo, hash).await.retry()?;
} }
Ok(()) Ok(())
} }
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
async fn all_variants(repo: &ArcRepo) -> Result<(), Error> { async fn all_variants(repo: &ArcRepo) -> JobResult {
let hash_stream = std::pin::pin!(repo.hashes()); let hash_stream = std::pin::pin!(repo.hashes());
let mut hash_stream = hash_stream.into_streamer(); let mut hash_stream = hash_stream.into_streamer();
while let Some(res) = hash_stream.next().await { while let Some(res) = hash_stream.next().await {
tracing::trace!("all_variants: looping"); tracing::trace!("all_variants: looping");
let hash = res?; let hash = res.retry()?;
super::cleanup_variants(repo, hash, None).await?; super::cleanup_variants(repo, hash, None).await.retry()?;
} }
Ok(()) Ok(())
} }
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
async fn outdated_variants(repo: &ArcRepo, config: &Configuration) -> Result<(), Error> { async fn outdated_variants(repo: &ArcRepo, config: &Configuration) -> JobResult {
let now = time::OffsetDateTime::now_utc(); let now = time::OffsetDateTime::now_utc();
let since = now.saturating_sub(config.media.retention.variants.to_duration()); let since = now.saturating_sub(config.media.retention.variants.to_duration());
let variant_stream = repo.older_variants(since).await?; let variant_stream = repo.older_variants(since).await.retry()?;
let variant_stream = std::pin::pin!(crate::stream::take(variant_stream, 2048)); let variant_stream = std::pin::pin!(crate::stream::take(variant_stream, 2048));
let mut variant_stream = variant_stream.into_streamer(); let mut variant_stream = variant_stream.into_streamer();
let mut count = 0; let mut count = 0;
while let Some(res) = variant_stream.next().await { while let Some((hash, variant)) = variant_stream.try_next().await.retry()? {
metrics::counter!(crate::init_metrics::CLEANUP_OUTDATED_VARIANT).increment(1); metrics::counter!(crate::init_metrics::CLEANUP_OUTDATED_VARIANT).increment(1);
tracing::trace!("outdated_variants: looping"); tracing::trace!("outdated_variants: looping");
let (hash, variant) = res?; super::cleanup_variants(repo, hash, Some(variant))
super::cleanup_variants(repo, hash, Some(variant)).await?; .await
.retry()?;
count += 1; count += 1;
} }
tracing::debug!("Queued {count} variant cleanup jobs"); tracing::debug!("Queued {count} variant cleanup jobs");
let queue_length = repo.queue_length().await?; let queue_length = repo.queue_length().await.abort()?;
tracing::debug!("Total queue length: {queue_length}"); tracing::debug!("Total queue length: {queue_length}");
Ok(()) Ok(())
} }
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
async fn outdated_proxies(repo: &ArcRepo, config: &Configuration) -> Result<(), Error> { async fn outdated_proxies(repo: &ArcRepo, config: &Configuration) -> JobResult {
let now = time::OffsetDateTime::now_utc(); let now = time::OffsetDateTime::now_utc();
let since = now.saturating_sub(config.media.retention.proxy.to_duration()); let since = now.saturating_sub(config.media.retention.proxy.to_duration());
let alias_stream = repo.older_aliases(since).await?; let alias_stream = repo.older_aliases(since).await.retry()?;
let alias_stream = std::pin::pin!(crate::stream::take(alias_stream, 2048)); let alias_stream = std::pin::pin!(crate::stream::take(alias_stream, 2048));
let mut alias_stream = alias_stream.into_streamer(); let mut alias_stream = alias_stream.into_streamer();
let mut count = 0; let mut count = 0;
while let Some(res) = alias_stream.next().await { while let Some(alias) = alias_stream.try_next().await.retry()? {
metrics::counter!(crate::init_metrics::CLEANUP_OUTDATED_PROXY).increment(1); metrics::counter!(crate::init_metrics::CLEANUP_OUTDATED_PROXY).increment(1);
tracing::trace!("outdated_proxies: looping"); tracing::trace!("outdated_proxies: looping");
let alias = res?; if let Some(token) = repo.delete_token(&alias).await.retry()? {
if let Some(token) = repo.delete_token(&alias).await? { super::cleanup_alias(repo, alias, token).await.retry()?;
super::cleanup_alias(repo, alias, token).await?;
count += 1; count += 1;
} else { } else {
tracing::warn!("Skipping alias cleanup - no delete token"); tracing::warn!("Skipping alias cleanup - no delete token");
repo.remove_relation(alias.clone()).await?; repo.remove_relation(alias.clone()).await.retry()?;
repo.remove_alias_access(alias).await?; repo.remove_alias_access(alias).await.retry()?;
} }
} }
tracing::debug!("Queued {count} alias cleanup jobs"); tracing::debug!("Queued {count} alias cleanup jobs");
let queue_length = repo.queue_length().await?; let queue_length = repo.queue_length().await.abort()?;
tracing::debug!("Total queue length: {queue_length}"); tracing::debug!("Total queue length: {queue_length}");
Ok(()) Ok(())
} }
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
async fn hash_variant( async fn hash_variant(repo: &ArcRepo, hash: Hash, target_variant: Option<String>) -> JobResult {
repo: &ArcRepo,
hash: Hash,
target_variant: Option<String>,
) -> Result<(), Error> {
if let Some(target_variant) = target_variant { if let Some(target_variant) = target_variant {
if let Some(identifier) = repo if let Some(identifier) = repo
.variant_identifier(hash.clone(), target_variant.clone()) .variant_identifier(hash.clone(), target_variant.clone())
.await? .await
.retry()?
{ {
super::cleanup_identifier(repo, &identifier).await?; super::cleanup_identifier(repo, &identifier).await.retry()?;
} }
repo.remove_variant(hash.clone(), target_variant.clone()) repo.remove_variant(hash.clone(), target_variant.clone())
.await?; .await
repo.remove_variant_access(hash, target_variant).await?; .retry()?;
repo.remove_variant_access(hash, target_variant)
.await
.retry()?;
} else { } else {
for (variant, identifier) in repo.variants(hash.clone()).await? { for (variant, identifier) in repo.variants(hash.clone()).await.retry()? {
repo.remove_variant(hash.clone(), variant.clone()).await?; repo.remove_variant(hash.clone(), variant.clone())
repo.remove_variant_access(hash.clone(), variant).await?; .await
super::cleanup_identifier(repo, &identifier).await?; .retry()?;
repo.remove_variant_access(hash.clone(), variant)
.await
.retry()?;
super::cleanup_identifier(repo, &identifier).await.retry()?;
} }
} }
@ -243,19 +270,20 @@ async fn hash_variant(
} }
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
async fn prune<S>(repo: &ArcRepo, store: &S) -> Result<(), Error> async fn prune<S>(repo: &ArcRepo, store: &S) -> JobResult
where where
S: Store + 'static, S: Store + 'static,
{ {
repo.set("prune-missing-started", b"1".to_vec().into()) repo.set("prune-missing-started", b"1".to_vec().into())
.await?; .await
.retry()?;
let hash_stream = std::pin::pin!(repo.hashes()); let hash_stream = std::pin::pin!(repo.hashes());
let mut hash_stream = hash_stream.into_streamer(); let mut hash_stream = hash_stream.into_streamer();
let mut count: u64 = 0; let mut count: u64 = 0;
while let Some(hash) = hash_stream.try_next().await? { while let Some(hash) = hash_stream.try_next().await.retry()? {
tracing::trace!("prune: looping"); tracing::trace!("prune: looping");
let repo = repo.clone(); let repo = repo.clone();
@ -307,7 +335,8 @@ where
} }
repo.set("prune-missing-complete", b"1".to_vec().into()) repo.set("prune-missing-complete", b"1".to_vec().into())
.await?; .await
.retry()?;
Ok(()) Ok(())
} }

View File

@ -5,7 +5,7 @@ use crate::{
concurrent_processor::ProcessMap, concurrent_processor::ProcessMap,
error::{Error, UploadError}, error::{Error, UploadError},
formats::InputProcessableFormat, formats::InputProcessableFormat,
future::LocalBoxFuture, future::WithPollTimer,
ingest::Session, ingest::Session,
queue::Process, queue::Process,
repo::{Alias, UploadId, UploadResult}, repo::{Alias, UploadId, UploadResult},
@ -15,49 +15,54 @@ use crate::{
}; };
use std::{path::PathBuf, sync::Arc}; use std::{path::PathBuf, sync::Arc};
use super::{JobContext, JobFuture, JobResult};
pub(super) fn perform<'a, S>( pub(super) fn perform<'a, S>(
state: &'a State<S>, state: &'a State<S>,
process_map: &'a ProcessMap, process_map: &'a ProcessMap,
job: serde_json::Value, job: serde_json::Value,
) -> LocalBoxFuture<'a, Result<(), Error>> ) -> JobFuture<'a>
where where
S: Store + 'static, S: Store + 'static,
{ {
Box::pin(async move { Box::pin(async move {
match serde_json::from_value(job) { let job_text = format!("{job}");
Ok(job) => match job {
Process::Ingest { let job = serde_json::from_value(job)
identifier, .map_err(|e| UploadError::InvalidJob(e, job_text))
upload_id, .abort()?;
declared_alias,
} => { match job {
process_ingest( Process::Ingest {
state, identifier,
Arc::from(identifier), upload_id,
Serde::into_inner(upload_id), declared_alias,
declared_alias.map(Serde::into_inner), } => {
) process_ingest(
.await? state,
} Arc::from(identifier),
Process::Generate { Serde::into_inner(upload_id),
declared_alias.map(Serde::into_inner),
)
.with_poll_timer("process-ingest")
.await?
}
Process::Generate {
target_format,
source,
process_path,
process_args,
} => {
generate(
state,
process_map,
target_format, target_format,
source, Serde::into_inner(source),
process_path, process_path,
process_args, process_args,
} => { )
generate( .with_poll_timer("process-generate")
state, .await?
process_map,
target_format,
Serde::into_inner(source),
process_path,
process_args,
)
.await?
}
},
Err(e) => {
tracing::warn!("Invalid job: {}", format!("{e}"));
} }
} }
@ -105,13 +110,13 @@ async fn process_ingest<S>(
unprocessed_identifier: Arc<str>, unprocessed_identifier: Arc<str>,
upload_id: UploadId, upload_id: UploadId,
declared_alias: Option<Alias>, declared_alias: Option<Alias>,
) -> Result<(), Error> ) -> JobResult
where where
S: Store + 'static, S: Store + 'static,
{ {
let guard = UploadGuard::guard(upload_id); let guard = UploadGuard::guard(upload_id);
let fut = async { let res = async {
let ident = unprocessed_identifier.clone(); let ident = unprocessed_identifier.clone();
let state2 = state.clone(); let state2 = state.clone();
@ -135,25 +140,33 @@ where
state.store.remove(&unprocessed_identifier).await?; state.store.remove(&unprocessed_identifier).await?;
error_boundary.map_err(|_| UploadError::Canceled)? error_boundary.map_err(|_| UploadError::Canceled)?
}; }
.await;
let result = match fut.await { let (result, err) = match res {
Ok(session) => { Ok(session) => {
let alias = session.alias().take().expect("Alias should exist").clone(); let alias = session.alias().take().expect("Alias should exist").clone();
let token = session.disarm(); let token = session.disarm();
UploadResult::Success { alias, token } (UploadResult::Success { alias, token }, None)
} }
Err(e) => { Err(e) => (
tracing::warn!("Failed to ingest\n{}\n{}", format!("{e}"), format!("{e:?}"));
UploadResult::Failure { UploadResult::Failure {
message: e.root_cause().to_string(), message: e.root_cause().to_string(),
code: e.error_code().into_owned(), code: e.error_code().into_owned(),
} },
} Some(e),
),
}; };
state.repo.complete_upload(upload_id, result).await?; state
.repo
.complete_upload(upload_id, result)
.await
.retry()?;
if let Some(e) = err {
return Err(e).abort();
}
guard.disarm(); guard.disarm();
@ -168,23 +181,28 @@ async fn generate<S: Store + 'static>(
source: Alias, source: Alias,
process_path: PathBuf, process_path: PathBuf,
process_args: Vec<String>, process_args: Vec<String>,
) -> Result<(), Error> { ) -> JobResult {
let Some(hash) = state.repo.hash(&source).await? else { let hash = state
// Nothing to do .repo
return Ok(()); .hash(&source)
}; .await
.retry()?
.ok_or(UploadError::MissingAlias)
.abort()?;
let path_string = process_path.to_string_lossy().to_string(); let path_string = process_path.to_string_lossy().to_string();
let identifier_opt = state let identifier_opt = state
.repo .repo
.variant_identifier(hash.clone(), path_string) .variant_identifier(hash.clone(), path_string)
.await?; .await
.retry()?;
if identifier_opt.is_some() { if identifier_opt.is_some() {
// don't generate already-generated variant
return Ok(()); return Ok(());
} }
let original_details = crate::ensure_details(state, &source).await?; let original_details = crate::ensure_details(state, &source).await.retry()?;
crate::generate::generate( crate::generate::generate(
state, state,
@ -195,7 +213,8 @@ async fn generate<S: Store + 'static>(
&original_details, &original_details,
hash, hash,
) )
.await?; .await
.abort()?;
Ok(()) Ok(())
} }

View File

@ -337,6 +337,13 @@ where
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) struct JobId(Uuid); pub(crate) struct JobId(Uuid);
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) enum JobResult {
Success,
Failure,
Aborted,
}
impl JobId { impl JobId {
pub(crate) fn gen() -> Self { pub(crate) fn gen() -> Self {
Self(Uuid::now_v7()) Self(Uuid::now_v7())
@ -380,6 +387,7 @@ pub(crate) trait QueueRepo: BaseRepo {
queue: &'static str, queue: &'static str,
worker_id: Uuid, worker_id: Uuid,
job_id: JobId, job_id: JobId,
job_status: JobResult,
) -> Result<(), RepoError>; ) -> Result<(), RepoError>;
} }
@ -423,8 +431,9 @@ where
queue: &'static str, queue: &'static str,
worker_id: Uuid, worker_id: Uuid,
job_id: JobId, job_id: JobId,
job_status: JobResult,
) -> Result<(), RepoError> { ) -> Result<(), RepoError> {
T::complete_job(self, queue, worker_id, job_id).await T::complete_job(self, queue, worker_id, job_id, job_status).await
} }
} }

View File

@ -43,9 +43,9 @@ use self::job_status::JobStatus;
use super::{ use super::{
metrics::{PopMetricsGuard, PushMetricsGuard, WaitMetricsGuard}, metrics::{PopMetricsGuard, PushMetricsGuard, WaitMetricsGuard},
Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, DetailsRepo, Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, DetailsRepo,
FullRepo, Hash, HashAlreadyExists, HashPage, HashRepo, JobId, OrderedHash, ProxyRepo, FullRepo, Hash, HashAlreadyExists, HashPage, HashRepo, JobId, JobResult, OrderedHash,
QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo, UploadResult, ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo,
VariantAccessRepo, VariantAlreadyExists, UploadResult, VariantAccessRepo, VariantAlreadyExists,
}; };
#[derive(Clone)] #[derive(Clone)]
@ -1358,7 +1358,7 @@ impl QueueRepo for PostgresRepo {
} }
} }
#[tracing::instrument(level = "debug", skip(self))] #[tracing::instrument(level = "debug", skip_all, fields(job_id))]
async fn pop( async fn pop(
&self, &self,
queue_name: &'static str, queue_name: &'static str,
@ -1420,7 +1420,8 @@ impl QueueRepo for PostgresRepo {
queue_alias queue_alias
.field(status) .field(status)
.eq(JobStatus::New) .eq(JobStatus::New)
.and(queue_alias.field(queue).eq(queue_name)), .and(queue_alias.field(queue).eq(queue_name))
.and(queue_alias.field(retry).ge(1)),
) )
.order(queue_alias.field(queue_time)) .order(queue_alias.field(queue_time))
.for_update() .for_update()
@ -1445,20 +1446,21 @@ impl QueueRepo for PostgresRepo {
.map_err(PostgresError::Diesel)?; .map_err(PostgresError::Diesel)?;
if let Some((job_id, job_json)) = opt { if let Some((job_id, job_json)) = opt {
tracing::Span::current().record("job_id", &format!("{job_id}"));
guard.disarm(); guard.disarm();
tracing::debug!("{job_json}");
return Ok((JobId(job_id), job_json)); return Ok((JobId(job_id), job_json));
} }
drop(conn); drop(conn);
if notifier match notifier
.notified() .notified()
.with_timeout(Duration::from_secs(5)) .with_timeout(Duration::from_secs(5))
.await .await
.is_ok()
{ {
tracing::debug!("Notified"); Ok(()) => tracing::debug!("Notified"),
} else { Err(_) => tracing::trace!("Timed out"),
tracing::debug!("Timed out");
} }
} }
} }
@ -1499,23 +1501,54 @@ impl QueueRepo for PostgresRepo {
queue_name: &'static str, queue_name: &'static str,
worker_id: Uuid, worker_id: Uuid,
job_id: JobId, job_id: JobId,
job_status: JobResult,
) -> Result<(), RepoError> { ) -> Result<(), RepoError> {
use schema::job_queue::dsl::*; use schema::job_queue::dsl::*;
let mut conn = self.get_connection().await?; let mut conn = self.get_connection().await?;
diesel::delete(job_queue) if matches!(job_status, JobResult::Failure) {
.filter( diesel::update(job_queue)
id.eq(job_id.0) .filter(
.and(queue.eq(queue_name)) id.eq(job_id.0)
.and(worker.eq(worker_id)), .and(queue.eq(queue_name))
) .and(worker.eq(worker_id)),
.execute(&mut conn) )
.with_metrics(crate::init_metrics::POSTGRES_QUEUE_COMPLETE) .set(retry.eq(retry - 1))
.with_timeout(Duration::from_secs(5)) .execute(&mut conn)
.await .with_metrics(crate::init_metrics::POSTGRES_QUEUE_RETRY)
.map_err(|_| PostgresError::DbTimeout)? .with_timeout(Duration::from_secs(5))
.map_err(PostgresError::Diesel)?; .await
.map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?;
diesel::delete(job_queue)
.filter(
id.eq(job_id.0)
.and(queue.eq(queue_name))
.and(worker.eq(worker_id))
.and(retry.le(0)),
)
.execute(&mut conn)
.with_metrics(crate::init_metrics::POSTGRES_QUEUE_CLEANUP)
.with_timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?;
} else {
diesel::delete(job_queue)
.filter(
id.eq(job_id.0)
.and(queue.eq(queue_name))
.and(worker.eq(worker_id)),
)
.execute(&mut conn)
.with_metrics(crate::init_metrics::POSTGRES_QUEUE_COMPLETE)
.with_timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?;
}
Ok(()) Ok(())
} }

View File

@ -0,0 +1,12 @@
use barrel::backend::Pg;
use barrel::{types, Migration};
pub(crate) fn migration() -> String {
let mut m = Migration::new();
m.change_table("job_queue", |t| {
t.add_column("retry", types::integer().nullable(false).default(5));
});
m.make::<Pg>().to_string()
}

View File

@ -44,6 +44,7 @@ diesel::table! {
queue_time -> Timestamp, queue_time -> Timestamp,
heartbeat -> Nullable<Timestamp>, heartbeat -> Nullable<Timestamp>,
unique_key -> Nullable<Text>, unique_key -> Nullable<Text>,
retry -> Int4,
} }
} }

View File

@ -1,6 +1,7 @@
use crate::{ use crate::{
details::HumanDate, details::HumanDate,
error_code::{ErrorCode, OwnedErrorCode}, error_code::{ErrorCode, OwnedErrorCode},
future::WithTimeout,
serde_str::Serde, serde_str::Serde,
stream::{from_iterator, LocalBoxStream}, stream::{from_iterator, LocalBoxStream},
}; };
@ -12,6 +13,7 @@ use std::{
atomic::{AtomicU64, Ordering}, atomic::{AtomicU64, Ordering},
Arc, RwLock, Arc, RwLock,
}, },
time::Duration,
}; };
use tokio::sync::Notify; use tokio::sync::Notify;
use url::Url; use url::Url;
@ -21,9 +23,9 @@ use super::{
hash::Hash, hash::Hash,
metrics::{PopMetricsGuard, PushMetricsGuard, WaitMetricsGuard}, metrics::{PopMetricsGuard, PushMetricsGuard, WaitMetricsGuard},
Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, Details, Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, Details,
DetailsRepo, FullRepo, HashAlreadyExists, HashPage, HashRepo, JobId, OrderedHash, ProxyRepo, DetailsRepo, FullRepo, HashAlreadyExists, HashPage, HashRepo, JobId, JobResult, OrderedHash,
QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo, UploadResult, ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo,
VariantAccessRepo, VariantAlreadyExists, UploadResult, VariantAccessRepo, VariantAlreadyExists,
}; };
macro_rules! b { macro_rules! b {
@ -99,6 +101,7 @@ pub(crate) struct SledRepo {
unique_jobs: Tree, unique_jobs: Tree,
unique_jobs_inverse: Tree, unique_jobs_inverse: Tree,
job_state: Tree, job_state: Tree,
job_retries: Tree,
alias_access: Tree, alias_access: Tree,
inverse_alias_access: Tree, inverse_alias_access: Tree,
variant_access: Tree, variant_access: Tree,
@ -141,6 +144,7 @@ impl SledRepo {
unique_jobs: db.open_tree("pict-rs-unique-jobs-tree")?, unique_jobs: db.open_tree("pict-rs-unique-jobs-tree")?,
unique_jobs_inverse: db.open_tree("pict-rs-unique-jobs-inverse-tree")?, unique_jobs_inverse: db.open_tree("pict-rs-unique-jobs-inverse-tree")?,
job_state: db.open_tree("pict-rs-job-state-tree")?, job_state: db.open_tree("pict-rs-job-state-tree")?,
job_retries: db.open_tree("pict-rs-job-retries-tree")?,
alias_access: db.open_tree("pict-rs-alias-access-tree")?, alias_access: db.open_tree("pict-rs-alias-access-tree")?,
inverse_alias_access: db.open_tree("pict-rs-inverse-alias-access-tree")?, inverse_alias_access: db.open_tree("pict-rs-inverse-alias-access-tree")?,
variant_access: db.open_tree("pict-rs-variant-access-tree")?, variant_access: db.open_tree("pict-rs-variant-access-tree")?,
@ -653,28 +657,37 @@ impl QueueRepo for SledRepo {
let unique_jobs = self.unique_jobs.clone(); let unique_jobs = self.unique_jobs.clone();
let unique_jobs_inverse = self.unique_jobs_inverse.clone(); let unique_jobs_inverse = self.unique_jobs_inverse.clone();
let job_state = self.job_state.clone(); let job_state = self.job_state.clone();
let job_retries = self.job_retries.clone();
let res = crate::sync::spawn_blocking("sled-io", move || { let res = crate::sync::spawn_blocking("sled-io", move || {
(&queue, &unique_jobs, &unique_jobs_inverse, &job_state).transaction( (
|(queue, unique_jobs, unique_jobs_inverse, job_state)| { &queue,
let state = JobState::pending(); &unique_jobs,
&unique_jobs_inverse,
queue.insert(&key[..], &job[..])?; &job_state,
if let Some(unique_key) = unique_key { &job_retries,
if unique_jobs
.insert(unique_key.as_bytes(), &key[..])?
.is_some()
{
return sled::transaction::abort(());
}
unique_jobs_inverse.insert(&key[..], unique_key.as_bytes())?;
}
job_state.insert(&key[..], state.as_bytes())?;
Ok(())
},
) )
.transaction(
|(queue, unique_jobs, unique_jobs_inverse, job_state, job_retries)| {
let state = JobState::pending();
queue.insert(&key[..], &job[..])?;
if let Some(unique_key) = unique_key {
if unique_jobs
.insert(unique_key.as_bytes(), &key[..])?
.is_some()
{
return sled::transaction::abort(());
}
unique_jobs_inverse.insert(&key[..], unique_key.as_bytes())?;
}
job_state.insert(&key[..], state.as_bytes())?;
job_retries.insert(&key[..], &(5_u64.to_be_bytes())[..])?;
Ok(())
},
)
}) })
.await .await
.map_err(|_| RepoError::Canceled)?; .map_err(|_| RepoError::Canceled)?;
@ -703,7 +716,7 @@ impl QueueRepo for SledRepo {
Ok(Some(id)) Ok(Some(id))
} }
#[tracing::instrument(skip(self, worker_id), fields(job_id))] #[tracing::instrument(skip_all, fields(job_id))]
async fn pop( async fn pop(
&self, &self,
queue_name: &'static str, queue_name: &'static str,
@ -719,7 +732,6 @@ impl QueueRepo for SledRepo {
let queue = self.queue.clone(); let queue = self.queue.clone();
let job_state = self.job_state.clone(); let job_state = self.job_state.clone();
let span = tracing::Span::current();
let opt = crate::sync::spawn_blocking("sled-io", move || { let opt = crate::sync::spawn_blocking("sled-io", move || {
// Job IDs are generated with Uuid version 7 - defining their first bits as a // Job IDs are generated with Uuid version 7 - defining their first bits as a
// timestamp. Scanning a prefix should give us jobs in the order they were queued. // timestamp. Scanning a prefix should give us jobs in the order they were queued.
@ -760,8 +772,6 @@ impl QueueRepo for SledRepo {
let job_id = JobId::from_bytes(id_bytes); let job_id = JobId::from_bytes(id_bytes);
span.record("job_id", &format!("{job_id:?}"));
let opt = queue let opt = queue
.get(&key)? .get(&key)?
.map(|ivec| serde_json::from_slice(&ivec[..])) .map(|ivec| serde_json::from_slice(&ivec[..]))
@ -777,9 +787,12 @@ impl QueueRepo for SledRepo {
.await .await
.map_err(|_| RepoError::Canceled)??; .map_err(|_| RepoError::Canceled)??;
if let Some(tup) = opt { if let Some((job_id, job_json)) = opt {
tracing::Span::current().record("job_id", &format!("{}", job_id.0));
metrics_guard.disarm(); metrics_guard.disarm();
return Ok(tup); tracing::debug!("{job_json}");
return Ok((job_id, job_json));
} }
let opt = self let opt = self
@ -797,7 +810,14 @@ impl QueueRepo for SledRepo {
Arc::clone(entry) Arc::clone(entry)
}; };
notify.notified().await match notify
.notified()
.with_timeout(Duration::from_secs(30))
.await
{
Ok(()) => tracing::debug!("Notified"),
Err(_) => tracing::trace!("Timed out"),
}
} }
} }
@ -836,25 +856,50 @@ impl QueueRepo for SledRepo {
queue_name: &'static str, queue_name: &'static str,
_worker_id: Uuid, _worker_id: Uuid,
job_id: JobId, job_id: JobId,
job_status: JobResult,
) -> Result<(), RepoError> { ) -> Result<(), RepoError> {
let retry = matches!(job_status, JobResult::Failure);
let key = job_key(queue_name, job_id); let key = job_key(queue_name, job_id);
let queue = self.queue.clone(); let queue = self.queue.clone();
let unique_jobs = self.unique_jobs.clone(); let unique_jobs = self.unique_jobs.clone();
let unique_jobs_inverse = self.unique_jobs_inverse.clone(); let unique_jobs_inverse = self.unique_jobs_inverse.clone();
let job_state = self.job_state.clone(); let job_state = self.job_state.clone();
let job_retries = self.job_retries.clone();
let res = crate::sync::spawn_blocking("sled-io", move || { let res = crate::sync::spawn_blocking("sled-io", move || {
(&queue, &unique_jobs, &unique_jobs_inverse, &job_state).transaction( (
|(queue, unique_jobs, unique_jobs_inverse, job_state)| { &queue,
queue.remove(&key[..])?; &unique_jobs,
if let Some(unique_key) = unique_jobs_inverse.remove(&key[..])? { &unique_jobs_inverse,
unique_jobs.remove(unique_key)?; &job_state,
} &job_retries,
job_state.remove(&key[..])?;
Ok(())
},
) )
.transaction(
|(queue, unique_jobs, unique_jobs_inverse, job_state, job_retries)| {
let retries = job_retries.get(&key[..])?;
let retry_count = retries
.and_then(|ivec| ivec[0..8].try_into().ok())
.map(u64::from_be_bytes)
.unwrap_or(5_u64)
.saturating_sub(1);
if retry_count > 0 && retry {
job_retries.insert(&key[..], &(retry_count.to_be_bytes())[..])?;
} else {
queue.remove(&key[..])?;
if let Some(unique_key) = unique_jobs_inverse.remove(&key[..])? {
unique_jobs.remove(unique_key)?;
}
job_state.remove(&key[..])?;
job_retries.remove(&key[..])?;
}
Ok(())
},
)
}) })
.await .await
.map_err(|_| RepoError::Canceled)?; .map_err(|_| RepoError::Canceled)?;
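The sled backend keeps the counter in the new `job_retries` tree as a big-endian `u64`, falling back to 5 for jobs queued before this change; only when the decremented count reaches zero (or the job did not fail) is the job actually removed. A standalone sketch of the decode-and-decrement step, assuming the same encoding as above:

```rust
// Mirrors the counter handling above: stored big-endian u64, default 5,
// decremented once per failed attempt.
fn decrement_retries(stored: Option<&[u8]>) -> u64 {
    stored
        .and_then(|bytes| bytes.get(0..8))
        .and_then(|bytes| <[u8; 8]>::try_from(bytes).ok())
        .map(u64::from_be_bytes)
        .unwrap_or(5)
        .saturating_sub(1)
}

fn main() {
    let fresh = 5_u64.to_be_bytes();
    assert_eq!(decrement_retries(Some(&fresh[..])), 4);
    // Jobs queued before the job_retries tree existed have no stored value.
    assert_eq!(decrement_retries(None), 4);
}
```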

View File

@ -5,6 +5,8 @@ use tokio::{
task::JoinHandle, task::JoinHandle,
}; };
use crate::future::WithPollTimer;
pub(crate) struct DropHandle<T> { pub(crate) struct DropHandle<T> {
handle: JoinHandle<T>, handle: JoinHandle<T>,
} }
@ -75,13 +77,12 @@ pub(crate) fn bare_semaphore(permits: usize) -> Semaphore {
} }
#[track_caller] #[track_caller]
pub(crate) fn spawn<F>(name: &str, future: F) -> tokio::task::JoinHandle<F::Output> pub(crate) fn spawn<F>(name: &'static str, future: F) -> tokio::task::JoinHandle<F::Output>
where where
F: std::future::Future + 'static, F: std::future::Future + 'static,
F::Output: 'static, F::Output: 'static,
{ {
#[cfg(not(tokio_unstable))] let future = future.with_poll_timer(name);
let _ = name;
let span = tracing::trace_span!(parent: None, "spawn task"); let span = tracing::trace_span!(parent: None, "spawn task");
let guard = span.enter(); let guard = span.enter();