diff --git a/src/middleware/payload.rs b/src/middleware/payload.rs index 765a767..ea72dc5 100644 --- a/src/middleware/payload.rs +++ b/src/middleware/payload.rs @@ -1,7 +1,7 @@ use std::{ future::{ready, Ready}, rc::Rc, - time::Duration, + time::{Duration, Instant}, }; use actix_web::{ @@ -15,6 +15,36 @@ use crate::{future::NowOrNever, stream::LocalBoxStream}; const LIMIT: usize = 256; +struct MetricsGuard { + start: Instant, + armed: bool, +} + +impl MetricsGuard { + fn guard() -> Self { + metrics::counter!("pict-rs.payload.drain.start").increment(1); + + MetricsGuard { + start: Instant::now(), + armed: true, + } + } + + fn disarm(mut self) { + self.armed = false; + } +} + +impl Drop for MetricsGuard { + fn drop(&mut self) { + metrics::counter!("pict-rs.payload.drain.end", "completed" => (!self.armed).to_string()) + .increment(1); + + metrics::histogram!("pict-rs.payload.drain.duration", "completed" => (!self.armed).to_string()) + .record(self.start.elapsed().as_secs_f64()); + } +} + async fn drain(rx: flume::Receiver) { let mut set = JoinSet::new(); @@ -22,11 +52,13 @@ async fn drain(rx: flume::Receiver) { tracing::trace!("drain: looping"); // draining a payload is a best-effort task - if we can't collect in 2 minutes we bail + let guard = MetricsGuard::guard(); set.spawn_local(tokio::time::timeout(Duration::from_secs(120), async move { let mut streamer = payload.into_streamer(); while streamer.next().await.is_some() { tracing::trace!("drain drop bytes: looping"); } + guard.disarm(); })); let mut count = 0; @@ -105,6 +137,7 @@ impl Drop for PayloadStream { if let Some(payload) = self.inner.take() { tracing::warn!("Dropped unclosed payload, draining"); if self.sender.try_send(payload).is_err() { + metrics::counter!("pict-rs.payload.drain.fail-send").increment(1); tracing::error!("Failed to send unclosed payload for draining"); } } diff --git a/src/queue.rs b/src/queue.rs index d976aa3..501d66c 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -358,7 +358,7 @@ async fn process_image_jobs( tracing::warn!("{}", format!("{e:?}")); if e.is_disconnected() { - tokio::time::sleep(Duration::from_secs(3)).await; + tokio::time::sleep(Duration::from_secs(10)).await; } continue; diff --git a/src/queue/cleanup.rs b/src/queue/cleanup.rs index 07540ed..6678d6a 100644 --- a/src/queue/cleanup.rs +++ b/src/queue/cleanup.rs @@ -161,6 +161,7 @@ async fn outdated_variants(repo: &ArcRepo, config: &Configuration) -> Result<(), let mut variant_stream = repo.older_variants(since).await?.into_streamer(); while let Some(res) = variant_stream.next().await { + metrics::counter!("pict-rs.cleanup.outdated-variant").increment(1); tracing::trace!("outdated_variants: looping"); let (hash, variant) = res?; @@ -178,6 +179,7 @@ async fn outdated_proxies(repo: &ArcRepo, config: &Configuration) -> Result<(), let mut alias_stream = repo.older_aliases(since).await?.into_streamer(); while let Some(res) = alias_stream.next().await { + metrics::counter!("pict-rs.cleanup.outdated-proxy").increment(1); tracing::trace!("outdated_proxies: looping"); let alias = res?;