Skip to content

Commit a4c59b6

Browse files
committed
fix: add wf and guard metrics
1 parent 16a7519 commit a4c59b6

File tree

5 files changed

+65
-115
lines changed

5 files changed

+65
-115
lines changed

engine/packages/api-builder/src/metrics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ lazy_static::lazy_static! {
2424
*REGISTRY
2525
).unwrap();
2626
pub static ref API_REQUEST_ERRORS: IntCounterVec = register_int_counter_vec_with_registry!(
27-
"api_request_errors",
27+
"api_request_errors_total",
2828
"All errors made to this request.",
2929
&["method", "path", "status", "error_code"],
3030
*REGISTRY,

engine/packages/gasoline/src/builder/workflow/lupe.rs

Lines changed: 37 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::time::Instant;
1+
use std::time::{Duration, Instant};
22

33
use anyhow::Result;
44
use serde::{Serialize, de::DeserializeOwned};
@@ -117,12 +117,11 @@ impl<'a, S: Serialize + DeserializeOwned> LoopBuilder<'a, S> {
117117

118118
// Used to defer loop upsertion for parallelization
119119
let mut loop_event_upsert_fut = None;
120+
let mut iteration_dt = Duration::ZERO;
120121

121122
loop {
122123
ctx.check_stop()?;
123124

124-
let start_instant = Instant::now();
125-
126125
// Create a new branch for each iteration of the loop at location {...loop location, iteration idx}
127126
let mut iteration_branch = loop_branch.branch_inner(
128127
ctx.input().clone(),
@@ -140,8 +139,7 @@ impl<'a, S: Serialize + DeserializeOwned> LoopBuilder<'a, S> {
140139
let i = iteration;
141140

142141
// Async block for instrumentation purposes
143-
let (dt2, res) = async {
144-
let start_instant2 = Instant::now();
142+
let res = async {
145143
let db2 = ctx.db().clone();
146144

147145
// NOTE: Great care has been taken to optimize this function. This join allows multiple
@@ -151,9 +149,14 @@ impl<'a, S: Serialize + DeserializeOwned> LoopBuilder<'a, S> {
151149
// commit the loop event. This only happens on the first iteration of the loop
152150
// 2. Second, we commit the branch event for the current iteration
153151
// 3. Third, we run the user's loop code
154-
// 4. Last, if we have to upsert the loop event, we save the future and process it in the
152+
// 4. Last, if we have to upsert the loop event, we save the future and poll it in the
155153
// next iteration of the loop as part of this join
156-
let (loop_event_commit_res, loop_event_upsert_res, branch_commit_res, loop_res) = tokio::join!(
154+
let (
155+
loop_event_commit_res,
156+
loop_event_upsert_res,
157+
branch_commit_res,
158+
(loop_res, cb_dt),
159+
) = tokio::join!(
157160
async {
158161
if let Some(loop_event_init_fut) = loop_event_init_fut.take() {
159162
loop_event_init_fut.await
@@ -163,10 +166,14 @@ impl<'a, S: Serialize + DeserializeOwned> LoopBuilder<'a, S> {
163166
},
164167
async {
165168
if let Some(loop_event_upsert_fut) = loop_event_upsert_fut.take() {
166-
loop_event_upsert_fut.await
167-
} else {
168-
Ok(())
169+
let start_instant = Instant::now();
170+
loop_event_upsert_fut.await?;
171+
metrics::LOOP_COMMIT_DURATION
172+
.with_label_values(&[&ctx.name().to_string()])
173+
.observe(start_instant.elapsed().as_secs_f64());
169174
}
175+
176+
anyhow::Ok(())
170177
},
171178
async {
172179
// Insert event if iteration is not a replay
@@ -177,22 +184,35 @@ impl<'a, S: Serialize + DeserializeOwned> LoopBuilder<'a, S> {
177184
ctx.version(),
178185
Some(&loop_location),
179186
)
180-
.await
181-
} else {
182-
Ok(())
187+
.await?;
188+
189+
// Only record iteration duration if its not a replay
190+
metrics::LOOP_ITERATION_DURATION
191+
.with_label_values(&[&ctx.name().to_string()])
192+
.observe(iteration_dt.as_secs_f64());
183193
}
194+
195+
anyhow::Ok(())
184196
},
185-
cb(&mut iteration_branch, &mut state),
197+
async {
198+
let iteration_start_instant = Instant::now();
199+
200+
(
201+
cb(&mut iteration_branch, &mut state).await,
202+
iteration_start_instant.elapsed(),
203+
)
204+
}
186205
);
187206

188207
loop_event_commit_res?;
189208
loop_event_upsert_res?;
190209
branch_commit_res?;
191210

211+
iteration_dt = cb_dt;
212+
192213
// Run loop
193214
match loop_res? {
194215
Loop::Continue => {
195-
let dt2 = start_instant2.elapsed().as_secs_f64();
196216
iteration += 1;
197217

198218
// Commit workflow state to db
@@ -226,10 +246,9 @@ impl<'a, S: Serialize + DeserializeOwned> LoopBuilder<'a, S> {
226246
});
227247
}
228248

229-
anyhow::Ok((dt2, None))
249+
anyhow::Ok(None)
230250
}
231251
Loop::Break(res) => {
232-
let dt2 = start_instant2.elapsed().as_secs_f64();
233252
iteration += 1;
234253

235254
let state_val = serde_json::value::to_raw_value(&state)
@@ -252,7 +271,7 @@ impl<'a, S: Serialize + DeserializeOwned> LoopBuilder<'a, S> {
252271
)
253272
.await?;
254273

255-
Ok((dt2, Some(res)))
274+
Ok(Some(res))
256275
}
257276
}
258277
}
@@ -262,11 +281,6 @@ impl<'a, S: Serialize + DeserializeOwned> LoopBuilder<'a, S> {
262281
// Validate no leftover events
263282
iteration_branch.cursor().check_clear()?;
264283

265-
let dt = start_instant.elapsed().as_secs_f64();
266-
metrics::LOOP_ITERATION_DURATION
267-
.with_label_values(&[&ctx.name().to_string()])
268-
.observe(dt - dt2);
269-
270284
if let Some(res) = res {
271285
break res;
272286
}

engine/packages/gasoline/src/metrics.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,9 +170,17 @@ lazy_static::lazy_static! {
170170
*REGISTRY
171171
).unwrap();
172172

173+
pub static ref LOOP_COMMIT_DURATION: HistogramVec = register_histogram_vec_with_registry!(
174+
"gasoline_loop_commit_duration",
175+
"Total duration of a single loop commit.",
176+
&["workflow_name"],
177+
BUCKETS.to_vec(),
178+
*REGISTRY
179+
).unwrap();
180+
173181
pub static ref LOOP_ITERATION_DURATION: HistogramVec = register_histogram_vec_with_registry!(
174182
"gasoline_loop_iteration_duration",
175-
"Total duration of a single loop iteration (excluding its body).",
183+
"Total duration of a single loop iteration.",
176184
&["workflow_name"],
177185
BUCKETS.to_vec(),
178186
*REGISTRY

engine/packages/guard-core/src/metrics.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@ lazy_static! {
1818
"Number of active in-flight counters",
1919
*REGISTRY
2020
).unwrap();
21+
pub static ref IN_FLIGHT_REQUEST_COUNT: IntGauge = register_int_gauge_with_registry!(
22+
"guard_in_flight_request_count",
23+
"Number of active in-flight requests",
24+
*REGISTRY
25+
).unwrap();
2126

2227
// MARK: TCP
2328
pub static ref TCP_CONNECTION_TOTAL: IntCounter = register_int_counter_with_registry!(

engine/packages/guard-core/src/proxy_service.rs

Lines changed: 13 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@ use serde_json;
1515

1616
use rivet_runner_protocol as protocol;
1717
use std::{
18-
borrow::Cow,
19-
collections::HashMap as StdHashMap,
2018
net::SocketAddr,
2119
sync::Arc,
2220
time::{Duration, Instant},
@@ -134,44 +132,13 @@ pub struct RouteConfig {
134132
pub enum RoutingOutput {
135133
/// Return the data to route to.
136134
Route(RouteConfig),
137-
/// Return a custom response.
138-
Response(StructuredResponse),
139135
/// Return a custom serve handler.
140136
CustomServe(Arc<dyn CustomServeTrait>),
141137
}
142138

143-
#[derive(Clone, Debug)]
144-
pub struct StructuredResponse {
145-
pub status: StatusCode,
146-
pub message: Cow<'static, str>,
147-
pub docs: Option<Cow<'static, str>>,
148-
}
149-
150-
impl StructuredResponse {
151-
pub fn build_response(&self) -> Result<Response<ResponseBody>> {
152-
let mut body = StdHashMap::new();
153-
body.insert("message", self.message.clone().into_owned());
154-
155-
if let Some(docs) = &self.docs {
156-
body.insert("docs", docs.clone().into_owned());
157-
}
158-
159-
let body_json = serde_json::to_string(&body)?;
160-
let bytes = Bytes::from(body_json);
161-
162-
let response = Response::builder()
163-
.status(self.status)
164-
.header(hyper::header::CONTENT_TYPE, "application/json")
165-
.body(ResponseBody::Full(Full::new(bytes)))?;
166-
167-
Ok(response)
168-
}
169-
}
170-
171139
#[derive(Clone)]
172140
enum ResolveRouteOutput {
173141
Target(RouteTarget),
174-
Response(StructuredResponse),
175142
CustomServe(Arc<dyn CustomServeTrait>),
176143
}
177144

@@ -347,6 +314,7 @@ pub struct ProxyState {
347314
cache_key_fn: CacheKeyFn,
348315
middleware_fn: MiddlewareFn,
349316
route_cache: RouteCache,
317+
// We use moka::Cache instead of scc::HashMap because it automatically handles TTL and capacity
350318
rate_limiters: Cache<(Id, std::net::IpAddr), Arc<Mutex<RateLimiter>>>,
351319
in_flight_counters: Cache<(Id, std::net::IpAddr), Arc<Mutex<InFlightCounter>>>,
352320
in_flight_requests: Cache<protocol::RequestId, ()>,
@@ -478,15 +446,6 @@ impl ProxyState {
478446
Err(errors::NoRouteTargets.build())
479447
}
480448
}
481-
RoutingOutput::Response(response) => {
482-
tracing::debug!(
483-
hostname = %hostname_only,
484-
path = %path,
485-
status = ?response.status,
486-
"Routing returned custom response"
487-
);
488-
Ok(ResolveRouteOutput::Response(response))
489-
}
490449
RoutingOutput::CustomServe(handler) => {
491450
tracing::debug!(
492451
hostname = %hostname_only,
@@ -660,6 +619,7 @@ impl ProxyState {
660619

661620
// Release request ID
662621
self.in_flight_requests.invalidate(&request_id).await;
622+
metrics::IN_FLIGHT_REQUEST_COUNT.set(self.in_flight_requests.entry_count() as i64);
663623
}
664624

665625
/// Generate a unique request ID that is not currently in flight
@@ -668,11 +628,19 @@ impl ProxyState {
668628

669629
for attempt in 0..MAX_TRIES {
670630
let request_id = protocol::util::generate_request_id();
631+
let mut inserted = false;
671632

672633
// Check if this ID is already in use
673-
if self.in_flight_requests.get(&request_id).await.is_none() {
674-
// Insert the ID and return it
675-
self.in_flight_requests.insert(request_id, ()).await;
634+
self.in_flight_requests
635+
.entry(request_id)
636+
.or_insert_with(async {
637+
inserted = true;
638+
})
639+
.await;
640+
641+
if inserted {
642+
metrics::IN_FLIGHT_REQUEST_COUNT.set(self.in_flight_requests.entry_count() as i64);
643+
676644
return Ok(request_id);
677645
}
678646

@@ -769,10 +737,6 @@ impl ProxyService {
769737

770738
// Resolve target
771739
let target = target_res?;
772-
if let ResolveRouteOutput::Response(response) = &target {
773-
// Return the custom response
774-
return response.build_response();
775-
}
776740

777741
let actor_id = if let ResolveRouteOutput::Target(target) = &target {
778742
target.actor_id
@@ -1088,9 +1052,6 @@ impl ProxyService {
10881052
}
10891053
.build());
10901054
}
1091-
ResolveRouteOutput::Response(_) => {
1092-
unreachable!()
1093-
}
10941055
ResolveRouteOutput::CustomServe(mut handler) => {
10951056
let req_headers = req.headers().clone();
10961057
let req_method = req.method().clone();
@@ -1554,20 +1515,6 @@ impl ProxyService {
15541515
Ok(ResolveRouteOutput::Target(new_target)) => {
15551516
target = new_target;
15561517
}
1557-
Ok(ResolveRouteOutput::Response(response)) => {
1558-
tracing::debug!(
1559-
status=?response.status,
1560-
message=?response.message,
1561-
docs=?response.docs,
1562-
"got response instead of websocket target",
1563-
);
1564-
1565-
// Close the WebSocket connection with the response message
1566-
let _ = client_ws
1567-
.close(Some(str_to_close_frame(response.message.as_ref())))
1568-
.await;
1569-
return;
1570-
}
15711518
Ok(ResolveRouteOutput::CustomServe(_)) => {
15721519
let err = errors::WebSocketTargetChanged.build();
15731520
tracing::warn!(
@@ -1907,7 +1854,6 @@ impl ProxyService {
19071854
.instrument(tracing::info_span!("handle_ws_task_target")),
19081855
);
19091856
}
1910-
ResolveRouteOutput::Response(_) => unreachable!(),
19111857
ResolveRouteOutput::CustomServe(mut handler) => {
19121858
tracing::debug!(%req_path, "Spawning task to handle WebSocket communication");
19131859
let mut request_context = request_context.clone();
@@ -2090,19 +2036,6 @@ impl ProxyService {
20902036
handler = new_handler;
20912037
continue;
20922038
}
2093-
Ok(ResolveRouteOutput::Response(response)) => {
2094-
ws_handle
2095-
.send(to_hyper_close(Some(str_to_close_frame(
2096-
response.message.as_ref(),
2097-
))))
2098-
.await?;
2099-
2100-
// Flush to ensure close frame is sent
2101-
ws_handle.flush().await?;
2102-
2103-
// Keep TCP connection open briefly to allow client to process close
2104-
tokio::time::sleep(WEBSOCKET_CLOSE_LINGER).await;
2105-
}
21062039
Ok(ResolveRouteOutput::Target(_)) => {
21072040
let err = errors::WebSocketTargetChanged.build();
21082041
tracing::warn!(
@@ -2666,16 +2599,6 @@ pub fn is_ws_hibernate(err: &anyhow::Error) -> bool {
26662599
}
26672600
}
26682601

2669-
fn str_to_close_frame(err: &str) -> CloseFrame {
2670-
// NOTE: reason cannot be more than 123 bytes as per the WS protocol spec
2671-
let reason = rivet_util::safe_slice(err, 0, 123).into();
2672-
2673-
CloseFrame {
2674-
code: CloseCode::Error,
2675-
reason,
2676-
}
2677-
}
2678-
26792602
fn err_to_close_frame(err: anyhow::Error, ray_id: Option<Id>) -> CloseFrame {
26802603
let rivet_err = err
26812604
.chain()

0 commit comments

Comments
 (0)