From 63a550164f47bc7dbe43a9b77b3019e64631075f Mon Sep 17 00:00:00 2001
From: vsilent
Date: Mon, 5 Jan 2026 15:36:35 +0200
Subject: [PATCH 1/3] feat: implement comprehensive health check system

- Add health check module with component health monitoring
- Monitor DB, RabbitMQ, Docker Hub, Redis, Vault connections
- Export health metrics for monitoring systems
- Add /health_check endpoint with detailed component status
- Add /health_check/metrics endpoint for historical statistics
- Include response time tracking and degradation detection
- Add Casbin rules for metrics endpoint access
---
 ...20000_casbin_health_metrics_rules.down.sql |   7 +
 ...3120000_casbin_health_metrics_rules.up.sql |  17 +
 src/health/checks.rs                          | 342 ++++++++++++++++++
 src/health/metrics.rs                         | 167 +++++++++
 src/health/mod.rs                             |   7 +
 src/health/models.rs                          |  94 +++++
 src/lib.rs                                    |   1 +
 src/routes/health_checks.rs                   |  24 +-
 src/routes/mod.rs                             |   2 +-
 src/startup.rs                                |  19 +-
 10 files changed, 675 insertions(+), 5 deletions(-)
 create mode 100644 migrations/20260103120000_casbin_health_metrics_rules.down.sql
 create mode 100644 migrations/20260103120000_casbin_health_metrics_rules.up.sql
 create mode 100644 src/health/checks.rs
 create mode 100644 src/health/metrics.rs
 create mode 100644 src/health/mod.rs
 create mode 100644 src/health/models.rs

