From 21c5b05caa67f422efa69f85bdb0d352c87ad7f3 Mon Sep 17 00:00:00 2001 From: ComBba Date: Tue, 10 Feb 2026 09:58:09 +0900 Subject: [PATCH] fix: add SSE event emission to grand_tasting mode tasting notes Backend: - Add sommelier_start/complete/error/retry event emission to BaseTastingNoteNode - Add evaluation_complete event emission when cellar node completes - Use TASTING_NOTE_PROGRESS dict for progress percentages Frontend: - Use event.progress_percent directly when available (more accurate than calculated) - Fix reconnection race by setting isCompleteRef.current immediately in same tick - Prevent 'Reconnecting 1/5' popup after evaluation completes --- backend/app/graph/nodes/tasting_notes/base.py | 139 ++++++++++++++---- frontend/src/hooks/useEvaluationStream.ts | 28 +++- 2 files changed, 136 insertions(+), 31 deletions(-) diff --git a/backend/app/graph/nodes/tasting_notes/base.py b/backend/app/graph/nodes/tasting_notes/base.py index de1ea9e..f460655 100644 --- a/backend/app/graph/nodes/tasting_notes/base.py +++ b/backend/app/graph/nodes/tasting_notes/base.py @@ -10,6 +10,7 @@ from app.graph.schemas import TastingNoteOutput, TechniqueResult from app.providers.llm import build_llm, extract_text_content from app.providers.llm_policy import invoke_with_policy, RetryConfig +from app.services.event_channel import create_sommelier_event, get_event_channel from app.services.llm_context import render_repo_context, get_context_budget from app.techniques.mappings import ( TastingNote, @@ -98,6 +99,25 @@ async def evaluate( self, state: EvaluationState, config: Optional[RunnableConfig] = None ) -> Dict[str, Any]: started_at = datetime.now(timezone.utc).isoformat() + evaluation_id = state.get("evaluation_id", "") + category_id = self.category.value + progress_config = TASTING_NOTE_PROGRESS.get( + category_id, {"start": 0, "complete": 100} + ) + event_channel = get_event_channel() + + if evaluation_id: + event_channel.emit_sync( + evaluation_id, + create_sommelier_event( + evaluation_id=evaluation_id, + sommelier=category_id, + event_type="sommelier_start", + progress_percent=progress_config["start"], + message=f"{category_id} analysis starting...", + ), + ) + configurable = (config or {}).get("configurable", {}) provider = configurable.get("provider", "vertex") api_key = configurable.get("api_key") @@ -117,15 +137,26 @@ async def evaluate( techniques = self.get_techniques()[:3] if not techniques: - logger.warning(f"{self.category.value}: no techniques available") + logger.warning(f"{category_id}: no techniques available") + if evaluation_id: + event_channel.emit_sync( + evaluation_id, + create_sommelier_event( + evaluation_id=evaluation_id, + sommelier=category_id, + event_type="sommelier_error", + progress_percent=progress_config["start"], + message=f"{category_id}: no techniques configured", + ), + ) return { - "errors": [f"{self.category.value}: no techniques configured"], - f"{self.category.value}_result": None, - "completed_sommeliers": [self.category.value], - "token_usage": {self.category.value: {}}, - "cost_usage": {self.category.value: None}, + "errors": [f"{category_id}: no techniques configured"], + f"{category_id}_result": None, + "completed_sommeliers": [category_id], + "token_usage": {category_id: {}}, + "cost_usage": {category_id: None}, "trace_metadata": { - self.category.value: { + category_id: { "started_at": started_at, "completed_at": datetime.now(timezone.utc).isoformat(), } @@ -194,11 +225,11 @@ async def evaluate( rendered_context += code_section observability = { - "completed_sommeliers": [self.category.value], - "token_usage": {self.category.value: {}}, - "cost_usage": {self.category.value: None}, + "completed_sommeliers": [category_id], + "token_usage": {category_id: {}}, + "cost_usage": {category_id: None}, "trace_metadata": { - self.category.value: { + category_id: { "started_at": started_at, "completed_at": None, "model": model or "default", @@ -214,7 +245,18 @@ async def evaluate( messages = prompt.format_messages(repo_context=rendered_context) def on_retry(attempt: int, delay: float, msg: str) -> None: - logger.info(f"{self.category.value}: {msg}") + logger.info(f"{category_id}: {msg}") + if evaluation_id: + event_channel.emit_sync( + evaluation_id, + create_sommelier_event( + evaluation_id=evaluation_id, + sommelier=category_id, + event_type="sommelier_retry", + progress_percent=progress_config["start"], + message=f"{category_id} retrying ({attempt}/3)...", + ), + ) invocation_result = await invoke_with_policy( llm=llm, @@ -225,13 +267,13 @@ def on_retry(attempt: int, delay: float, msg: str) -> None: on_retry=on_retry, ) - observability["trace_metadata"][self.category.value]["completed_at"] = ( - datetime.now(timezone.utc).isoformat() - ) - observability["trace_metadata"][self.category.value]["attempts"] = ( + observability["trace_metadata"][category_id]["completed_at"] = datetime.now( + timezone.utc + ).isoformat() + observability["trace_metadata"][category_id]["attempts"] = ( invocation_result.attempts ) - observability["trace_metadata"][self.category.value]["total_wait_seconds"] = ( + observability["trace_metadata"][category_id]["total_wait_seconds"] = ( invocation_result.total_wait_seconds ) @@ -239,7 +281,7 @@ def on_retry(attempt: int, delay: float, msg: str) -> None: response = invocation_result.response usage = getattr(response, "usage_metadata", {}) or {} observability["token_usage"] = { - self.category.value: { + category_id: { "input_tokens": usage.get("input_tokens"), "output_tokens": usage.get("output_tokens"), "total_tokens": usage.get("total_tokens"), @@ -250,23 +292,56 @@ def on_retry(attempt: int, delay: float, msg: str) -> None: text_content = extract_text_content(response.content) result = self.parser.parse(text_content) except Exception as parse_error: - logger.error( - f"{self.category.value} failed to parse response: {parse_error!s}" - ) + logger.error(f"{category_id} failed to parse response: {parse_error!s}") + if evaluation_id: + event_channel.emit_sync( + evaluation_id, + create_sommelier_event( + evaluation_id=evaluation_id, + sommelier=category_id, + event_type="sommelier_error", + progress_percent=progress_config["start"], + message=f"{category_id} analysis failed (parse error)", + ), + ) return { - "errors": [f"{self.category.value} parse error: {parse_error!s}"], - f"{self.category.value}_result": None, + "errors": [f"{category_id} parse error: {parse_error!s}"], + f"{category_id}_result": None, **observability, } + if evaluation_id: + event_channel.emit_sync( + evaluation_id, + create_sommelier_event( + evaluation_id=evaluation_id, + sommelier=category_id, + event_type="sommelier_complete", + progress_percent=progress_config["complete"], + message=f"{category_id} analysis complete", + tokens_used=usage.get("total_tokens", 0), + ), + ) + if category_id == "cellar": + event_channel.emit_sync( + evaluation_id, + create_sommelier_event( + evaluation_id=evaluation_id, + sommelier="system", + event_type="evaluation_complete", + progress_percent=100, + message="Grand tasting evaluation complete", + ), + ) + return { - f"{self.category.value}_result": result.model_dump(), + f"{category_id}_result": result.model_dump(), **observability, } error_category = invocation_result.error_category error_msg = ( - f"{self.category.value} evaluation failed after " + f"{category_id} evaluation failed after " f"{invocation_result.attempts} attempts" ) if error_category: @@ -276,8 +351,20 @@ def on_retry(attempt: int, delay: float, msg: str) -> None: logger.error(error_msg) + if evaluation_id: + event_channel.emit_sync( + evaluation_id, + create_sommelier_event( + evaluation_id=evaluation_id, + sommelier=category_id, + event_type="sommelier_error", + progress_percent=progress_config["start"], + message=f"{category_id} analysis failed", + ), + ) + return { "errors": [error_msg], - f"{self.category.value}_result": None, + f"{category_id}_result": None, **observability, } diff --git a/frontend/src/hooks/useEvaluationStream.ts b/frontend/src/hooks/useEvaluationStream.ts index f8ca138..cc500d9 100644 --- a/frontend/src/hooks/useEvaluationStream.ts +++ b/frontend/src/hooks/useEvaluationStream.ts @@ -83,10 +83,18 @@ export const useEvaluationStream = (evaluationId: string): UseEvaluationStreamRe setCurrentSommelier(event.sommelier); } setStatus('processing'); - setCompletedSommeliers((prev) => { - updateProgressFromCompleted(prev.length, true); - return prev; - }); + if (event.progress_percent != null && event.progress_percent >= 0) { + const newProgress = event.progress_percent; + if (newProgress > progressRef.current) { + progressRef.current = newProgress; + setProgress(newProgress); + } + } else { + setCompletedSommeliers((prev) => { + updateProgressFromCompleted(prev.length, true); + return prev; + }); + } break; case 'sommelier_complete': @@ -108,7 +116,15 @@ export const useEvaluationStream = (evaluationId: string): UseEvaluationStreamRe feedback: event.message || `${sommelierInfo.name} analysis complete`, }, ]; - updateProgressFromCompleted(newList.length, false); + if (event.progress_percent != null && event.progress_percent >= 0) { + const newProgress = event.progress_percent; + if (newProgress > progressRef.current) { + progressRef.current = newProgress; + setProgress(newProgress); + } + } else { + updateProgressFromCompleted(newList.length, false); + } return newList; }); setCurrentSommelier(null); @@ -128,6 +144,7 @@ export const useEvaluationStream = (evaluationId: string): UseEvaluationStreamRe break; case 'evaluation_complete': + isCompleteRef.current = true; setIsComplete(true); setStatus('completed'); progressRef.current = 100; @@ -136,6 +153,7 @@ export const useEvaluationStream = (evaluationId: string): UseEvaluationStreamRe break; case 'evaluation_error': + isCompleteRef.current = true; setIsComplete(true); setStatus('failed'); setCurrentSommelier(null);