2
0
Fork 0
mirror of https://git.asonix.dog/asonix/pict-rs synced 2025-01-23 09:55:50 +00:00

Add metrics & tracing to dropped uploads, turn pop into a single query

This commit is contained in:
asonix 2023-09-07 19:20:41 -05:00
parent d3c663ccd0
commit 858899b943
3 changed files with 69 additions and 39 deletions

View file

@ -1,4 +1,5 @@
use reqwest_middleware::ClientWithMiddleware;
use time::Instant;
use crate::{
concurrent_processor::ProcessMap,
@ -73,6 +74,40 @@ where
})
}
struct UploadGuard {
armed: bool,
start: Instant,
upload_id: UploadId,
}
impl UploadGuard {
fn guard(upload_id: UploadId) -> Self {
Self {
armed: true,
start: Instant::now(),
upload_id,
}
}
fn disarm(mut self) {
self.armed = false;
}
}
impl Drop for UploadGuard {
fn drop(&mut self) {
metrics::increment_counter!("pict-rs.background.upload.ingest", "completed" => (!self.armed).to_string());
metrics::histogram!("pict-rs.background.upload.ingest.duration", self.start.elapsed().as_seconds_f64(), "completed" => (!self.armed).to_string());
if self.armed {
tracing::warn!(
"Upload future for {} dropped before completion! This can cause clients to wait forever",
self.upload_id,
);
}
}
}
#[tracing::instrument(skip(repo, store, client, media))]
async fn process_ingest<S>(
repo: &ArcRepo,
@ -86,6 +121,8 @@ async fn process_ingest<S>(
where
S: Store + 'static,
{
let guard = UploadGuard::guard(upload_id);
let fut = async {
let ident = unprocessed_identifier.clone();
let store2 = store.clone();
@ -130,6 +167,8 @@ where
repo.complete_upload(upload_id, result).await?;
guard.disarm();
Ok(())
}

View file

@ -862,7 +862,7 @@ where
}
impl Repo {
#[tracing::instrument]
#[tracing::instrument(skip(config))]
pub(crate) async fn open(config: config::Repo) -> color_eyre::Result<Self> {
match config {
config::Repo::Sled(config::Sled {

View file

@ -1169,46 +1169,37 @@ impl QueueRepo for PostgresRepo {
tracing::info!("Reset {count} jobs");
}
// TODO: combine into 1 query
let opt = loop {
let id_opt = job_queue
.select(id)
.filter(status.eq(JobStatus::New).and(queue.eq(queue_name)))
.order(queue_time)
.limit(1)
.get_result::<Uuid>(&mut conn)
.with_metrics("pict-rs.postgres.queue.select")
.with_timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?
.optional()
.map_err(PostgresError::Diesel)?;
let queue_alias = diesel::alias!(schema::job_queue as queue_alias);
let Some(id_val) = id_opt else {
break None;
};
let id_query = queue_alias
.select(queue_alias.field(id))
.filter(
queue_alias
.field(status)
.eq(JobStatus::New)
.and(queue_alias.field(queue).eq(queue_name)),
)
.order(queue_alias.field(queue_time))
.for_update()
.skip_locked()
.single_value();
let opt = diesel::update(job_queue)
.filter(id.eq(id_val))
.filter(status.eq(JobStatus::New))
.set((
heartbeat.eq(timestamp),
status.eq(JobStatus::Running),
worker.eq(worker_id),
))
.returning((id, job))
.get_result(&mut conn)
.with_metrics("pict-rs.postgres.queue.claim")
.with_timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?
.optional()
.map_err(PostgresError::Diesel)?;
if let Some(tup) = opt {
break Some(tup);
}
};
let opt = diesel::update(job_queue)
.filter(id.nullable().eq(id_query))
.filter(status.eq(JobStatus::New))
.set((
heartbeat.eq(timestamp),
status.eq(JobStatus::Running),
worker.eq(worker_id),
))
.returning((id, job))
.get_result(&mut conn)
.with_metrics("pict-rs.postgres.queue.claim")
.with_timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?
.optional()
.map_err(PostgresError::Diesel)?;
if let Some((job_id, job_json)) = opt {
guard.disarm();