From f0f40db8c31ece496d91bb6e74c8996151f52162 Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 29 Jan 2023 11:36:09 -0600 Subject: [PATCH] Add healthcheck for db --- src/lib.rs | 6 ++++++ src/repo.rs | 9 ++++++++- src/repo/sled.rs | 22 ++++++++++++++++++++-- 3 files changed, 34 insertions(+), 3 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index bd1cdc6..348e1cf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -985,6 +985,11 @@ async fn identifier( }))) } +async fn healthz(repo: web::Data) -> Result { + repo.health_check().await?; + Ok(HttpResponse::Ok().finish()) +} + fn transform_error(error: actix_form_data::Error) -> actix_web::Error { let error: Error = error.into(); let error: actix_web::Error = error.into(); @@ -1039,6 +1044,7 @@ async fn launch( .app_data(web::Data::new(repo)) .app_data(web::Data::new(store)) .app_data(web::Data::new(build_client())) + .route("/healthz", web::get().to(healthz::)) .service( web::scope("/image") .service( diff --git a/src/repo.rs b/src/repo.rs index 1634328..cd3bf34 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -60,6 +60,8 @@ pub(crate) trait FullRepo: + Clone + Debug { + async fn health_check(&self) -> Result<(), Error>; + #[tracing::instrument(skip(self))] async fn identifier_from_alias( &self, @@ -103,7 +105,12 @@ pub(crate) trait FullRepo: } } -impl FullRepo for actix_web::web::Data where T: FullRepo {} +#[async_trait::async_trait(?Send)] +impl FullRepo for actix_web::web::Data where T: FullRepo { + async fn health_check(&self) -> Result<(), Error> { + T::health_check(self).await + } +} pub(crate) trait BaseRepo { type Bytes: AsRef<[u8]> + From> + Clone; diff --git a/src/repo/sled.rs b/src/repo/sled.rs index 7820f97..e5dc422 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -13,7 +13,10 @@ use sled::{CompareAndSwapError, Db, IVec, Tree}; use std::{ collections::HashMap, pin::Pin, - sync::{Arc, RwLock}, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, RwLock, + }, }; use tokio::sync::Notify; @@ -52,6 +55,8 @@ pub(crate) enum SledError { #[derive(Clone)] pub(crate) struct SledRepo { + healthz_count: Arc, + healthz: Tree, settings: Tree, identifier_details: Tree, hashes: Tree, @@ -74,6 +79,8 @@ pub(crate) struct SledRepo { impl SledRepo { pub(crate) fn new(db: Db) -> Result { Ok(SledRepo { + healthz_count: Arc::new(AtomicU64::new(0)), + healthz: db.open_tree("pict-rs-healthz-tree")?, settings: db.open_tree("pict-rs-settings-tree")?, identifier_details: db.open_tree("pict-rs-identifier-details-tree")?, hashes: db.open_tree("pict-rs-hashes-tree")?, @@ -99,7 +106,18 @@ impl BaseRepo for SledRepo { type Bytes = IVec; } -impl FullRepo for SledRepo {} +#[async_trait::async_trait(?Send)] +impl FullRepo for SledRepo { + async fn health_check(&self) -> Result<(), Error> { + let next = self.healthz_count.fetch_add(1, Ordering::Relaxed); + b!(self.healthz, { + healthz.insert("healthz", &next.to_be_bytes()[..]) + }); + self.healthz.flush_async().await?; + b!(self.healthz, healthz.get("healthz")); + Ok(()) + } +} #[derive(serde::Deserialize, serde::Serialize)] enum InnerUploadResult {