diff --git a/src/error.rs b/src/error.rs
index f2bb7b3..2350074 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -108,6 +108,9 @@ pub(crate) enum UploadError {
     #[error("Error in request response")]
     Request(#[from] reqwest::Error),
 
+    #[error("Invalid job popped from job queue: {1}")]
+    InvalidJob(#[source] serde_json::Error, String),
+
     #[error("pict-rs is in read-only mode")]
     ReadOnly,
 
@@ -201,6 +204,7 @@ impl UploadError {
             Self::Timeout(_) | Self::AggregateTimeout => ErrorCode::STREAM_TOO_SLOW,
             Self::ProcessTimeout => ErrorCode::COMMAND_TIMEOUT,
             Self::FailedExternalValidation => ErrorCode::FAILED_EXTERNAL_VALIDATION,
+            Self::InvalidJob(_, _) => ErrorCode::INVALID_JOB,
         }
     }
 
diff --git a/src/error_code.rs b/src/error_code.rs
index 62059e2..462a284 100644
--- a/src/error_code.rs
+++ b/src/error_code.rs
@@ -144,4 +144,7 @@ impl ErrorCode {
     pub(crate) const FAILED_EXTERNAL_VALIDATION: ErrorCode = ErrorCode {
         code: "failed-external-validation",
     };
+    pub(crate) const INVALID_JOB: ErrorCode = ErrorCode {
+        code: "invalid-job",
+    };
 }
diff --git a/src/future.rs b/src/future.rs
index e3323c5..79f0a97 100644
--- a/src/future.rs
+++ b/src/future.rs
@@ -59,9 +59,19 @@ pub(crate) trait WithMetrics: Future {
     }
 }
 
+pub(crate) trait WithPollTimer: Future {
+    fn with_poll_timer(self, name: &'static str) -> PollTimer<Self>
+    where
+        Self: Sized,
+    {
+        PollTimer { name, inner: self }
+    }
+}
+
 impl<F> NowOrNever for F where F: Future {}
 impl<F> WithMetrics for F where F: Future {}
 impl<F> WithTimeout for F where F: Future {}
+impl<F> WithPollTimer for F where F: Future {}
 
 pin_project_lite::pin_project! {
     pub(crate) struct MetricsFuture<F> {
@@ -104,3 +114,59 @@ impl Drop for Metrics {
             .record(self.start.elapsed().as_secs_f64());
     }
 }
+
+pin_project_lite::pin_project! {
+    pub(crate) struct PollTimer<F> {
+        name: &'static str,
+
+        #[pin]
+        inner: F,
+    }
+}
+
+impl<F> Future for PollTimer<F>
+where
+    F: Future,
+{
+    type Output = F::Output;
+
+    fn poll(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Self::Output> {
+        let start = Instant::now();
+
+        let this = self.project();
+
+        let out = this.inner.poll(cx);
+
+        let elapsed = start.elapsed();
+        if elapsed > Duration::from_micros(10) {
+            metrics::counter!(crate::init_metrics::FUTURE_POLL_TIMER_EXCEEDED, "timer" => this.name.to_string()).increment(1);
+        }
+
+        if elapsed > Duration::from_secs(1) {
+            tracing::warn!(
+                "Future {} polled for {} seconds",
+                this.name,
+                elapsed.as_secs()
+            );
+        } else if elapsed > Duration::from_millis(1) {
+            tracing::warn!("Future {} polled for {} ms", this.name, elapsed.as_millis());
+        } else if elapsed > Duration::from_micros(200) {
+            tracing::debug!(
+                "Future {} polled for {} microseconds",
+                this.name,
+                elapsed.as_micros(),
+            );
+        } else if elapsed > Duration::from_micros(1) {
+            tracing::trace!(
+                "Future {} polled for {} microseconds",
+                this.name,
+                elapsed.as_micros()
+            );
+        }
+
+        out
+    }
+}
diff --git a/src/generate.rs b/src/generate.rs
index e43cec3..950aa7f 100644
--- a/src/generate.rs
+++ b/src/generate.rs
@@ -6,7 +6,7 @@ use crate::{
     details::Details,
     error::{Error, UploadError},
     formats::{ImageFormat, InputProcessableFormat, InternalVideoFormat, ProcessableFormat},
-    future::{WithMetrics, WithTimeout},
+    future::{WithMetrics, WithPollTimer, WithTimeout},
     repo::{Hash, VariantAlreadyExists},
     state::State,
     store::Store,
@@ -48,7 +48,7 @@ impl Drop for MetricsGuard {
     }
 }
 
-#[tracing::instrument(skip(state, process_map, hash))]
+#[tracing::instrument(skip(state, process_map, original_details, hash))]
 pub(crate) async fn generate<S: Store + 'static>(
     state: &State<S>,
     process_map: &ProcessMap,
@@ -78,6 +78,7 @@ pub(crate) async fn generate<S: Store + 'static>(
     let (details, identifier) = process_map
         .process(hash, thumbnail_path, process_fut)
+        .with_poll_timer("process-future")
         .with_timeout(Duration::from_secs(state.config.media.process_timeout * 4))
         .with_metrics(crate::init_metrics::GENERATE_PROCESS)
         .await
diff --git a/src/ingest.rs b/src/ingest.rs
index 8c548e8..a074f84 100644
--- a/src/ingest.rs
+++ b/src/ingest.rs
@@ -5,7 +5,7 @@ use crate::{
     details::Details,
     error::{Error, UploadError},
     formats::InternalFormat,
-    future::WithMetrics,
+    future::{WithMetrics, WithPollTimer},
     repo::{Alias, ArcRepo, DeleteToken, Hash},
     state::State,
     store::Store,
@@ -159,7 +159,9 @@ where
     let (input_type, identifier, details, hash_state) = if state.config.server.danger_dummy_mode {
         dummy_ingest(state, stream).await?
     } else {
-        process_ingest(state, stream).await?
+        process_ingest(state, stream)
+            .with_poll_timer("ingest-future")
+            .await?
     };
 
     let mut session = Session {
diff --git a/src/init_metrics.rs b/src/init_metrics.rs
index a699fa7..96d42ca 100644
--- a/src/init_metrics.rs
+++ b/src/init_metrics.rs
@@ -1,5 +1,6 @@
 pub(super) fn init_metrics() {
     describe_toplevel();
+    describe_future();
     describe_queue_cleanup();
     describe_payload();
     describe_job();
@@ -26,6 +27,15 @@ fn describe_toplevel() {
 pub(crate) const FILES: &str = "pict-rs.files";
 pub(crate) const BACKGROUND_UPLOAD_CLAIM: &str = "pict-rs.background.upload.claim";
 
+fn describe_future() {
+    metrics::describe_counter!(
+        FUTURE_POLL_TIMER_EXCEEDED,
+        "How many times a given poll operation has lasted longer than 10 microseconds"
+    );
+}
+
+pub(crate) const FUTURE_POLL_TIMER_EXCEEDED: &str = "pict-rs.future.poll-timer.exceeded";
+
 fn describe_queue_cleanup() {
     metrics::describe_counter!(
         CLEANUP_OUTDATED_PROXY,
@@ -344,6 +354,14 @@ fn describe_postgres() {
         POSTGRES_QUEUE_HEARTBEAT,
         "Timings for updating the provided job's keepalive heartbeat"
     );
+    metrics::describe_histogram!(
+        POSTGRES_QUEUE_RETRY,
+        "Timings for updating retry count for a job"
+    );
+    metrics::describe_histogram!(
+        POSTGRES_QUEUE_CLEANUP,
+        "Timings for removing jobs with no more retries"
+    );
     metrics::describe_histogram!(
         POSTGRES_QUEUE_COMPLETE,
         "Timings for removing a completed job from the queue"
@@ -471,6 +489,8 @@
 pub(crate) const POSTGRES_QUEUE_LISTEN: &str = "pict-rs.postgres.queue.listen";
 pub(crate) const POSTGRES_QUEUE_REQUEUE: &str = "pict-rs.postgres.queue.requeue";
 pub(crate) const POSTGRES_QUEUE_CLAIM: &str = "pict-rs.postgres.queue.claim";
 pub(crate) const POSTGRES_QUEUE_HEARTBEAT: &str = "pict-rs.postgres.queue.heartbeat";
+pub(crate) const POSTGRES_QUEUE_RETRY: &str = "pict-rs.postgres.queue.retry";
+pub(crate) const POSTGRES_QUEUE_CLEANUP: &str = "pict-rs.postgres.queue.cleanup";
 pub(crate) const POSTGRES_QUEUE_COMPLETE: &str = "pict-rs.postgres.queue.complete";
 pub(crate) const POSTGRES_STORE_MIGRATION_COUNT: &str = "pict-rs.postgres.store-migration.count";
 pub(crate) const POSTGRES_STORE_MIGRATION_MARK_MIGRATED: &str =
diff --git a/src/lib.rs b/src/lib.rs
index d6c765b..7a83903 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -44,7 +44,7 @@ use actix_web::{
     web, App, HttpRequest, HttpResponse, HttpResponseBuilder, HttpServer,
 };
 use details::{ApiDetails, HumanDate};
-use future::WithTimeout;
+use future::{WithPollTimer, WithTimeout};
 use futures_core::Stream;
 use magick::ArcPolicyDir;
 use metrics_exporter_prometheus::PrometheusBuilder;
@@ -186,6 +186,7 @@ impl<S: Store + 'static> FormData for Upload<S> {
                             ingest::ingest(&state, stream, None).await
                         }
.with_poll_timer("file-upload") .instrument(span), ) })), @@ -237,6 +238,7 @@ impl FormData for Import { ingest::ingest(&state, stream, Some(Alias::from_existing(&filename))) .await } + .with_poll_timer("file-import") .instrument(span), ) })), @@ -351,6 +353,7 @@ impl FormData for BackgroundedUpload { Backgrounded::proxy(&state, stream).await } + .with_poll_timer("file-proxy") .instrument(span), ) })), diff --git a/src/queue.rs b/src/queue.rs index e2cbd12..0c78463 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -10,6 +10,7 @@ use crate::{ }; use std::{ + ops::Deref, path::PathBuf, sync::Arc, time::{Duration, Instant}, @@ -225,10 +226,86 @@ impl Drop for MetricsGuard { } } +pub(super) enum JobError { + Abort(Error), + Retry(Error), +} + +impl AsRef for JobError { + fn as_ref(&self) -> &Error { + match self { + Self::Abort(e) | Self::Retry(e) => e, + } + } +} + +impl Deref for JobError { + type Target = Error; + + fn deref(&self) -> &Self::Target { + match self { + Self::Abort(e) | Self::Retry(e) => e, + } + } +} + +impl From for Error { + fn from(value: JobError) -> Self { + match value { + JobError::Abort(e) | JobError::Retry(e) => e, + } + } +} + +type JobResult = Result; + +type JobFuture<'a> = LocalBoxFuture<'a, JobResult>; + +trait JobContext { + type Item; + + fn abort(self) -> JobResult + where + Self: Sized; + + fn retry(self) -> JobResult + where + Self: Sized; +} + +impl JobContext for Result +where + E: Into, +{ + type Item = T; + + fn abort(self) -> JobResult + where + Self: Sized, + { + self.map_err(Into::into).map_err(JobError::Abort) + } + + fn retry(self) -> JobResult + where + Self: Sized, + { + self.map_err(Into::into).map_err(JobError::Retry) + } +} + +fn job_result(result: &JobResult) -> crate::repo::JobResult { + match result { + Ok(()) => crate::repo::JobResult::Success, + Err(JobError::Retry(_)) => crate::repo::JobResult::Failure, + Err(JobError::Abort(_)) => crate::repo::JobResult::Aborted, + } +} + async fn process_jobs(state: State, queue: &'static str, callback: F) where S: Store, - for<'a> F: Fn(&'a State, serde_json::Value) -> LocalBoxFuture<'a, Result<(), Error>> + Copy, + for<'a> F: Fn(&'a State, serde_json::Value) -> JobFuture<'a> + Copy, { let worker_id = uuid::Uuid::new_v4(); @@ -262,7 +339,7 @@ async fn job_loop( ) -> Result<(), Error> where S: Store, - for<'a> F: Fn(&'a State, serde_json::Value) -> LocalBoxFuture<'a, Result<(), Error>> + Copy, + for<'a> F: Fn(&'a State, serde_json::Value) -> JobFuture<'a> + Copy, { loop { tracing::trace!("job_loop: looping"); @@ -283,7 +360,10 @@ where ) .await; - state.repo.complete_job(queue, worker_id, job_id).await?; + state + .repo + .complete_job(queue, worker_id, job_id, job_result(&res)) + .await?; res?; @@ -303,8 +383,7 @@ async fn process_image_jobs( callback: F, ) where S: Store, - for<'a> F: Fn(&'a State, &'a ProcessMap, serde_json::Value) -> LocalBoxFuture<'a, Result<(), Error>> - + Copy, + for<'a> F: Fn(&'a State, &'a ProcessMap, serde_json::Value) -> JobFuture<'a> + Copy, { let worker_id = uuid::Uuid::new_v4(); @@ -339,8 +418,7 @@ async fn image_job_loop( ) -> Result<(), Error> where S: Store, - for<'a> F: Fn(&'a State, &'a ProcessMap, serde_json::Value) -> LocalBoxFuture<'a, Result<(), Error>> - + Copy, + for<'a> F: Fn(&'a State, &'a ProcessMap, serde_json::Value) -> JobFuture<'a> + Copy, { loop { tracing::trace!("image_job_loop: looping"); @@ -361,7 +439,10 @@ where ) .await; - state.repo.complete_job(queue, worker_id, job_id).await?; + state + .repo + .complete_job(queue, worker_id, job_id, 
+            .complete_job(queue, worker_id, job_id, job_result(&res))
+            .await?;
 
         res?;
 
diff --git a/src/queue/cleanup.rs b/src/queue/cleanup.rs
index aa904d8..aa94291 100644
--- a/src/queue/cleanup.rs
+++ b/src/queue/cleanup.rs
@@ -6,7 +6,7 @@ use tracing::{Instrument, Span};
 use crate::{
     config::Configuration,
     error::{Error, UploadError},
-    future::LocalBoxFuture,
+    future::WithPollTimer,
     queue::Cleanup,
     repo::{Alias, ArcRepo, DeleteToken, Hash},
     serde_str::Serde,
@@ -14,41 +14,67 @@ use crate::{
     store::Store,
 };
 
-pub(super) fn perform<S>(
-    state: &State<S>,
-    job: serde_json::Value,
-) -> LocalBoxFuture<'_, Result<(), Error>>
+use super::{JobContext, JobFuture, JobResult};
+
+pub(super) fn perform<S>(state: &State<S>, job: serde_json::Value) -> JobFuture<'_>
 where
     S: Store + 'static,
 {
     Box::pin(async move {
-        match serde_json::from_value(job) {
-            Ok(job) => match job {
-                Cleanup::Hash { hash: in_hash } => hash(&state.repo, in_hash).await?,
-                Cleanup::Identifier {
-                    identifier: in_identifier,
-                } => identifier(&state.repo, &state.store, Arc::from(in_identifier)).await?,
-                Cleanup::Alias {
-                    alias: stored_alias,
-                    token,
-                } => {
-                    alias(
-                        &state.repo,
-                        Serde::into_inner(stored_alias),
-                        Serde::into_inner(token),
-                    )
+        let job_text = format!("{job}");
+
+        let job = serde_json::from_value(job)
+            .map_err(|e| UploadError::InvalidJob(e, job_text))
+            .abort()?;
+
+        match job {
+            Cleanup::Hash { hash: in_hash } => {
+                hash(&state.repo, in_hash)
+                    .with_poll_timer("cleanup-hash")
+                    .await?
+            }
+            Cleanup::Identifier {
+                identifier: in_identifier,
+            } => {
+                identifier(&state.repo, &state.store, Arc::from(in_identifier))
+                    .with_poll_timer("cleanup-identifier")
+                    .await?
+            }
+            Cleanup::Alias {
+                alias: stored_alias,
+                token,
+            } => {
+                alias(
+                    &state.repo,
+                    Serde::into_inner(stored_alias),
+                    Serde::into_inner(token),
+                )
+                .await?
+            }
+            Cleanup::Variant { hash, variant } => {
+                hash_variant(&state.repo, hash, variant)
+                    .with_poll_timer("cleanup-hash-variant")
+                    .await?
+            }
+            Cleanup::AllVariants => {
+                all_variants(&state.repo)
+                    .with_poll_timer("cleanup-all-variants")
+                    .await?
+            }
+            Cleanup::OutdatedVariants => {
+                outdated_variants(&state.repo, &state.config)
+                    .with_poll_timer("cleanup-outdated-variants")
+                    .await?
+            }
+            Cleanup::OutdatedProxies => {
+                outdated_proxies(&state.repo, &state.config)
+                    .with_poll_timer("cleanup-outdated-proxies")
+                    .await?
+            }
+            Cleanup::Prune => {
+                prune(&state.repo, &state.store)
+                    .with_poll_timer("cleanup-prune")
                     .await?
-                }
-                Cleanup::Variant { hash, variant } => {
-                    hash_variant(&state.repo, hash, variant).await?
-                }
-                Cleanup::AllVariants => all_variants(&state.repo).await?,
-                Cleanup::OutdatedVariants => outdated_variants(&state.repo, &state.config).await?,
-                Cleanup::OutdatedProxies => outdated_proxies(&state.repo, &state.config).await?,
-                Cleanup::Prune => prune(&state.repo, &state.store).await?,
-            },
-            Err(e) => {
-                tracing::warn!("Invalid job: {}", format!("{e}"));
             }
         }
 
         Ok(())
     })
 }
 
@@ -57,36 +83,30 @@ where
 
 #[tracing::instrument(skip_all)]
-async fn identifier<S>(repo: &ArcRepo, store: &S, identifier: Arc<str>) -> Result<(), Error>
+async fn identifier<S>(repo: &ArcRepo, store: &S, identifier: Arc<str>) -> JobResult
 where
     S: Store,
 {
-    let mut errors = Vec::new();
-
-    if let Err(e) = store.remove(&identifier).await {
-        errors.push(UploadError::from(e));
+    match store.remove(&identifier).await {
+        Ok(_) => {}
+        Err(e) if e.is_not_found() => {}
+        Err(e) => return Err(e).retry(),
     }
 
-    if let Err(e) = repo.cleanup_details(&identifier).await {
-        errors.push(UploadError::from(e));
-    }
-
-    for error in errors {
-        tracing::error!("{}", format!("{error:?}"));
-    }
+    repo.cleanup_details(&identifier).await.retry()?;
 
     Ok(())
 }
 
 #[tracing::instrument(skip_all)]
-async fn hash(repo: &ArcRepo, hash: Hash) -> Result<(), Error> {
-    let aliases = repo.aliases_for_hash(hash.clone()).await?;
+async fn hash(repo: &ArcRepo, hash: Hash) -> JobResult {
+    let aliases = repo.aliases_for_hash(hash.clone()).await.retry()?;
 
     if !aliases.is_empty() {
         for alias in aliases {
             // TODO: decide if it is okay to skip aliases without tokens
-            if let Some(token) = repo.delete_token(&alias).await? {
-                super::cleanup_alias(repo, alias, token).await?;
+            if let Some(token) = repo.delete_token(&alias).await.retry()? {
+                super::cleanup_alias(repo, alias, token).await.retry()?;
             } else {
                 tracing::warn!("Not cleaning alias!");
             }
@@ -97,145 +117,152 @@ async fn hash(repo: &ArcRepo, hash: Hash) -> JobResult {
 
     let mut idents = repo
         .variants(hash.clone())
-        .await?
+        .await
+        .retry()?
         .into_iter()
         .map(|(_, v)| v)
         .collect::<Vec<_>>();
 
-    idents.extend(repo.identifier(hash.clone()).await?);
-    idents.extend(repo.motion_identifier(hash.clone()).await?);
+    idents.extend(repo.identifier(hash.clone()).await.retry()?);
+    idents.extend(repo.motion_identifier(hash.clone()).await.retry()?);
 
     for identifier in idents {
-        let _ = super::cleanup_identifier(repo, &identifier).await;
+        super::cleanup_identifier(repo, &identifier).await.retry()?;
     }
 
-    repo.cleanup_hash(hash).await?;
+    repo.cleanup_hash(hash).await.retry()?;
 
     Ok(())
 }
 
 #[tracing::instrument(skip_all)]
-pub(crate) async fn alias(repo: &ArcRepo, alias: Alias, token: DeleteToken) -> Result<(), Error> {
-    let saved_delete_token = repo.delete_token(&alias).await?;
+pub(crate) async fn alias(repo: &ArcRepo, alias: Alias, token: DeleteToken) -> JobResult {
+    let saved_delete_token = repo.delete_token(&alias).await.retry()?;
 
     if !saved_delete_token.is_some_and(|t| t.ct_eq(&token)) {
-        return Err(UploadError::InvalidToken.into());
+        return Err(UploadError::InvalidToken).abort();
     }
 
-    let hash = repo.hash(&alias).await?;
+    let hash = repo.hash(&alias).await.retry()?;
 
-    repo.cleanup_alias(&alias).await?;
-    repo.remove_relation(alias.clone()).await?;
-    repo.remove_alias_access(alias.clone()).await?;
+    repo.cleanup_alias(&alias).await.retry()?;
+    repo.remove_relation(alias.clone()).await.retry()?;
+    repo.remove_alias_access(alias.clone()).await.retry()?;
 
-    let Some(hash) = hash else {
-        // hash doesn't exist, nothing to do
-        return Ok(());
-    };
+    let hash = hash.ok_or(UploadError::MissingAlias).abort()?;
 
-    if repo.aliases_for_hash(hash.clone()).await?.is_empty() {
-        super::cleanup_hash(repo, hash).await?;
+    if repo
+        .aliases_for_hash(hash.clone())
+        .await
+        .retry()?
+        .is_empty()
+    {
+        super::cleanup_hash(repo, hash).await.retry()?;
     }
 
     Ok(())
 }
 
 #[tracing::instrument(skip_all)]
-async fn all_variants(repo: &ArcRepo) -> Result<(), Error> {
+async fn all_variants(repo: &ArcRepo) -> JobResult {
     let hash_stream = std::pin::pin!(repo.hashes());
     let mut hash_stream = hash_stream.into_streamer();
 
     while let Some(res) = hash_stream.next().await {
         tracing::trace!("all_variants: looping");
 
-        let hash = res?;
-        super::cleanup_variants(repo, hash, None).await?;
+        let hash = res.retry()?;
+        super::cleanup_variants(repo, hash, None).await.retry()?;
     }
 
     Ok(())
 }
 
 #[tracing::instrument(skip_all)]
-async fn outdated_variants(repo: &ArcRepo, config: &Configuration) -> Result<(), Error> {
+async fn outdated_variants(repo: &ArcRepo, config: &Configuration) -> JobResult {
     let now = time::OffsetDateTime::now_utc();
     let since = now.saturating_sub(config.media.retention.variants.to_duration());
 
-    let variant_stream = repo.older_variants(since).await?;
+    let variant_stream = repo.older_variants(since).await.retry()?;
     let variant_stream = std::pin::pin!(crate::stream::take(variant_stream, 2048));
     let mut variant_stream = variant_stream.into_streamer();
 
     let mut count = 0;
 
-    while let Some(res) = variant_stream.next().await {
+    while let Some((hash, variant)) = variant_stream.try_next().await.retry()? {
         metrics::counter!(crate::init_metrics::CLEANUP_OUTDATED_VARIANT).increment(1);
         tracing::trace!("outdated_variants: looping");
 
-        let (hash, variant) = res?;
-        super::cleanup_variants(repo, hash, Some(variant)).await?;
+        super::cleanup_variants(repo, hash, Some(variant))
+            .await
+            .retry()?;
         count += 1;
     }
 
     tracing::debug!("Queued {count} variant cleanup jobs");
-    let queue_length = repo.queue_length().await?;
+    let queue_length = repo.queue_length().await.abort()?;
     tracing::debug!("Total queue length: {queue_length}");
 
     Ok(())
 }
 
 #[tracing::instrument(skip_all)]
-async fn outdated_proxies(repo: &ArcRepo, config: &Configuration) -> Result<(), Error> {
+async fn outdated_proxies(repo: &ArcRepo, config: &Configuration) -> JobResult {
     let now = time::OffsetDateTime::now_utc();
     let since = now.saturating_sub(config.media.retention.proxy.to_duration());
 
-    let alias_stream = repo.older_aliases(since).await?;
+    let alias_stream = repo.older_aliases(since).await.retry()?;
     let alias_stream = std::pin::pin!(crate::stream::take(alias_stream, 2048));
     let mut alias_stream = alias_stream.into_streamer();
 
     let mut count = 0;
 
-    while let Some(res) = alias_stream.next().await {
+    while let Some(alias) = alias_stream.try_next().await.retry()? {
         metrics::counter!(crate::init_metrics::CLEANUP_OUTDATED_PROXY).increment(1);
         tracing::trace!("outdated_proxies: looping");
 
-        let alias = res?;
-        if let Some(token) = repo.delete_token(&alias).await? {
-            super::cleanup_alias(repo, alias, token).await?;
+        if let Some(token) = repo.delete_token(&alias).await.retry()? {
+            super::cleanup_alias(repo, alias, token).await.retry()?;
             count += 1;
         } else {
             tracing::warn!("Skipping alias cleanup - no delete token");
-            repo.remove_relation(alias.clone()).await?;
-            repo.remove_alias_access(alias).await?;
+            repo.remove_relation(alias.clone()).await.retry()?;
+            repo.remove_alias_access(alias).await.retry()?;
         }
     }
 
     tracing::debug!("Queued {count} alias cleanup jobs");
-    let queue_length = repo.queue_length().await?;
+    let queue_length = repo.queue_length().await.abort()?;
     tracing::debug!("Total queue length: {queue_length}");
 
     Ok(())
 }
 
 #[tracing::instrument(skip_all)]
-async fn hash_variant(
-    repo: &ArcRepo,
-    hash: Hash,
-    target_variant: Option<String>,
-) -> Result<(), Error> {
+async fn hash_variant(repo: &ArcRepo, hash: Hash, target_variant: Option<String>) -> JobResult {
     if let Some(target_variant) = target_variant {
         if let Some(identifier) = repo
             .variant_identifier(hash.clone(), target_variant.clone())
-            .await?
+            .await
+            .retry()?
         {
-            super::cleanup_identifier(repo, &identifier).await?;
+            super::cleanup_identifier(repo, &identifier).await.retry()?;
         }
 
         repo.remove_variant(hash.clone(), target_variant.clone())
-            .await?;
-        repo.remove_variant_access(hash, target_variant).await?;
+            .await
+            .retry()?;
+        repo.remove_variant_access(hash, target_variant)
+            .await
+            .retry()?;
     } else {
-        for (variant, identifier) in repo.variants(hash.clone()).await? {
-            repo.remove_variant(hash.clone(), variant.clone()).await?;
-            repo.remove_variant_access(hash.clone(), variant).await?;
-            super::cleanup_identifier(repo, &identifier).await?;
+        for (variant, identifier) in repo.variants(hash.clone()).await.retry()? {
+            repo.remove_variant(hash.clone(), variant.clone())
+                .await
+                .retry()?;
+            repo.remove_variant_access(hash.clone(), variant)
+                .await
+                .retry()?;
+            super::cleanup_identifier(repo, &identifier).await.retry()?;
         }
     }
 
@@ -243,19 +270,20 @@ async fn hash_variant(repo: &ArcRepo, hash: Hash, target_variant: Option<String>) -> JobResult {
 }
 
 #[tracing::instrument(skip_all)]
-async fn prune<S>(repo: &ArcRepo, store: &S) -> Result<(), Error>
+async fn prune<S>(repo: &ArcRepo, store: &S) -> JobResult
 where
     S: Store + 'static,
 {
     repo.set("prune-missing-started", b"1".to_vec().into())
-        .await?;
+        .await
+        .retry()?;
 
     let hash_stream = std::pin::pin!(repo.hashes());
     let mut hash_stream = hash_stream.into_streamer();
 
     let mut count: u64 = 0;
 
-    while let Some(hash) = hash_stream.try_next().await? {
+    while let Some(hash) = hash_stream.try_next().await.retry()? {
         tracing::trace!("prune: looping");
 
         let repo = repo.clone();
@@ -307,7 +335,8 @@ where
     }
 
     repo.set("prune-missing-complete", b"1".to_vec().into())
-        .await?;
+        .await
+        .retry()?;
 
     Ok(())
 }
diff --git a/src/queue/process.rs b/src/queue/process.rs
index 5fff30f..653ca26 100644
--- a/src/queue/process.rs
+++ b/src/queue/process.rs
@@ -5,7 +5,7 @@ use crate::{
     concurrent_processor::ProcessMap,
     error::{Error, UploadError},
     formats::InputProcessableFormat,
-    future::LocalBoxFuture,
+    future::WithPollTimer,
     ingest::Session,
     queue::Process,
     repo::{Alias, UploadId, UploadResult},
@@ -15,49 +15,54 @@ use crate::{
 };
 use std::{path::PathBuf, sync::Arc};
 
+use super::{JobContext, JobFuture, JobResult};
+
 pub(super) fn perform<'a, S>(
     state: &'a State<S>,
     process_map: &'a ProcessMap,
     job: serde_json::Value,
-) -> LocalBoxFuture<'a, Result<(), Error>>
+) -> JobFuture<'a>
 where
     S: Store + 'static,
 {
     Box::pin(async move {
-        match serde_json::from_value(job) {
-            Ok(job) => match job {
-                Process::Ingest {
-                    identifier,
-                    upload_id,
-                    declared_alias,
-                } => {
-                    process_ingest(
-                        state,
-                        Arc::from(identifier),
-                        Serde::into_inner(upload_id),
-                        declared_alias.map(Serde::into_inner),
-                    )
-                    .await?
-                }
-                Process::Generate {
+        let job_text = format!("{job}");
+
+        let job = serde_json::from_value(job)
+            .map_err(|e| UploadError::InvalidJob(e, job_text))
+            .abort()?;
+
+        match job {
+            Process::Ingest {
+                identifier,
+                upload_id,
+                declared_alias,
+            } => {
+                process_ingest(
+                    state,
+                    Arc::from(identifier),
+                    Serde::into_inner(upload_id),
+                    declared_alias.map(Serde::into_inner),
+                )
+                .with_poll_timer("process-ingest")
+                .await?
+            }
+            Process::Generate {
+                target_format,
+                source,
+                process_path,
+                process_args,
+            } => {
+                generate(
+                    state,
+                    process_map,
                     target_format,
-                    source,
+                    Serde::into_inner(source),
                     process_path,
                     process_args,
-                } => {
-                    generate(
-                        state,
-                        process_map,
-                        target_format,
-                        Serde::into_inner(source),
-                        process_path,
-                        process_args,
-                    )
-                    .await?
-                }
-            },
-            Err(e) => {
-                tracing::warn!("Invalid job: {}", format!("{e}"));
+                )
+                .with_poll_timer("process-generate")
+                .await?
             }
         }
 
@@ -105,13 +110,13 @@ async fn process_ingest<S>(
     unprocessed_identifier: Arc<str>,
     upload_id: UploadId,
     declared_alias: Option<Alias>,
-) -> Result<(), Error>
+) -> JobResult
 where
     S: Store + 'static,
 {
     let guard = UploadGuard::guard(upload_id);
 
-    let fut = async {
+    let res = async {
         let ident = unprocessed_identifier.clone();
         let state2 = state.clone();
 
@@ -135,25 +140,33 @@ where
         state.store.remove(&unprocessed_identifier).await?;
 
         error_boundary.map_err(|_| UploadError::Canceled)?
-    };
+    }
+    .await;
 
-    let result = match fut.await {
+    let (result, err) = match res {
         Ok(session) => {
             let alias = session.alias().take().expect("Alias should exist").clone();
             let token = session.disarm();
-            UploadResult::Success { alias, token }
+            (UploadResult::Success { alias, token }, None)
         }
-        Err(e) => {
-            tracing::warn!("Failed to ingest\n{}\n{}", format!("{e}"), format!("{e:?}"));
-
+        Err(e) => (
             UploadResult::Failure {
                 message: e.root_cause().to_string(),
                 code: e.error_code().into_owned(),
-            }
-        }
+            },
+            Some(e),
+        ),
     };
 
-    state.repo.complete_upload(upload_id, result).await?;
+    state
+        .repo
+        .complete_upload(upload_id, result)
+        .await
+        .retry()?;
+
+    if let Some(e) = err {
+        return Err(e).abort();
+    }
 
     guard.disarm();
 
@@ -168,23 +181,28 @@ async fn generate<S: Store + 'static>(
     source: Alias,
     process_path: PathBuf,
     process_args: Vec<String>,
-) -> Result<(), Error> {
-    let Some(hash) = state.repo.hash(&source).await? else {
-        // Nothing to do
-        return Ok(());
-    };
+) -> JobResult {
+    let hash = state
+        .repo
+        .hash(&source)
+        .await
+        .retry()?
+        .ok_or(UploadError::MissingAlias)
+        .abort()?;
 
     let path_string = process_path.to_string_lossy().to_string();
 
     let identifier_opt = state
         .repo
         .variant_identifier(hash.clone(), path_string)
-        .await?;
+        .await
+        .retry()?;
 
     if identifier_opt.is_some() {
+        // don't generate already-generated variant
         return Ok(());
     }
 
-    let original_details = crate::ensure_details(state, &source).await?;
+    let original_details = crate::ensure_details(state, &source).await.retry()?;
 
     crate::generate::generate(
         state,
@@ -195,7 +213,8 @@ async fn generate<S: Store + 'static>(
         &original_details,
         hash,
     )
-    .await?;
+    .await
+    .abort()?;
 
     Ok(())
 }
diff --git a/src/repo.rs b/src/repo.rs
index 48c4ebc..ed87b7a 100644
--- a/src/repo.rs
+++ b/src/repo.rs
@@ -337,6 +337,13 @@ where
 #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
 pub(crate) struct JobId(Uuid);
 
+#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub(crate) enum JobResult {
+    Success,
+    Failure,
+    Aborted,
+}
+
 impl JobId {
     pub(crate) fn gen() -> Self {
         Self(Uuid::now_v7())
@@ -380,6 +387,7 @@ pub(crate) trait QueueRepo: BaseRepo {
         queue: &'static str,
         worker_id: Uuid,
         job_id: JobId,
+        job_status: JobResult,
     ) -> Result<(), RepoError>;
 }
 
@@ -423,8 +431,9 @@ where
         queue: &'static str,
         worker_id: Uuid,
         job_id: JobId,
+        job_status: JobResult,
     ) -> Result<(), RepoError> {
-        T::complete_job(self, queue, worker_id, job_id).await
+        T::complete_job(self, queue, worker_id, job_id, job_status).await
     }
 }
 
diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs
index 02ee3fd..a7dce4a 100644
--- a/src/repo/postgres.rs
+++ b/src/repo/postgres.rs
@@ -43,9 +43,9 @@ use self::job_status::JobStatus;
 use super::{
     metrics::{PopMetricsGuard, PushMetricsGuard, WaitMetricsGuard},
     Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, DetailsRepo,
-    FullRepo, Hash, HashAlreadyExists, HashPage, HashRepo, JobId, OrderedHash, ProxyRepo,
-    QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo, UploadResult,
-    VariantAccessRepo, VariantAlreadyExists,
+    FullRepo, Hash, HashAlreadyExists, HashPage, HashRepo, JobId, JobResult, OrderedHash,
+    ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo,
+    UploadResult, VariantAccessRepo, VariantAlreadyExists,
 };
 
 #[derive(Clone)]
@@ -1358,7 +1358,7 @@ impl QueueRepo for PostgresRepo {
         }
     }
 
-    #[tracing::instrument(level = "debug", skip(self))]
+    #[tracing::instrument(level = "debug", skip_all, fields(job_id))]
     async fn pop(
         &self,
         queue_name: &'static str,
@@ -1420,7 +1420,8 @@ impl QueueRepo for PostgresRepo {
                         queue_alias
                             .field(status)
                             .eq(JobStatus::New)
-                            .and(queue_alias.field(queue).eq(queue_name)),
+                            .and(queue_alias.field(queue).eq(queue_name))
+                            .and(queue_alias.field(retry).ge(1)),
                     )
                     .order(queue_alias.field(queue_time))
                     .for_update()
                     .skip_locked()
@@ -1445,20 +1446,21 @@ impl QueueRepo for PostgresRepo {
                 .map_err(PostgresError::Diesel)?;
 
             if let Some((job_id, job_json)) = opt {
+                tracing::Span::current().record("job_id", &format!("{job_id}"));
+
                 guard.disarm();
+                tracing::debug!("{job_json}");
                 return Ok((JobId(job_id), job_json));
             }
 
             drop(conn);
 
-            if notifier
+            match notifier
                 .notified()
                 .with_timeout(Duration::from_secs(5))
                 .await
-                .is_ok()
             {
-                tracing::debug!("Notified");
-            } else {
-                tracing::debug!("Timed out");
+                Ok(()) => tracing::debug!("Notified"),
+                Err(_) => tracing::trace!("Timed out"),
             }
         }
     }
@@ -1499,23 +1501,54 @@ impl QueueRepo for PostgresRepo {
         queue_name: &'static str,
         worker_id: Uuid,
         job_id: JobId,
+        job_status: JobResult,
     ) -> Result<(), RepoError> {
         use schema::job_queue::dsl::*;
 
         let mut conn = self.get_connection().await?;
 
-        diesel::delete(job_queue)
-            .filter(
-                id.eq(job_id.0)
-                    .and(queue.eq(queue_name))
-                    .and(worker.eq(worker_id)),
-            )
-            .execute(&mut conn)
-            .with_metrics(crate::init_metrics::POSTGRES_QUEUE_COMPLETE)
-            .with_timeout(Duration::from_secs(5))
-            .await
-            .map_err(|_| PostgresError::DbTimeout)?
-            .map_err(PostgresError::Diesel)?;
+        if matches!(job_status, JobResult::Failure) {
+            diesel::update(job_queue)
+                .filter(
+                    id.eq(job_id.0)
+                        .and(queue.eq(queue_name))
+                        .and(worker.eq(worker_id)),
+                )
+                .set(retry.eq(retry - 1))
+                .execute(&mut conn)
+                .with_metrics(crate::init_metrics::POSTGRES_QUEUE_RETRY)
+                .with_timeout(Duration::from_secs(5))
+                .await
+                .map_err(|_| PostgresError::DbTimeout)?
+                .map_err(PostgresError::Diesel)?;
+
+            diesel::delete(job_queue)
+                .filter(
+                    id.eq(job_id.0)
+                        .and(queue.eq(queue_name))
+                        .and(worker.eq(worker_id))
+                        .and(retry.le(0)),
+                )
+                .execute(&mut conn)
+                .with_metrics(crate::init_metrics::POSTGRES_QUEUE_CLEANUP)
+                .with_timeout(Duration::from_secs(5))
+                .await
+                .map_err(|_| PostgresError::DbTimeout)?
+                .map_err(PostgresError::Diesel)?;
+        } else {
+            diesel::delete(job_queue)
+                .filter(
+                    id.eq(job_id.0)
+                        .and(queue.eq(queue_name))
+                        .and(worker.eq(worker_id)),
+                )
+                .execute(&mut conn)
+                .with_metrics(crate::init_metrics::POSTGRES_QUEUE_COMPLETE)
+                .with_timeout(Duration::from_secs(5))
+                .await
+                .map_err(|_| PostgresError::DbTimeout)?
+                .map_err(PostgresError::Diesel)?;
+        }
 
         Ok(())
     }
diff --git a/src/repo/postgres/migrations/V0013__add_retry_count_to_jobs.rs b/src/repo/postgres/migrations/V0013__add_retry_count_to_jobs.rs
new file mode 100644
index 0000000..10f40ae
--- /dev/null
+++ b/src/repo/postgres/migrations/V0013__add_retry_count_to_jobs.rs
@@ -0,0 +1,12 @@
+use barrel::backend::Pg;
+use barrel::{types, Migration};
+
+pub(crate) fn migration() -> String {
+    let mut m = Migration::new();
+
+    m.change_table("job_queue", |t| {
+        t.add_column("retry", types::integer().nullable(false).default(5));
+    });
+
+    m.make::<Pg>().to_string()
+}
diff --git a/src/repo/postgres/schema.rs b/src/repo/postgres/schema.rs
index ec1d7c3..fa772b7 100644
--- a/src/repo/postgres/schema.rs
+++ b/src/repo/postgres/schema.rs
@@ -44,6 +44,7 @@ diesel::table! {
         queue_time -> Timestamp,
         heartbeat -> Nullable<Timestamp>,
         unique_key -> Nullable<Text>,
+        retry -> Int4,
     }
 }
diff --git a/src/repo/sled.rs b/src/repo/sled.rs
index e99ded0..d3da1a2 100644
--- a/src/repo/sled.rs
+++ b/src/repo/sled.rs
@@ -1,6 +1,7 @@
 use crate::{
     details::HumanDate,
     error_code::{ErrorCode, OwnedErrorCode},
+    future::WithTimeout,
     serde_str::Serde,
     stream::{from_iterator, LocalBoxStream},
 };
@@ -12,6 +13,7 @@ use std::{
         atomic::{AtomicU64, Ordering},
         Arc, RwLock,
     },
+    time::Duration,
 };
 use tokio::sync::Notify;
 use url::Url;
@@ -21,9 +23,9 @@ use super::{
     hash::Hash,
     metrics::{PopMetricsGuard, PushMetricsGuard, WaitMetricsGuard},
     Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, Details,
-    DetailsRepo, FullRepo, HashAlreadyExists, HashPage, HashRepo, JobId, OrderedHash, ProxyRepo,
-    QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo, UploadResult,
-    VariantAccessRepo, VariantAlreadyExists,
+    DetailsRepo, FullRepo, HashAlreadyExists, HashPage, HashRepo, JobId, JobResult, OrderedHash,
+    ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo,
+    UploadResult, VariantAccessRepo, VariantAlreadyExists,
 };
 
 macro_rules! b {
@@ -99,6 +101,7 @@ pub(crate) struct SledRepo {
     unique_jobs: Tree,
     unique_jobs_inverse: Tree,
     job_state: Tree,
+    job_retries: Tree,
     alias_access: Tree,
     inverse_alias_access: Tree,
     variant_access: Tree,
@@ -141,6 +144,7 @@ impl SledRepo {
             unique_jobs: db.open_tree("pict-rs-unique-jobs-tree")?,
             unique_jobs_inverse: db.open_tree("pict-rs-unique-jobs-inverse-tree")?,
             job_state: db.open_tree("pict-rs-job-state-tree")?,
+            job_retries: db.open_tree("pict-rs-job-retries-tree")?,
             alias_access: db.open_tree("pict-rs-alias-access-tree")?,
             inverse_alias_access: db.open_tree("pict-rs-inverse-alias-access-tree")?,
             variant_access: db.open_tree("pict-rs-variant-access-tree")?,
@@ -653,28 +657,37 @@ impl QueueRepo for SledRepo {
         let unique_jobs = self.unique_jobs.clone();
         let unique_jobs_inverse = self.unique_jobs_inverse.clone();
         let job_state = self.job_state.clone();
+        let job_retries = self.job_retries.clone();
 
         let res = crate::sync::spawn_blocking("sled-io", move || {
-            (&queue, &unique_jobs, &unique_jobs_inverse, &job_state).transaction(
-                |(queue, unique_jobs, unique_jobs_inverse, job_state)| {
-                    let state = JobState::pending();
-
-                    queue.insert(&key[..], &job[..])?;
-                    if let Some(unique_key) = unique_key {
-                        if unique_jobs
-                            .insert(unique_key.as_bytes(), &key[..])?
-                            .is_some()
-                        {
-                            return sled::transaction::abort(());
-                        }
-
-                        unique_jobs_inverse.insert(&key[..], unique_key.as_bytes())?;
-                    }
-                    job_state.insert(&key[..], state.as_bytes())?;
-
-                    Ok(())
-                },
+            (
+                &queue,
+                &unique_jobs,
+                &unique_jobs_inverse,
+                &job_state,
+                &job_retries,
             )
+            .transaction(
+                |(queue, unique_jobs, unique_jobs_inverse, job_state, job_retries)| {
+                    let state = JobState::pending();
+
+                    queue.insert(&key[..], &job[..])?;
+                    if let Some(unique_key) = unique_key {
+                        if unique_jobs
+                            .insert(unique_key.as_bytes(), &key[..])?
+                            .is_some()
+                        {
+                            return sled::transaction::abort(());
+                        }
+
+                        unique_jobs_inverse.insert(&key[..], unique_key.as_bytes())?;
+                    }
+                    job_state.insert(&key[..], state.as_bytes())?;
+                    job_retries.insert(&key[..], &(5_u64.to_be_bytes())[..])?;
+
+                    Ok(())
+                },
+            )
         })
         .await
         .map_err(|_| RepoError::Canceled)?;
@@ -703,7 +716,7 @@ impl QueueRepo for SledRepo {
         Ok(Some(id))
     }
 
-    #[tracing::instrument(skip(self, worker_id), fields(job_id))]
+    #[tracing::instrument(skip_all, fields(job_id))]
     async fn pop(
         &self,
         queue_name: &'static str,
@@ -719,7 +732,6 @@ impl QueueRepo for SledRepo {
 
             let queue = self.queue.clone();
             let job_state = self.job_state.clone();
-            let span = tracing::Span::current();
 
             let opt = crate::sync::spawn_blocking("sled-io", move || {
                 // Job IDs are generated with Uuid version 7 - defining their first bits as a
                 // timestamp. Scanning a prefix should give us jobs in the order they were queued.
@@ -760,8 +772,6 @@ impl QueueRepo for SledRepo {
 
                     let job_id = JobId::from_bytes(id_bytes);
 
-                    span.record("job_id", &format!("{job_id:?}"));
-
                    let opt = queue
                         .get(&key)?
                         .map(|ivec| serde_json::from_slice(&ivec[..]))
@@ -777,9 +787,12 @@ impl QueueRepo for SledRepo {
             .await
             .map_err(|_| RepoError::Canceled)??;
 
-            if let Some(tup) = opt {
+            if let Some((job_id, job_json)) = opt {
+                tracing::Span::current().record("job_id", &format!("{}", job_id.0));
+
                 metrics_guard.disarm();
-                return Ok(tup);
+                tracing::debug!("{job_json}");
+                return Ok((job_id, job_json));
             }
 
             let opt = self
@@ -797,7 +810,14 @@ impl QueueRepo for SledRepo {
                 Arc::clone(entry)
             };
 
-            notify.notified().await
+            match notify
+                .notified()
+                .with_timeout(Duration::from_secs(30))
+                .await
+            {
+                Ok(()) => tracing::debug!("Notified"),
+                Err(_) => tracing::trace!("Timed out"),
+            }
         }
     }
 
@@ -836,25 +856,50 @@ impl QueueRepo for SledRepo {
         queue_name: &'static str,
         _worker_id: Uuid,
         job_id: JobId,
+        job_status: JobResult,
     ) -> Result<(), RepoError> {
+        let retry = matches!(job_status, JobResult::Failure);
+
         let key = job_key(queue_name, job_id);
 
         let queue = self.queue.clone();
         let unique_jobs = self.unique_jobs.clone();
         let unique_jobs_inverse = self.unique_jobs_inverse.clone();
         let job_state = self.job_state.clone();
+        let job_retries = self.job_retries.clone();
 
         let res = crate::sync::spawn_blocking("sled-io", move || {
-            (&queue, &unique_jobs, &unique_jobs_inverse, &job_state).transaction(
-                |(queue, unique_jobs, unique_jobs_inverse, job_state)| {
-                    queue.remove(&key[..])?;
-                    if let Some(unique_key) = unique_jobs_inverse.remove(&key[..])? {
-                        unique_jobs.remove(unique_key)?;
-                    }
-                    job_state.remove(&key[..])?;
-                    Ok(())
-                },
+            (
+                &queue,
+                &unique_jobs,
+                &unique_jobs_inverse,
+                &job_state,
+                &job_retries,
             )
+            .transaction(
+                |(queue, unique_jobs, unique_jobs_inverse, job_state, job_retries)| {
+                    let retries = job_retries.get(&key[..])?;
+
+                    let retry_count = retries
+                        .and_then(|ivec| ivec[0..8].try_into().ok())
+                        .map(u64::from_be_bytes)
+                        .unwrap_or(5_u64)
+                        .saturating_sub(1);
+
+                    if retry_count > 0 && retry {
+                        job_retries.insert(&key[..], &(retry_count.to_be_bytes())[..])?;
+                    } else {
+                        queue.remove(&key[..])?;
+                        if let Some(unique_key) = unique_jobs_inverse.remove(&key[..])? {
+                            unique_jobs.remove(unique_key)?;
+                        }
+                        job_state.remove(&key[..])?;
+                        job_retries.remove(&key[..])?;
+                    }
+
+                    Ok(())
+                },
+            )
         })
         .await
         .map_err(|_| RepoError::Canceled)?;
diff --git a/src/sync.rs b/src/sync.rs
index f40b741..20a5468 100644
--- a/src/sync.rs
+++ b/src/sync.rs
@@ -5,6 +5,8 @@ use tokio::{
     task::JoinHandle,
 };
 
+use crate::future::WithPollTimer;
+
 pub(crate) struct DropHandle<T> {
     handle: JoinHandle<T>,
 }
@@ -75,13 +77,12 @@ pub(crate) fn bare_semaphore(permits: usize) -> Semaphore {
 }
 
 #[track_caller]
-pub(crate) fn spawn<F>(name: &str, future: F) -> tokio::task::JoinHandle<F::Output>
+pub(crate) fn spawn<F>(name: &'static str, future: F) -> tokio::task::JoinHandle<F::Output>
 where
     F: std::future::Future + 'static,
     F::Output: 'static,
 {
-    #[cfg(not(tokio_unstable))]
-    let _ = name;
+    let future = future.with_poll_timer(name);
 
     let span = tracing::trace_span!(parent: None, "spawn task");
     let guard = span.enter();