From 39100b141dcfdc55477daea0223138bf4707db4c Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Mon, 5 Jan 2026 11:31:38 -0800 Subject: [PATCH] chore: fmt, fix config properties --- engine/docker/template/grafana-dashboards/gasoline.json | 2 +- engine/packages/config/src/config/runtime.rs | 1 + engine/packages/config/src/config/telemetry.rs | 1 + engine/packages/guard-core/src/proxy_service.rs | 4 +--- engine/packages/pegboard/src/workflows/actor/mod.rs | 9 +++------ 5 files changed, 7 insertions(+), 10 deletions(-) diff --git a/engine/docker/template/grafana-dashboards/gasoline.json b/engine/docker/template/grafana-dashboards/gasoline.json index 36dcbfa7e8..20f002b12c 100644 --- a/engine/docker/template/grafana-dashboards/gasoline.json +++ b/engine/docker/template/grafana-dashboards/gasoline.json @@ -527,7 +527,7 @@ "uid": "prometheus" }, "editorMode": "code", - "expr": "count by (rivet_datacenter) ((time() - timestamp(rivet_gasoline_worker_last_ping{rivet_project=~\"$project\",rivet_datacenter=~\"$datacenter\"}) < 30)) ", + "expr": "count by (rivet_datacenter) ((time() - rivet_gasoline_worker_last_ping{rivet_project=~\"$project\",rivet_datacenter=~\"$datacenter\"} / 1000) < 30)", "instant": false, "legendFormat": "__auto", "range": true, diff --git a/engine/packages/config/src/config/runtime.rs b/engine/packages/config/src/config/runtime.rs index cde4350bc2..083fbcc64a 100644 --- a/engine/packages/config/src/config/runtime.rs +++ b/engine/packages/config/src/config/runtime.rs @@ -4,6 +4,7 @@ use schemars::JsonSchema; use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Serialize, Deserialize, Default, JsonSchema)] +#[serde(deny_unknown_fields)] pub struct Runtime { /// Adjusts worker curve around this value (in millicores, i.e. 1000 = 1 core). Is not a hard limit. When /// unset, uses /sys/fs/cgroup/cpu.max, and if that is unset uses total host cpu. diff --git a/engine/packages/config/src/config/telemetry.rs b/engine/packages/config/src/config/telemetry.rs index 9641232ee8..286405ffd8 100644 --- a/engine/packages/config/src/config/telemetry.rs +++ b/engine/packages/config/src/config/telemetry.rs @@ -2,6 +2,7 @@ use schemars::JsonSchema; use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +#[serde(deny_unknown_fields)] pub struct Telemetry { pub enabled: bool, } diff --git a/engine/packages/guard-core/src/proxy_service.rs b/engine/packages/guard-core/src/proxy_service.rs index 80abf6cb93..b6eb2213a5 100644 --- a/engine/packages/guard-core/src/proxy_service.rs +++ b/engine/packages/guard-core/src/proxy_service.rs @@ -2288,9 +2288,7 @@ impl ProxyService { // Keep TCP connection open briefly to allow client to process close tokio::time::sleep(WEBSOCKET_CLOSE_LINGER).await; } - .instrument( - tracing::info_span!("ws_error_proxy_task", ?request_ids.ray_id), - ), + .instrument(tracing::info_span!("ws_error_proxy_task")), ); // Return the response that will upgrade the client connection diff --git a/engine/packages/pegboard/src/workflows/actor/mod.rs b/engine/packages/pegboard/src/workflows/actor/mod.rs index 65f5ef5c6e..2d6d3f79e4 100644 --- a/engine/packages/pegboard/src/workflows/actor/mod.rs +++ b/engine/packages/pegboard/src/workflows/actor/mod.rs @@ -276,7 +276,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> async move { let signals = if let Some(gc_timeout_ts) = state.gc_timeout_ts { // Listen for signals with gc timeout. if a timeout happens, it means this actor is lost - let signals = ctx.listen_n_until::
(gc_timeout_ts, 1024).await?; + let signals = ctx.listen_n_until::
(gc_timeout_ts, 256).await?; if signals.is_empty() { tracing::warn!(actor_id=?input.actor_id, "actor lost"); @@ -292,7 +292,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> } else if let Some(alarm_ts) = state.alarm_ts { // Listen for signals with timeout. if a timeout happens, it means this actor should // wake up - let signals = ctx.listen_n_until::
(alarm_ts, 1024).await?; + let signals = ctx.listen_n_until::
(alarm_ts, 256).await?; if signals.is_empty() { tracing::debug!(actor_id=?input.actor_id, "actor wake"); @@ -307,7 +307,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> } } else { // Listen for signals normally - ctx.listen_n::
(1024).await? + ctx.listen_n::
(256).await? }; for sig in signals { @@ -824,9 +824,6 @@ async fn handle_stopped( }) .await?; - // NOTE: The reason we allocate other actors from this actor workflow is because if we instead sent a - // signal to the runner wf here it would incur a heavy throughput hit and we need the runner wf to be as - // lightweight as possible; processing as few signals that aren't events/commands. // Allocate other pending actors from queue since a slot has now cleared let allocate_pending_res = ctx .activity(AllocatePendingActorsInput {