diff --git a/src/queue/process.rs b/src/queue/process.rs index b41e877..29fccf7 100644 --- a/src/queue/process.rs +++ b/src/queue/process.rs @@ -1,4 +1,5 @@ use reqwest_middleware::ClientWithMiddleware; +use time::Instant; use crate::{ concurrent_processor::ProcessMap, @@ -73,6 +74,40 @@ where }) } +struct UploadGuard { + armed: bool, + start: Instant, + upload_id: UploadId, +} + +impl UploadGuard { + fn guard(upload_id: UploadId) -> Self { + Self { + armed: true, + start: Instant::now(), + upload_id, + } + } + + fn disarm(mut self) { + self.armed = false; + } +} + +impl Drop for UploadGuard { + fn drop(&mut self) { + metrics::increment_counter!("pict-rs.background.upload.ingest", "completed" => (!self.armed).to_string()); + metrics::histogram!("pict-rs.background.upload.ingest.duration", self.start.elapsed().as_seconds_f64(), "completed" => (!self.armed).to_string()); + + if self.armed { + tracing::warn!( + "Upload future for {} dropped before completion! This can cause clients to wait forever", + self.upload_id, + ); + } + } +} + #[tracing::instrument(skip(repo, store, client, media))] async fn process_ingest( repo: &ArcRepo, @@ -86,6 +121,8 @@ async fn process_ingest( where S: Store + 'static, { + let guard = UploadGuard::guard(upload_id); + let fut = async { let ident = unprocessed_identifier.clone(); let store2 = store.clone(); @@ -130,6 +167,8 @@ where repo.complete_upload(upload_id, result).await?; + guard.disarm(); + Ok(()) } diff --git a/src/repo.rs b/src/repo.rs index e75fb5d..fddda9a 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -862,7 +862,7 @@ where } impl Repo { - #[tracing::instrument] + #[tracing::instrument(skip(config))] pub(crate) async fn open(config: config::Repo) -> color_eyre::Result { match config { config::Repo::Sled(config::Sled { diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs index a5e4679..459d56b 100644 --- a/src/repo/postgres.rs +++ b/src/repo/postgres.rs @@ -1169,46 +1169,37 @@ impl QueueRepo for PostgresRepo { tracing::info!("Reset {count} jobs"); } - // TODO: combine into 1 query - let opt = loop { - let id_opt = job_queue - .select(id) - .filter(status.eq(JobStatus::New).and(queue.eq(queue_name))) - .order(queue_time) - .limit(1) - .get_result::(&mut conn) - .with_metrics("pict-rs.postgres.queue.select") - .with_timeout(Duration::from_secs(5)) - .await - .map_err(|_| PostgresError::DbTimeout)? - .optional() - .map_err(PostgresError::Diesel)?; + let queue_alias = diesel::alias!(schema::job_queue as queue_alias); - let Some(id_val) = id_opt else { - break None; - }; + let id_query = queue_alias + .select(queue_alias.field(id)) + .filter( + queue_alias + .field(status) + .eq(JobStatus::New) + .and(queue_alias.field(queue).eq(queue_name)), + ) + .order(queue_alias.field(queue_time)) + .for_update() + .skip_locked() + .single_value(); - let opt = diesel::update(job_queue) - .filter(id.eq(id_val)) - .filter(status.eq(JobStatus::New)) - .set(( - heartbeat.eq(timestamp), - status.eq(JobStatus::Running), - worker.eq(worker_id), - )) - .returning((id, job)) - .get_result(&mut conn) - .with_metrics("pict-rs.postgres.queue.claim") - .with_timeout(Duration::from_secs(5)) - .await - .map_err(|_| PostgresError::DbTimeout)? - .optional() - .map_err(PostgresError::Diesel)?; - - if let Some(tup) = opt { - break Some(tup); - } - }; + let opt = diesel::update(job_queue) + .filter(id.nullable().eq(id_query)) + .filter(status.eq(JobStatus::New)) + .set(( + heartbeat.eq(timestamp), + status.eq(JobStatus::Running), + worker.eq(worker_id), + )) + .returning((id, job)) + .get_result(&mut conn) + .with_metrics("pict-rs.postgres.queue.claim") + .with_timeout(Duration::from_secs(5)) + .await + .map_err(|_| PostgresError::DbTimeout)? + .optional() + .map_err(PostgresError::Diesel)?; if let Some((job_id, job_json)) = opt { guard.disarm();