diff --git a/migrations/20260103120000_casbin_health_metrics_rules.down.sql b/migrations/20260103120000_casbin_health_metrics_rules.down.sql
new file mode 100644
index 0000000..19ea2ac
--- /dev/null
+++ b/migrations/20260103120000_casbin_health_metrics_rules.down.sql
@@ -0,0 +1,7 @@
+-- Remove Casbin rules for health check metrics endpoint
+
+DELETE FROM public.casbin_rule
+WHERE ptype = 'p'
+  AND v0 IN ('group_anonymous', 'group_user', 'group_admin')
+  AND v1 = '/health_check/metrics'
+  AND v2 = 'GET';
diff --git a/migrations/20260103120000_casbin_health_metrics_rules.up.sql b/migrations/20260103120000_casbin_health_metrics_rules.up.sql
new file mode 100644
index 0000000..274f792
--- /dev/null
+++ b/migrations/20260103120000_casbin_health_metrics_rules.up.sql
@@ -0,0 +1,17 @@
+-- Add Casbin rules for health check metrics endpoint
+-- Allow all groups to access health check metrics for monitoring
+
+-- Anonymous users can check health metrics
+INSERT INTO public.casbin_rule (ptype, v0, v1, v2)
+VALUES ('p', 'group_anonymous', '/health_check/metrics', 'GET')
+ON CONFLICT DO NOTHING;
+
+-- Regular users can check health metrics
+INSERT INTO public.casbin_rule (ptype, v0, v1, v2)
+VALUES ('p', 'group_user', '/health_check/metrics', 'GET')
+ON CONFLICT DO NOTHING;
+
+-- Admins can check health metrics
+INSERT INTO public.casbin_rule (ptype, v0, v1, v2)
+VALUES ('p', 'group_admin', '/health_check/metrics', 'GET')
+ON CONFLICT DO NOTHING;
diff --git a/src/health/checks.rs b/src/health/checks.rs
new file mode 100644
index 0000000..fe4455f
--- /dev/null
+++ b/src/health/checks.rs
@@ -0,0 +1,342 @@
+use super::models::{ComponentHealth, HealthCheckResponse};
+use crate::configuration::Settings;
+use sqlx::PgPool;
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+use tokio::time::timeout;
+
+const CHECK_TIMEOUT: Duration = Duration::from_secs(5);
+const SLOW_RESPONSE_THRESHOLD_MS: u64 = 1000;
+
+pub struct HealthChecker {
+    pg_pool: Arc<PgPool>,
+    settings: Arc<Settings>,
+    start_time: Instant,
+}
+
+impl HealthChecker {
+    pub fn new(pg_pool: Arc<PgPool>, settings: Arc<Settings>) -> Self {
+        Self {
+            pg_pool,
+            settings,
+            start_time: Instant::now(),
+        }
+    }
+
+    pub async fn check_all(&self) -> HealthCheckResponse {
+        let version = env!("CARGO_PKG_VERSION").to_string();
+        let uptime = self.start_time.elapsed().as_secs();
+        let mut response = HealthCheckResponse::new(version, uptime);
+
+        let checks = vec![
+            ("database", self.check_database()),
+            ("rabbitmq", self.check_rabbitmq()),
+            ("dockerhub", self.check_dockerhub()),
+            ("redis", self.check_redis()),
+            ("vault", self.check_vault()),
+        ];
+
+        let results = futures::future::join_all(checks.into_iter().map(|(name, future)| async move {
+            let result = timeout(CHECK_TIMEOUT, future).await;
+            let health = match result {
+                Ok(health) => health,
+                Err(_) => ComponentHealth::unhealthy("Health check timeout".to_string()),
+            };
+            (name.to_string(), health)
+        }))
+        .await;
+
+        for (name, health) in results {
+            response.add_component(name, health);
+        }
+
+        response
+    }
+
+    #[tracing::instrument(name = "Check database health", skip(self))]
+    async fn check_database(&self) -> ComponentHealth {
+        let start = Instant::now();
+
+        match sqlx::query("SELECT 1 as health_check")
+            .fetch_one(self.pg_pool.as_ref())
+            .await
+        {
+            Ok(_) => {
+                let elapsed = start.elapsed().as_millis() as u64;
+                let mut health = ComponentHealth::healthy(elapsed);
+
+                if elapsed > SLOW_RESPONSE_THRESHOLD_MS {
+                    health = ComponentHealth::degraded(
+                        "Database responding slowly".to_string(),
+                        Some(elapsed),
+                    );
+                }
+
+                let pool_size = self.pg_pool.size();
+                let idle_connections = self.pg_pool.num_idle();
+                let mut details = HashMap::new();
+                details.insert(
+                    "pool_size".to_string(),
+                    serde_json::json!(pool_size),
+                );
+                details.insert(
+                    "idle_connections".to_string(),
+                    serde_json::json!(idle_connections),
+                );
+                details.insert(
+                    "active_connections".to_string(),
+                    serde_json::json!(pool_size as i64 - idle_connections as i64),
+                );
+
+                health.with_details(details)
+            }
+            Err(e) => {
+                tracing::error!("Database health check failed: {:?}", e);
+                ComponentHealth::unhealthy(format!("Database error: {}", e))
+            }
+        }
+    }
+
+    #[tracing::instrument(name = "Check RabbitMQ health", skip(self))]
+    async fn check_rabbitmq(&self) -> ComponentHealth {
+        let start = Instant::now();
+        let connection_string = self.settings.amqp.connection_string();
+
+        match deadpool_lapin::Config {
+            url: Some(connection_string.clone()),
+            ..Default::default()
+        }
+        .create_pool(Some(deadpool_lapin::Runtime::Tokio1))
+        {
+            Ok(pool) => match pool.get().await {
+                Ok(conn) => match conn.create_channel().await {
+                    Ok(_channel) => {
+                        let elapsed = start.elapsed().as_millis() as u64;
+                        let mut health = ComponentHealth::healthy(elapsed);
+
+                        if elapsed > SLOW_RESPONSE_THRESHOLD_MS {
+                            health = ComponentHealth::degraded(
+                                "RabbitMQ responding slowly".to_string(),
+                                Some(elapsed),
+                            );
+                        }
+
+                        let mut details = HashMap::new();
+                        details.insert(
+                            "host".to_string(),
+                            serde_json::json!(self.settings.amqp.host),
+                        );
+                        details.insert(
+                            "port".to_string(),
+                            serde_json::json!(self.settings.amqp.port),
+                        );
+
+                        health.with_details(details)
+                    }
+                    Err(e) => {
+                        tracing::error!("Failed to create RabbitMQ channel: {:?}", e);
+                        ComponentHealth::unhealthy(format!("RabbitMQ channel error: {}", e))
+                    }
+                },
+                Err(e) => {
+                    tracing::error!("Failed to get RabbitMQ connection: {:?}", e);
+                    ComponentHealth::unhealthy(format!("RabbitMQ connection error: {}", e))
+                }
+            },
+            Err(e) => {
+                tracing::error!("Failed to create RabbitMQ pool: {:?}", e);
+                ComponentHealth::unhealthy(format!("RabbitMQ config error: {}", e))
+            }
+        }
+    }
+
+    #[tracing::instrument(name = "Check Docker Hub health", skip(self))]
+    async fn check_dockerhub(&self) -> ComponentHealth {
+        let start = Instant::now();
+        let url = "https://hub.docker.com/v2/";
+
+        match reqwest::Client::builder()
+            .timeout(Duration::from_secs(5))
+            .build()
+        {
+            Ok(client) => match client.get(url).send().await {
+                Ok(response) => {
+                    let elapsed = start.elapsed().as_millis() as u64;
+
+                    if response.status().is_success() {
+                        let mut health = ComponentHealth::healthy(elapsed);
+
+                        if elapsed > SLOW_RESPONSE_THRESHOLD_MS {
+                            health = ComponentHealth::degraded(
+                                "Docker Hub responding slowly".to_string(),
+                                Some(elapsed),
+                            );
+                        }
+
+                        let mut details = HashMap::new();
+                        details.insert("api_version".to_string(), serde_json::json!("v2"));
+                        details.insert(
+                            "status_code".to_string(),
+                            serde_json::json!(response.status().as_u16()),
+                        );
+
+                        health.with_details(details)
+                    } else {
+                        ComponentHealth::unhealthy(format!(
+                            "Docker Hub returned status: {}",
+                            response.status()
+                        ))
+                    }
+                }
+                Err(e) => {
+                    tracing::warn!("Docker Hub health check failed: {:?}", e);
+                    ComponentHealth::unhealthy(format!("Docker Hub error: {}", e))
+                }
+            },
+            Err(e) => {
+                tracing::error!("Failed to create HTTP client: {:?}", e);
+                ComponentHealth::unhealthy(format!("HTTP client error: {}", e))
+            }
+        }
+    }
+
+    #[tracing::instrument(name = "Check Redis health", skip(self))]
+    async fn check_redis(&self) -> ComponentHealth {
+        let redis_url = std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1/".to_string());
+        let start = Instant::now();
+
+        match redis::Client::open(redis_url.as_str()) {
+            Ok(client) => {
+                let conn_result = tokio::task::spawn_blocking(move || client.get_connection())
+                    .await;
+
+                match conn_result {
+                    Ok(Ok(mut conn)) => {
+                        let ping_result: Result<String, redis::RedisError> =
+                            tokio::task::spawn_blocking(move || {
+                                redis::cmd("PING").query(&mut conn)
+                            })
+                            .await
+                            .unwrap_or_else(|_| Err(redis::RedisError::from((
+                                redis::ErrorKind::IoError,
+                                "Task join error",
+                            ))));
+
+                        match ping_result {
+                            Ok(_) => {
+                                let elapsed = start.elapsed().as_millis() as u64;
+                                let mut health = ComponentHealth::healthy(elapsed);
+
+                                if elapsed > SLOW_RESPONSE_THRESHOLD_MS {
+                                    health = ComponentHealth::degraded(
+                                        "Redis responding slowly".to_string(),
+                                        Some(elapsed),
+                                    );
+                                }
+
+                                let mut details = HashMap::new();
+                                details.insert("url".to_string(), serde_json::json!(redis_url));
+
+                                health.with_details(details)
+                            }
+                            Err(e) => {
+                                tracing::warn!("Redis PING failed: {:?}", e);
+                                ComponentHealth::degraded(
+                                    format!("Redis optional service unavailable: {}", e),
+                                    None,
+                                )
+                            }
+                        }
+                    }
+                    Ok(Err(e)) => {
+                        tracing::warn!("Redis connection failed: {:?}", e);
+                        ComponentHealth::degraded(
+                            format!("Redis optional service unavailable: {}", e),
+                            None,
+                        )
+                    }
+                    Err(e) => {
+                        tracing::warn!("Redis task failed: {:?}", e);
+                        ComponentHealth::degraded(
+                            format!("Redis optional service unavailable: {}", e),
+                            None,
+                        )
+                    }
+                }
+            }
+            Err(e) => {
+                tracing::warn!("Redis client creation failed: {:?}", e);
+                ComponentHealth::degraded(
+                    format!("Redis optional service unavailable: {}", e),
+                    None,
+                )
+            }
+        }
+    }
+
+    #[tracing::instrument(name = "Check Vault health", skip(self))]
+    async fn check_vault(&self) -> ComponentHealth {
+        let start = Instant::now();
+        let vault_address = &self.settings.vault.address;
+        let health_url = format!("{}/v1/sys/health", vault_address);
+
+        match reqwest::Client::builder()
+            .timeout(Duration::from_secs(5))
+            .build()
+        {
+            Ok(client) => match client.get(&health_url).send().await {
+                Ok(response) => {
+                    let elapsed = start.elapsed().as_millis() as u64;
+                    let status_code = response.status().as_u16();
+
+                    match status_code {
+                        200 | 429 | 472 | 473 => {
+                            let mut health = ComponentHealth::healthy(elapsed);
+
+                            if elapsed > SLOW_RESPONSE_THRESHOLD_MS {
+                                health = ComponentHealth::degraded(
+                                    "Vault responding slowly".to_string(),
+                                    Some(elapsed),
+                                );
+                            }
+
+                            let mut details = HashMap::new();
+                            details.insert("address".to_string(), serde_json::json!(vault_address));
+                            details.insert("status_code".to_string(), serde_json::json!(status_code));
+
+                            if let Ok(body) = response.json::<serde_json::Value>().await {
+                                if let Some(initialized) = body.get("initialized") {
+                                    details.insert("initialized".to_string(), initialized.clone());
+                                }
+                                if let Some(sealed) = body.get("sealed") {
+                                    details.insert("sealed".to_string(), sealed.clone());
+                                }
+                            }
+
+                            health.with_details(details)
+                        }
+                        _ => {
+                            tracing::warn!("Vault returned unexpected status: {}", status_code);
+                            ComponentHealth::degraded(
+                                format!("Vault optional service status: {}", status_code),
+                                Some(elapsed),
+                            )
+                        }
+                    }
+                }
+                Err(e) => {
+                    tracing::warn!("Vault health check failed: {:?}", e);
+                    ComponentHealth::degraded(
+                        format!("Vault optional service unavailable: {}", e),
+                        None,
+                    )
+                }
+            },
+            Err(e) => {
+                tracing::error!("Failed to create HTTP client for Vault: {:?}", e);
+                ComponentHealth::degraded(format!("HTTP client error: {}", e), None)
+            }
+        }
+    }
+}
diff --git a/src/health/metrics.rs b/src/health/metrics.rs
new file mode 100644
index 0000000..a810e36
--- /dev/null
+++ b/src/health/metrics.rs
@@ -0,0 +1,167 @@
+use super::models::{ComponentHealth, ComponentStatus};
+use chrono::{DateTime, Utc};
+use std::collections::HashMap;
+use std::sync::Arc;
+use tokio::sync::RwLock;
+
+#[derive(Debug, Clone)]
+pub struct MetricSnapshot {
+    pub timestamp: DateTime<Utc>,
+    pub component: String,
+    pub status: ComponentStatus,
+    pub response_time_ms: Option<u64>,
+}
+
+pub struct HealthMetrics {
+    snapshots: Arc<RwLock<Vec<MetricSnapshot>>>,
+    max_snapshots: usize,
+}
+
+impl HealthMetrics {
+    pub fn new(max_snapshots: usize) -> Self {
+        Self {
+            snapshots: Arc::new(RwLock::new(Vec::new())),
+            max_snapshots,
+        }
+    }
+
+    pub async fn record(&self, component: String, health: &ComponentHealth) {
+        let snapshot = MetricSnapshot {
+            timestamp: health.last_checked,
+            component,
+            status: health.status.clone(),
+            response_time_ms: health.response_time_ms,
+        };
+
+        let mut snapshots = self.snapshots.write().await;
+        snapshots.push(snapshot);
+
+        if snapshots.len() > self.max_snapshots {
+            snapshots.remove(0);
+        }
+    }
+
+    pub async fn get_component_stats(
+        &self,
+        component: &str,
+    ) -> Option<HashMap<String, serde_json::Value>> {
+        let snapshots = self.snapshots.read().await;
+        let component_snapshots: Vec<_> = snapshots
+            .iter()
+            .filter(|s| s.component == component)
+            .collect();
+
+        if component_snapshots.is_empty() {
+            return None;
+        }
+
+        let total = component_snapshots.len();
+        let healthy = component_snapshots
+            .iter()
+            .filter(|s| s.status == ComponentStatus::Healthy)
+            .count();
+        let degraded = component_snapshots
+            .iter()
+            .filter(|s| s.status == ComponentStatus::Degraded)
+            .count();
+        let unhealthy = component_snapshots
+            .iter()
+            .filter(|s| s.status == ComponentStatus::Unhealthy)
+            .count();
+
+        let response_times: Vec<u64> = component_snapshots
+            .iter()
+            .filter_map(|s| s.response_time_ms)
+            .collect();
+
+        let avg_response_time = if !response_times.is_empty() {
+            response_times.iter().sum::<u64>() / response_times.len() as u64
+        } else {
+            0
+        };
+
+        let min_response_time = response_times.iter().min().copied();
+        let max_response_time = response_times.iter().max().copied();
+        let uptime_percentage = (healthy as f64 / total as f64) * 100.0;
+
+        let mut stats = HashMap::new();
+        stats.insert("total_checks".to_string(), serde_json::json!(total));
+        stats.insert("healthy_count".to_string(), serde_json::json!(healthy));
+        stats.insert("degraded_count".to_string(), serde_json::json!(degraded));
+        stats.insert("unhealthy_count".to_string(), serde_json::json!(unhealthy));
+        stats.insert(
+            "uptime_percentage".to_string(),
+            serde_json::json!(format!("{:.2}", uptime_percentage)),
+        );
+        stats.insert(
+            "avg_response_time_ms".to_string(),
+            serde_json::json!(avg_response_time),
+        );
+
+        if let Some(min) = min_response_time {
+            stats.insert("min_response_time_ms".to_string(), serde_json::json!(min));
+        }
+        if let Some(max) = max_response_time {
+            stats.insert("max_response_time_ms".to_string(), serde_json::json!(max));
+        }
+
+        Some(stats)
+    }
+
+    pub async fn get_all_stats(&self) -> HashMap<String, HashMap<String, serde_json::Value>> {
+        let snapshots = self.snapshots.read().await;
+        let mut components: std::collections::HashSet<String> = std::collections::HashSet::new();
+
+        for snapshot in snapshots.iter() {
+            components.insert(snapshot.component.clone());
+        }
+
+        let mut all_stats = HashMap::new();
+        for component in components {
+            if let Some(stats) = self.get_component_stats(&component).await {
+                all_stats.insert(component, stats);
+            }
+        }
+
+        all_stats
+    }
+
+    pub async fn clear(&self) {
+        let mut snapshots = self.snapshots.write().await;
+        snapshots.clear();
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[tokio::test]
+    async fn test_metrics_recording() {
+        let metrics = HealthMetrics::new(100);
+        let health = ComponentHealth::healthy(150);
+
+        metrics.record("database".to_string(), &health).await;
+
+        let stats = metrics.get_component_stats("database").await;
+        assert!(stats.is_some());
+
+        let stats = stats.unwrap();
+        assert_eq!(stats.get("total_checks").unwrap(), &serde_json::json!(1));
+        assert_eq!(stats.get("healthy_count").unwrap(), &serde_json::json!(1));
+    }
+
+    #[tokio::test]
+    async fn test_metrics_limit() {
+        let metrics = HealthMetrics::new(5);
+
+        for i in 0..10 {
+            let health = ComponentHealth::healthy(i * 10);
+            metrics.record("test".to_string(), &health).await;
+        }
+
+        let snapshots = metrics.snapshots.read().await;
+        assert_eq!(snapshots.len(), 5);
+    }
+}
diff --git a/src/health/mod.rs b/src/health/mod.rs
new file mode 100644
index 0000000..fa9726f
--- /dev/null
+++ b/src/health/mod.rs
@@ -0,0 +1,7 @@
+mod checks;
+mod metrics;
+mod models;
+
+pub use checks::HealthChecker;
+pub use metrics::HealthMetrics;
+pub use models::{ComponentHealth, ComponentStatus, HealthCheckResponse};
diff --git a/src/health/models.rs b/src/health/models.rs
new file mode 100644
index 0000000..7271c4d
--- /dev/null
+++ b/src/health/models.rs
@@ -0,0 +1,94 @@
+use chrono::{DateTime, Utc};
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+#[serde(rename_all = "lowercase")]
+pub enum ComponentStatus {
+    Healthy,
+    Degraded,
+    Unhealthy,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ComponentHealth {
+    pub status: ComponentStatus,
+    pub message: Option<String>,
+    pub response_time_ms: Option<u64>,
+    pub last_checked: DateTime<Utc>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub details: Option<HashMap<String, serde_json::Value>>,
+}
+
+impl ComponentHealth {
+    pub fn healthy(response_time_ms: u64) -> Self {
+        Self {
+            status: ComponentStatus::Healthy,
+            message: None,
+            response_time_ms: Some(response_time_ms),
+            last_checked: Utc::now(),
+            details: None,
+        }
+    }
+
+    pub fn unhealthy(error: String) -> Self {
+        Self {
+            status: ComponentStatus::Unhealthy,
+            message: Some(error),
+            response_time_ms: None,
+            last_checked: Utc::now(),
+            details: None,
+        }
+    }
+
+    pub fn degraded(message: String, response_time_ms: Option<u64>) -> Self {
+        Self {
+            status: ComponentStatus::Degraded,
+            message: Some(message),
+            response_time_ms,
+            last_checked: Utc::now(),
+            details: None,
+        }
+    }
+
+    pub fn with_details(mut self, details: HashMap<String, serde_json::Value>) -> Self {
+        self.details = Some(details);
+        self
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct HealthCheckResponse {
+    pub status: ComponentStatus,
+    pub timestamp: DateTime<Utc>,
+    pub version: String,
+    pub uptime_seconds: u64,
+    pub components: HashMap<String, ComponentHealth>,
+}
+
+impl HealthCheckResponse {
+    pub fn new(version: String, uptime_seconds: u64) -> Self {
+        Self {
+            status: ComponentStatus::Healthy,
+            timestamp: Utc::now(),
+            version,
+            uptime_seconds,
+            components: HashMap::new(),
+        }
+    }
+
+    pub fn add_component(&mut self, name: String, health: ComponentHealth) {
+        if health.status == ComponentStatus::Unhealthy {
+            self.status = ComponentStatus::Unhealthy;
+        } else if health.status == ComponentStatus::Degraded
+            && self.status != ComponentStatus::Unhealthy
+        {
+            self.status = ComponentStatus::Degraded;
+        }
+        self.components.insert(name, health);
+    }
+
+    pub fn is_healthy(&self) -> bool {
+        self.status == ComponentStatus::Healthy
+    }
+}
diff --git a/src/lib.rs b/src/lib.rs
index c5456d8..6117adf 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -3,6 +3,7 @@ pub mod connectors;
 pub mod console;
 pub mod db;
 pub mod forms;
+pub mod health;
 pub mod helpers;
 pub mod mcp;
 mod middleware;
diff --git a/src/routes/health_checks.rs b/src/routes/health_checks.rs
index 89630f4..dd49d07 100644
--- a/src/routes/health_checks.rs
+++ b/src/routes/health_checks.rs
@@ -1,6 +1,24 @@
-use actix_web::{get, HttpRequest, HttpResponse};
+use actix_web::{get, web, HttpResponse};
+use crate::health::{HealthChecker, HealthMetrics};
+use std::sync::Arc;
 
 #[get("")]
-pub async fn health_check(_req: HttpRequest) -> HttpResponse {
-    HttpResponse::Ok().finish()
+pub async fn health_check(
+    checker: web::Data<Arc<HealthChecker>>,
+) -> HttpResponse {
+    let health_response = checker.check_all().await;
+
+    if health_response.is_healthy() {
+        HttpResponse::Ok().json(health_response)
+    } else {
+        HttpResponse::ServiceUnavailable().json(health_response)
+    }
 }
+
+#[get("/metrics")]
+pub async fn health_metrics(
+    metrics: web::Data<Arc<HealthMetrics>>,
+) -> HttpResponse {
+    let stats = metrics.get_all_stats().await;
+    HttpResponse::Ok().json(stats)
+}
diff --git a/src/routes/mod.rs b/src/routes/mod.rs
index 54107f8..62ce6c9 100644
--- a/src/routes/mod.rs
+++ b/src/routes/mod.rs
@@ -5,7 +5,7 @@ pub mod health_checks;
 pub(crate) mod rating;
 pub(crate) mod test;
 
-pub use health_checks::*;
+pub use health_checks::{health_check, health_metrics};
 pub(crate) mod cloud;
 pub(crate) mod project;
 pub(crate) mod server;
diff --git a/src/startup.rs b/src/startup.rs
index 2190978..0feeeb1 100644
--- a/src/startup.rs
+++ b/src/startup.rs
@@ -1,5 +1,6 @@
 use crate::configuration::Settings;
 use crate::connectors;
+use crate::health::{HealthChecker, HealthMetrics};
 use crate::helpers;
 use crate::mcp;
 use crate::middleware;
@@ -16,6 +17,9 @@ pub async fn run(
     pg_pool: Pool<Postgres>,
     settings: Settings,
 ) -> Result<Server, std::io::Error> {
+    let settings_arc = Arc::new(settings.clone());
+    let pg_pool_arc = Arc::new(pg_pool.clone());
+
     let settings = web::Data::new(settings);
     let pg_pool = web::Data::new(pg_pool);
 
@@ -29,6 +33,13 @@ pub async fn run(
     let mcp_registry = Arc::new(mcp::ToolRegistry::new());
     let mcp_registry = web::Data::new(mcp_registry);
 
+    // Initialize health checker and metrics
+    let health_checker = Arc::new(HealthChecker::new(pg_pool_arc.clone(), settings_arc.clone()));
+    let health_checker = web::Data::new(health_checker);
+
+    let health_metrics = Arc::new(HealthMetrics::new(1000));
+    let health_metrics = web::Data::new(health_metrics);
+
     // Initialize external service connectors (plugin pattern)
     // Connector handles category sync on startup
     let user_service_connector = connectors::init_user_service(&settings.connectors, pg_pool.clone());
@@ -54,7 +65,13 @@
             .wrap(authorization.clone())
             .wrap(middleware::authentication::Manager::new())
             .wrap(Cors::permissive())
-            .service(web::scope("/health_check").service(routes::health_check))
+            .app_data(health_checker.clone())
+            .app_data(health_metrics.clone())
+            .service(
+                web::scope("/health_check")
+                    .service(routes::health_check)
+                    .service(routes::health_metrics)
+            )
             .service(
                 web::scope("/client")
                     .service(routes::client::add_handler)

From dacf17efdb037b34f4eb7e73eca7a250b221c815 Mon Sep 17 00:00:00 2001
From: Vasili Pascal
Date: Mon, 5 Jan 2026 15:39:36 +0200
Subject: [PATCH 2/3] Potential fix for code scanning alert no. 6: Workflow does not contain permissions

Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
---
 .github/workflows/rust.yml | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index e617b62..11da4de 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -1,4 +1,6 @@
 name: Rust
+permissions:
+  contents: read
 
 on:
   push:

From 65be44164487f8f423c32011efc2db53695f310e Mon Sep 17 00:00:00 2001
From: vsilent
Date: Mon, 5 Jan 2026 15:46:36 +0200
Subject: [PATCH 3/3] fix: resolve compilation errors in health check module

- Fix struct literal syntax in RabbitMQ check
- Fix async future type mismatches by using tokio::join!
- Add Clone derive to Settings struct for Arc sharing
---
 src/configuration.rs |  2 +-
 src/health/checks.rs | 50 +++++++++++++++++++++-----------------------
 2 files changed, 25 insertions(+), 27 deletions(-)

diff --git a/src/configuration.rs b/src/configuration.rs
index e6deedc..fd01a96 100644
--- a/src/configuration.rs
+++ b/src/configuration.rs
@@ -1,7 +1,7 @@
 use serde;
 use crate::connectors::ConnectorConfig;
 
-#[derive(Debug, serde::Deserialize)]
+#[derive(Debug, Clone, serde::Deserialize)]
 pub struct Settings {
     pub database: DatabaseSettings,
     pub app_port: u16,
diff --git a/src/health/checks.rs b/src/health/checks.rs
index fe4455f..6c67407 100644
--- a/src/health/checks.rs
+++ b/src/health/checks.rs
@@ -29,27 +29,26 @@ impl HealthChecker {
         let uptime = self.start_time.elapsed().as_secs();
         let mut response = HealthCheckResponse::new(version, uptime);
 
-        let checks = vec![
-            ("database", self.check_database()),
-            ("rabbitmq", self.check_rabbitmq()),
-            ("dockerhub", self.check_dockerhub()),
-            ("redis", self.check_redis()),
-            ("vault", self.check_vault()),
-        ];
-
-        let results = futures::future::join_all(checks.into_iter().map(|(name, future)| async move {
-            let result = timeout(CHECK_TIMEOUT, future).await;
-            let health = match result {
-                Ok(health) => health,
-                Err(_) => ComponentHealth::unhealthy("Health check timeout".to_string()),
-            };
-            (name.to_string(), health)
-        }))
-        .await;
-
-        for (name, health) in results {
-            response.add_component(name, health);
-        }
+        let db_check = timeout(CHECK_TIMEOUT, self.check_database());
+        let mq_check = timeout(CHECK_TIMEOUT, self.check_rabbitmq());
+        let hub_check = timeout(CHECK_TIMEOUT, self.check_dockerhub());
+        let redis_check = timeout(CHECK_TIMEOUT, self.check_redis());
+        let vault_check = timeout(CHECK_TIMEOUT, self.check_vault());
+
+        let (db_result, mq_result, hub_result, redis_result, vault_result) =
+            tokio::join!(db_check, mq_check, hub_check, redis_check, vault_check);
+
+        let db_health = db_result.unwrap_or_else(|_| ComponentHealth::unhealthy("Timeout".to_string()));
+        let mq_health = mq_result.unwrap_or_else(|_| ComponentHealth::unhealthy("Timeout".to_string()));
+        let hub_health = hub_result.unwrap_or_else(|_| ComponentHealth::unhealthy("Timeout".to_string()));
+        let redis_health = redis_result.unwrap_or_else(|_| ComponentHealth::unhealthy("Timeout".to_string()));
+        let vault_health = vault_result.unwrap_or_else(|_| ComponentHealth::unhealthy("Timeout".to_string()));
+
+        response.add_component("database".to_string(), db_health);
+        response.add_component("rabbitmq".to_string(), mq_health);
+        response.add_component("dockerhub".to_string(), hub_health);
+        response.add_component("redis".to_string(), redis_health);
+        response.add_component("vault".to_string(), vault_health);
 
         response
     }
@@ -103,11 +102,10 @@
         let start = Instant::now();
         let connection_string = self.settings.amqp.connection_string();
 
-        match deadpool_lapin::Config {
-            url: Some(connection_string.clone()),
-            ..Default::default()
-        }
-        .create_pool(Some(deadpool_lapin::Runtime::Tokio1))
+        let mut config = deadpool_lapin::Config::default();
+        config.url = Some(connection_string.clone());
+
+        match config.create_pool(Some(deadpool_lapin::Runtime::Tokio1))
         {
             Ok(pool) => match pool.get().await {
                 Ok(conn) => match conn.create_channel().await {