diff --git a/hud/eval/context.py b/hud/eval/context.py
index 16cbea38..3f05bd8f 100644
--- a/hud/eval/context.py
+++ b/hud/eval/context.py
@@ -18,7 +18,8 @@
 from hud.environment import Environment
 from hud.settings import settings
 from hud.shared import make_request
-from hud.telemetry import flush, instrument
+from hud.telemetry import flush, instrument, queue_span
+from hud.telemetry.instrument import _normalize_trace_id, _now_iso
 
 if TYPE_CHECKING:
     from collections.abc import Generator
@@ -388,28 +389,143 @@ def from_task(
 
         return ctx
 
+    def _emit_scenario_span(
+        self,
+        name: str,
+        status: str,
+        scenario_name: str,
+        start_time: str,
+        end_time: str | None = None,
+        result: Any = None,
+        error: str | None = None,
+    ) -> None:
+        """Emit a scenario lifecycle span for real-time stage visibility.
+
+        Nothing is queued when tracing is disabled for this context.
+
+        Args:
+            name: Span name; the part after its last underscore is used as
+                the ``internal_type`` suffix (``scenario-<suffix>``).
+            status: Lifecycle stage recorded in the span attributes.
+            scenario_name: Scenario the span is reported for.
+            start_time: Timestamp at which the phase started.
+            end_time: Timestamp at which the phase ended; ``start_time`` is
+                used when it is None or empty.
+            result: Optional payload stored under ``attributes["result"]``.
+            error: Error message; any non-None value, even an empty
+                string, marks the span as ``ERROR``.
+        """
+        if not self._trace_enabled:
+            return
+
+        span = {
+            "name": name,
+            "trace_id": _normalize_trace_id(self.trace_id),
+            "span_id": uuid.uuid4().hex[:16],
+            "parent_span_id": None,
+            "start_time": start_time,
+            "end_time": end_time or start_time,
+            # Test against None rather than truthiness: str(exc) is "" for
+            # exceptions raised without a message, and such a failure must
+            # not be reported as "OK".
+            "status_code": "ERROR" if error is not None else "OK",
+            "status_message": error,
+            "attributes": {
+                "task_run_id": self.trace_id,
+                "category": "scenario",
+                "type": "CLIENT",
+                "scenario_name": scenario_name,
+                "status": status,
+                "result": result,
+            },
+            "internal_type": f"scenario-{name.split('_')[-1]}",
+        }
+        queue_span(span)
+
     async def _run_task_scenario_setup(self) -> None:
         """Run the task's scenario setup phase (if scenario provided)."""
         if self._task is None or self._task.scenario is None:
             return
 
-        prompt = await self.run_scenario_setup(self._task.scenario, self._task.args or {})
-        if prompt:
-            self.prompt = prompt
+        scenario_name = self._task.scenario
+        start_time = _now_iso()
+
+        self._emit_scenario_span(
+            "scenario_setup",
+            "started",
+            scenario_name,
+            start_time,
+        )
+
+        try:
+            prompt = await self.run_scenario_setup(scenario_name, self._task.args or {})
+            if prompt:
+                self.prompt = prompt
+        except Exception as e:
+            self._emit_scenario_span(
+                "scenario_setup",
+                "error",
+                scenario_name,
+                start_time,
+                _now_iso(),
+                # str(e) is empty for message-less exceptions; fall back to
+                # the exception class name so the span says what failed.
+                error=str(e) or type(e).__name__,
+            )
+            raise
+        else:
+            # Emitted outside the try body so that a failure while queueing
+            # the span is not reported as a scenario setup error.
+            self._emit_scenario_span(
+                "scenario_setup",
+                "completed",
+                scenario_name,
+                start_time,
+                _now_iso(),
+            )
 
     async def _run_task_scenario_evaluate(self) -> None:
         """Run the task's scenario evaluate phase (if scenario provided)."""
         if self._task is None or self._task.scenario is None:
             return
 
+        scenario_name = self._task.scenario
+        start_time = _now_iso()
+
+        self._emit_scenario_span(
+            "scenario_evaluate",
+            "started",
+            scenario_name,
+            start_time,
+        )
+
         try:
-            result = await self.run_scenario_evaluate(self._task.scenario)
+            result = await self.run_scenario_evaluate(scenario_name)
+            self.evaluation_result = result
+            self.reward = result.reward
         except Exception as e:
             self.error = e
-            return
-
-        self.evaluation_result = result
-        self.reward = result.reward
+            self._emit_scenario_span(
+                "scenario_evaluate",
+                "error",
+                scenario_name,
+                start_time,
+                _now_iso(),
+                # str(e) is empty for message-less exceptions; fall back to
+                # the exception class name so the span says what failed.
+                error=str(e) or type(e).__name__,
+            )
+        else:
+            # Emitted outside the try body so that a failure while queueing
+            # the span is not stored in self.error as an evaluation failure.
+            self._emit_scenario_span(
+                "scenario_evaluate",
+                "completed",
+                scenario_name,
+                start_time,
+                _now_iso(),
+                result={"reward": result.reward},
+            )
 
     # =========================================================================
     # Summary Context - Attribute Access Control