From ce0df080f448c568cb23e7aa199f5fc11beb2754 Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 22 Jul 2023 16:47:59 -0500 Subject: [PATCH] Add prometheus metrics --- Cargo.lock | 111 +++++++++++++++++ Cargo.toml | 2 + src/backgrounded.rs | 4 + src/concurrent_processor.rs | 10 +- src/config/commandline.rs | 25 ++++ src/config/file.rs | 9 ++ src/generate.rs | 35 +++++- src/ingest.rs | 4 + src/lib.rs | 27 +++- src/main.rs | 1 + src/middleware.rs | 4 + src/middleware/metrics.rs | 237 ++++++++++++++++++++++++++++++++++++ src/process.rs | 55 ++++++++- 13 files changed, 518 insertions(+), 6 deletions(-) create mode 100644 src/middleware/metrics.rs diff --git a/Cargo.lock b/Cargo.lock index 147821e..754080a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -978,6 +978,15 @@ dependencies = [ "ahash 0.7.6", ] +[[package]] +name = "hashbrown" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ff8ae62cd3a9102e5637afc8452c55acf3844001bd5374e0b0bd7b6616c038" +dependencies = [ + "ahash 0.8.3", +] + [[package]] name = "hashbrown" version = "0.14.0" @@ -1285,6 +1294,15 @@ version = "0.4.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4" +[[package]] +name = "mach2" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d0d1830bcd151a6fc4aea1369af235b36c1528fe976b8ff678683c9995eade8" +dependencies = [ + "libc", +] + [[package]] name = "matchers" version = "0.1.0" @@ -1324,6 +1342,60 @@ dependencies = [ "autocfg", ] +[[package]] +name = "metrics" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fde3af1a009ed76a778cb84fdef9e7dbbdf5775ae3e4cc1f434a6a307f6f76c5" +dependencies = [ + "ahash 0.8.3", + "metrics-macros", + "portable-atomic", +] + +[[package]] +name = "metrics-exporter-prometheus" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a4964177ddfdab1e3a2b37aec7cf320e14169abb0ed73999f558136409178d5" +dependencies = [ + "base64 0.21.2", + "hyper", + "indexmap 1.9.3", + "ipnet", + "metrics", + "metrics-util", + "quanta", + "thiserror", + "tokio", +] + +[[package]] +name = "metrics-macros" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddece26afd34c31585c74a4db0630c376df271c285d682d1e55012197830b6df" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.27", +] + +[[package]] +name = "metrics-util" +version = "0.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4de2ed6e491ed114b40b732e4d1659a9d53992ebd87490c44a6ffe23739d973e" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", + "hashbrown 0.13.1", + "metrics", + "num_cpus", + "quanta", + "sketches-ddsketch", +] + [[package]] name = "mime" version = "0.3.17" @@ -1661,6 +1733,8 @@ dependencies = [ "futures-util", "hex", "md-5", + "metrics", + "metrics-exporter-prometheus", "mime", "num_cpus", "once_cell", @@ -1728,6 +1802,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "portable-atomic" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edc55135a600d700580e406b4de0d59cb9ad25e344a3a091a97ded2622ec4ec6" + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -1775,6 +1855,22 @@ dependencies = [ "prost", ] +[[package]] +name = "quanta" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a17e662a7a8291a865152364c20c7abc5e60486ab2001e8ec10b24862de0b9ab" +dependencies = [ + "crossbeam-utils", + "libc", + "mach2", + "once_cell", + "raw-cpuid", + "wasi", + "web-sys", + "winapi", +] + [[package]] name = "quick-xml" version = "0.27.1" @@ -1834,6 +1930,15 @@ dependencies = [ "getrandom", ] +[[package]] +name = "raw-cpuid" +version = "10.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c297679cb867470fa8c9f67dbba74a78d78e3e98d7cf2b08d6d71540f797332" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "redox_syscall" version = "0.2.16" @@ -2245,6 +2350,12 @@ dependencies = [ "libc", ] +[[package]] +name = "sketches-ddsketch" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68a406c1882ed7f29cd5e248c9848a80e7cb6ae0fea82346d2746f2f941c07e1" + [[package]] name = "slab" version = "0.4.8" diff --git a/Cargo.toml b/Cargo.toml index d461360..a455d9b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,8 @@ flume = "0.10.14" futures-util = "0.3.17" hex = "0.4.3" md-5 = "0.10.5" +metrics = "0.21.1" +metrics-exporter-prometheus = { version = "0.12.1", default-features = false, features = ["http-listener"] } mime = "0.3.1" num_cpus = "1.13" once_cell = "1.4.0" diff --git a/src/backgrounded.rs b/src/backgrounded.rs index 86ae39e..69c7972 100644 --- a/src/backgrounded.rs +++ b/src/backgrounded.rs @@ -75,6 +75,8 @@ where { fn drop(&mut self) { if self.identifier.is_some() || self.upload_id.is_some() { + metrics::increment_counter!("pict-rs.background.upload.failure"); + let cleanup_parent_span = tracing::info_span!(parent: None, "Dropped backgrounded cleanup"); cleanup_parent_span.follows_from(Span::current()); @@ -108,6 +110,8 @@ where ) }); } + } else { + metrics::increment_counter!("pict-rs.background.upload.success"); } } } diff --git a/src/concurrent_processor.rs b/src/concurrent_processor.rs index ca54d7d..06eeeff 100644 --- a/src/concurrent_processor.rs +++ b/src/concurrent_processor.rs @@ -56,6 +56,8 @@ impl ProcessMap { completed = &tracing::field::Empty, ); + metrics::increment_counter!("pict-rs.process-map.inserted"); + (CancelState::Sender { sender }, span) } Entry::Occupied(receiver) => { @@ -138,7 +140,9 @@ where CancelState::Sender { sender } => { let res = std::task::ready!(fut.poll(cx)); - process_map.remove(key); + if process_map.remove(key).is_some() { + metrics::increment_counter!("pict-rs.process-map.removed"); + } if let Ok(tup) = &res { let _ = sender.try_send(tup.clone()); @@ -158,6 +162,10 @@ impl Drop for CancelToken { if self.state.is_sender() { let completed = self.process_map.remove(&self.key).is_none(); self.span.record("completed", completed); + + if !completed { + metrics::increment_counter!("pict-rs.process-map.removed"); + } } } } diff --git a/src/config/commandline.rs b/src/config/commandline.rs index d7ce4c1..fc5421c 100644 --- a/src/config/commandline.rs +++ b/src/config/commandline.rs @@ -48,6 +48,7 @@ impl Args { worker_id, client_pool_size, client_timeout, + metrics_prometheus_address, media_preprocess_steps, media_max_file_size, media_image_max_width, @@ -104,6 +105,10 @@ impl Args { timeout: client_timeout, }; + let metrics = Metrics { + prometheus_address: metrics_prometheus_address, + }; + let image_quality = ImageQuality { avif: media_image_quality_avif, jpeg: media_image_quality_jpeg, @@ -180,6 +185,7 @@ impl Args { client, old_db, tracing, + metrics, media, store, repo, @@ -197,6 +203,7 @@ impl Args { client, old_db, tracing, + metrics, media, store, repo, @@ -212,6 +219,7 @@ impl Args { client, old_db, tracing, + metrics, media, store: None, repo: None, @@ -229,6 +237,7 @@ impl Args { let server = Server::default(); let client = Client::default(); let media = Media::default(); + let metrics = Metrics::default(); match store { MigrateStoreFrom::Filesystem(MigrateFilesystem { from, to }) => match to { @@ -238,6 +247,7 @@ impl Args { client, old_db, tracing, + metrics, media, store: None, repo, @@ -257,6 +267,7 @@ impl Args { client, old_db, tracing, + metrics, media, store: None, repo, @@ -280,6 +291,7 @@ impl Args { client, old_db, tracing, + metrics, media, store: None, repo, @@ -302,6 +314,7 @@ impl Args { client, old_db, tracing, + metrics, media, store: None, repo, @@ -347,6 +360,7 @@ pub(super) struct ConfigFormat { client: Client, old_db: OldDb, tracing: Tracing, + metrics: Metrics, media: Media, #[serde(skip_serializing_if = "Option::is_none")] repo: Option, @@ -415,6 +429,13 @@ struct OpenTelemetry { targets: Option>, } +#[derive(Debug, Default, serde::Serialize)] +#[serde(rename_all = "snake_case")] +struct Metrics { + #[serde(skip_serializing_if = "Option::is_none")] + prometheus_address: Option, +} + #[derive(Debug, Default, serde::Serialize)] #[serde(rename_all = "snake_case")] struct OldDb { @@ -723,6 +744,10 @@ struct Run { #[arg(long)] client_timeout: Option, + /// Whether to enable the prometheus scrape endpoint + #[arg(long)] + metrics_prometheus_address: Option, + /// How many files are allowed to be uploaded per-request /// /// This number defaults to 1 diff --git a/src/config/file.rs b/src/config/file.rs index e11bb37..7118ce4 100644 --- a/src/config/file.rs +++ b/src/config/file.rs @@ -15,6 +15,8 @@ pub(crate) struct ConfigFile { pub(crate) tracing: Tracing, + pub(crate) metrics: Metrics, + pub(crate) old_db: OldDb, pub(crate) media: Media, @@ -119,6 +121,13 @@ pub(crate) struct Tracing { pub(crate) opentelemetry: OpenTelemetry, } +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] +#[serde(rename_all = "snake_case")] +pub(crate) struct Metrics { + #[serde(skip_serializing_if = "Option::is_none")] + pub(crate) prometheus_address: Option, +} + #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[serde(rename_all = "snake_case")] pub(crate) struct Logging { diff --git a/src/generate.rs b/src/generate.rs index 983fa59..e10b23d 100644 --- a/src/generate.rs +++ b/src/generate.rs @@ -8,10 +8,40 @@ use crate::{ store::Store, }; use actix_web::web::Bytes; -use std::path::PathBuf; +use std::{path::PathBuf, time::Instant}; use tokio::io::AsyncReadExt; use tracing::Instrument; +struct MetricsGuard { + start: Instant, + armed: bool, +} + +impl MetricsGuard { + fn guard() -> Self { + metrics::increment_counter!("pict-rs.generate.start"); + Self { + start: Instant::now(), + armed: true, + } + } + + fn disarm(mut self) { + self.armed = false; + } +} + +impl Drop for MetricsGuard { + fn drop(&mut self) { + metrics::histogram!("pict-rs.generate.duration", self.start.elapsed().as_secs_f64(), "completed" => (!self.armed).to_string()); + if self.armed { + metrics::increment_counter!("pict-rs.generate.failure"); + } else { + metrics::increment_counter!("pict-rs.generate.success"); + } + } +} + #[allow(clippy::too_many_arguments)] #[tracing::instrument(skip(repo, store, hash))] pub(crate) async fn generate( @@ -61,6 +91,7 @@ async fn process( media: &crate::config::Media, hash: R::Bytes, ) -> Result<(Details, Bytes), Error> { + let guard = MetricsGuard::guard(); let permit = crate::PROCESS_SEMAPHORE.acquire().await; let identifier = if let Some(identifier) = repo @@ -149,5 +180,7 @@ async fn process( ) .await?; + guard.disarm(); + Ok((details, bytes)) as Result<(Details, Bytes), Error> } diff --git a/src/ingest.rs b/src/ingest.rs index 1c4fd53..037a147 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -225,6 +225,8 @@ where { fn drop(&mut self) { if self.hash.is_some() || self.alias.is_some() | self.identifier.is_some() { + metrics::increment_counter!("pict-rs.ingest.failure"); + let cleanup_parent_span = tracing::info_span!(parent: None, "Dropped session cleanup"); cleanup_parent_span.follows_from(Span::current()); @@ -279,6 +281,8 @@ where ) }); } + } else { + metrics::increment_counter!("pict-rs.ingest.success"); } } } diff --git a/src/lib.rs b/src/lib.rs index 213725a..4cd87b2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -37,6 +37,8 @@ use futures_util::{ stream::{empty, once}, Stream, StreamExt, TryStreamExt, }; +use metrics_exporter_prometheus::PrometheusBuilder; +use middleware::Metrics; use once_cell::sync::Lazy; use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; use reqwest_tracing::TracingMiddleware; @@ -161,6 +163,8 @@ impl FormData for Upload { let store = store.clone(); let config = config.clone(); + metrics::increment_counter!("pict-rs.files", "upload" => "inline"); + let span = tracing::info_span!("file-upload", ?filename); let stream = stream.map_err(Error::from); @@ -218,6 +222,8 @@ impl FormData for Import { let store = store.clone(); let config = config.clone(); + metrics::increment_counter!("pict-rs.files", "import" => "inline"); + let span = tracing::info_span!("file-import", ?filename); let stream = stream.map_err(Error::from); @@ -353,6 +359,8 @@ impl FormData for BackgroundedUpload { let repo = (**repo).clone(); let store = (**store).clone(); + metrics::increment_counter!("pict-rs.files", "upload" => "background"); + let span = tracing::info_span!("file-proxy", ?filename); let stream = stream.map_err(Error::from); @@ -440,6 +448,7 @@ async fn claim_upload( Ok(wait_res) => { let upload_result = wait_res?; repo.claim(upload_id).await?; + metrics::increment_counter!("pict-rs.background.upload.claim"); match upload_result { UploadResult::Success { alias, token } => { @@ -511,6 +520,8 @@ async fn do_download_inline( store: web::Data, config: web::Data, ) -> Result { + metrics::increment_counter!("pict-rs.files", "download" => "inline"); + let mut session = ingest::ingest(&repo, &store, stream, None, &config.media).await?; let alias = session.alias().expect("alias should exist").to_owned(); @@ -536,6 +547,8 @@ async fn do_download_backgrounded( repo: web::Data, store: web::Data, ) -> Result { + metrics::increment_counter!("pict-rs.files", "download" => "background"); + let backgrounded = Backgrounded::proxy((**repo).clone(), (**store).clone(), stream).await?; let upload_id = backgrounded.upload_id().expect("Upload ID exists"); @@ -1362,6 +1375,7 @@ async fn launch_file_store, T: serde::Serialize> ConfigSource { /// parameters have defaults, it can be useful to dump a valid configuration with default values to /// see what is available for tweaking. /// - /// This function must be called before `run` or `install_tracing` - /// /// When running pict-rs as a library, configuration is limited to environment variables and /// configuration files. Commandline options are not available. /// @@ -1535,6 +1548,16 @@ impl PictRsConfiguration { Ok(self) } + pub fn install_metrics(self) -> color_eyre::Result { + if let Some(addr) = self.config.metrics.prometheus_address { + PrometheusBuilder::new() + .with_http_listener(addr) + .install()?; + } + + Ok(self) + } + /// Run the pict-rs application /// /// This must be called after `init_config`, or else the default configuration builder will run and diff --git a/src/main.rs b/src/main.rs index f4d2984..20aee47 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,7 @@ async fn main() -> color_eyre::Result<()> { pict_rs::PictRsConfiguration::build_default()? .install_tracing()? + .install_metrics()? .run() .await } diff --git a/src/middleware.rs b/src/middleware.rs index 356861c..bbc2bdf 100644 --- a/src/middleware.rs +++ b/src/middleware.rs @@ -1,3 +1,5 @@ +mod metrics; + use actix_rt::time::Timeout; use actix_web::{ dev::{Service, ServiceRequest, Transform}, @@ -10,6 +12,8 @@ use std::{ task::{Context, Poll}, }; +pub(crate) use self::metrics::Metrics; + pub(crate) struct Deadline; pub(crate) struct DeadlineMiddleware { inner: S, diff --git a/src/middleware/metrics.rs b/src/middleware/metrics.rs new file mode 100644 index 0000000..e510c28 --- /dev/null +++ b/src/middleware/metrics.rs @@ -0,0 +1,237 @@ +use actix_web::{ + body::MessageBody, + dev::{Service, ServiceRequest, ServiceResponse, Transform}, + http::StatusCode, + HttpResponse, ResponseError, +}; +use std::{ + cell::RefCell, + future::{ready, Future, Ready}, + pin::Pin, + task::{Context, Poll}, + time::Instant, +}; + +struct MetricsGuard { + start: Instant, + matched_path: Option, + armed: bool, +} + +struct MetricsGuardWithStatus { + start: Instant, + matched_path: Option, + status: StatusCode, +} + +impl MetricsGuard { + fn new(matched_path: Option) -> Self { + metrics::increment_counter!("pict-rs.request.start", "path" => format!("{matched_path:?}")); + + Self { + start: Instant::now(), + matched_path, + armed: true, + } + } + + fn with_status(mut self, status: StatusCode) -> MetricsGuardWithStatus { + self.armed = false; + + MetricsGuardWithStatus { + start: self.start, + matched_path: self.matched_path.clone(), + status, + } + } +} + +impl Drop for MetricsGuard { + fn drop(&mut self) { + if self.armed { + metrics::increment_counter!("pict-rs.request.complete", "path" => format!("{:?}", self.matched_path)); + metrics::histogram!("pict-rs.request.timings", self.start.elapsed().as_secs_f64(), "path" => format!("{:?}", self.matched_path)) + } + } +} + +impl Drop for MetricsGuardWithStatus { + fn drop(&mut self) { + metrics::increment_counter!("pict-rs.request.complete", "path" => format!("{:?}", self.matched_path), "status" => self.status.to_string()); + metrics::histogram!("pict-rs.request.timings", self.start.elapsed().as_secs_f64(), "path" => format!("{:?}", self.matched_path), "status" => self.status.to_string()); + } +} + +pub(crate) struct Metrics; +pub(crate) struct MetricsMiddleware { + inner: S, +} + +pub(crate) struct MetricsError { + guard: RefCell>, + inner: actix_web::Error, +} + +pin_project_lite::pin_project! { + pub(crate) struct MetricsFuture { + guard: Option, + + #[pin] + inner: F, + } +} + +pin_project_lite::pin_project! { + pub(crate) struct MetricsBody { + guard: Option, + + #[pin] + inner: B, + } +} + +impl Transform for Metrics +where + S: Service>, + S::Future: 'static, + S::Error: Into, +{ + type Response = ServiceResponse>; + type Error = actix_web::Error; + type InitError = (); + type Transform = MetricsMiddleware; + type Future = Ready>; + + fn new_transform(&self, service: S) -> Self::Future { + ready(Ok(MetricsMiddleware { inner: service })) + } +} + +impl Service for MetricsMiddleware +where + S: Service>, + S::Future: 'static, + S::Error: Into, +{ + type Response = ServiceResponse>; + type Error = actix_web::Error; + type Future = MetricsFuture; + + fn poll_ready(&self, ctx: &mut core::task::Context<'_>) -> Poll> { + let res = std::task::ready!(self.inner.poll_ready(ctx)); + + Poll::Ready(res.map_err(|e| { + MetricsError { + guard: RefCell::new(None), + inner: e.into(), + } + .into() + })) + } + + fn call(&self, req: ServiceRequest) -> Self::Future { + let matched_path = req.match_pattern(); + + MetricsFuture { + guard: Some(MetricsGuard::new(matched_path)), + inner: self.inner.call(req), + } + } +} + +impl Future for MetricsFuture +where + F: Future, E>>, + E: Into, +{ + type Output = Result>, actix_web::Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + + match std::task::ready!(this.inner.poll(cx)) { + Ok(response) => { + let guard = this.guard.take(); + + Poll::Ready(Ok(response.map_body(|head, inner| MetricsBody { + guard: guard.map(|guard| guard.with_status(head.status)), + inner, + }))) + } + Err(e) => { + let guard = this.guard.take(); + + Poll::Ready(Err(MetricsError { + guard: RefCell::new(guard), + inner: e.into(), + } + .into())) + } + } + } +} + +impl MessageBody for MetricsBody +where + B: MessageBody, +{ + type Error = B::Error; + + fn size(&self) -> actix_web::body::BodySize { + self.inner.size() + } + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + let this = self.project(); + + let opt = std::task::ready!(this.inner.poll_next(cx)); + + if opt.is_none() { + this.guard.take(); + } + + Poll::Ready(opt) + } +} + +impl std::fmt::Debug for MetricsError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MetricsError") + .field("guard", &"Guard") + .field("inner", &self.inner) + .finish() + } +} + +impl std::fmt::Display for MetricsError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.inner.fmt(f) + } +} + +impl std::error::Error for MetricsError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + self.inner.source() + } +} + +impl ResponseError for MetricsError { + fn status_code(&self) -> StatusCode { + self.inner.as_response_error().status_code() + } + + fn error_response(&self) -> HttpResponse { + let guard = self.guard.borrow_mut().take(); + + self.inner.error_response().map_body(|head, inner| { + MetricsBody { + guard: guard.map(|guard| guard.with_status(head.status)), + inner, + } + .boxed() + }) + } +} diff --git a/src/process.rs b/src/process.rs index 333d920..63ca8f4 100644 --- a/src/process.rs +++ b/src/process.rs @@ -5,6 +5,7 @@ use std::{ pin::Pin, process::{ExitStatus, Stdio}, task::{Context, Poll}, + time::Instant, }; use tokio::{ io::{AsyncRead, AsyncWriteExt, ReadBuf}, @@ -13,12 +14,52 @@ use tokio::{ }; use tracing::{Instrument, Span}; +struct MetricsGuard { + start: Instant, + armed: bool, + command: String, +} + +impl MetricsGuard { + fn guard(command: String) -> Self { + metrics::increment_counter!("pict-rs.process.spawn", "command" => command.clone()); + + Self { + start: Instant::now(), + armed: true, + command, + } + } + + fn disarm(mut self) { + self.armed = false; + } +} + +impl Drop for MetricsGuard { + fn drop(&mut self) { + metrics::histogram!( + "pict-rs.process.duration", + self.start.elapsed().as_secs_f64(), + "command" => self.command.clone(), + "completed" => (!self.armed).to_string(), + ); + + if self.armed { + metrics::increment_counter!("pict-rs.process.failure", "command" => self.command.clone()); + } else { + metrics::increment_counter!("pict-rs.process.success", "command" => self.command.clone()); + } + } +} + #[derive(Debug)] struct StatusError(ExitStatus); pub(crate) struct Process { command: String, child: Child, + guard: MetricsGuard, } impl std::fmt::Debug for Process { @@ -80,6 +121,8 @@ impl Process { fn spawn(command: &str, cmd: &mut Command) -> std::io::Result { tracing::trace_span!(parent: None, "Spawn command", %command).in_scope(|| { + let guard = MetricsGuard::guard(command.into()); + let cmd = cmd .stdin(Stdio::piped()) .stdout(Stdio::piped()) @@ -88,6 +131,7 @@ impl Process { cmd.spawn().map(|child| Process { child, command: String::from(command), + guard, }) }) } @@ -97,7 +141,11 @@ impl Process { let res = self.child.wait().await; match res { - Ok(status) if status.success() => Ok(()), + Ok(status) if status.success() => { + self.guard.disarm(); + + Ok(()) + } Ok(status) => Err(ProcessError::Status(self.command, status)), Err(e) => Err(ProcessError::Other(e)), } @@ -133,6 +181,7 @@ impl Process { let mut child = self.child; let command = self.command; + let guard = self.guard; let handle = tracing::trace_span!(parent: None, "Spawn task", %command).in_scope(|| { actix_rt::spawn( async move { @@ -143,7 +192,9 @@ impl Process { match child.wait().await { Ok(status) => { - if !status.success() { + if status.success() { + guard.disarm(); + } else { let _ = tx.send(std::io::Error::new( std::io::ErrorKind::Other, StatusError(status),