Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 113 additions & 26 deletions backend/app/graph/nodes/tasting_notes/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from app.graph.schemas import TastingNoteOutput, TechniqueResult
from app.providers.llm import build_llm, extract_text_content
from app.providers.llm_policy import invoke_with_policy, RetryConfig
from app.services.event_channel import create_sommelier_event, get_event_channel
from app.services.llm_context import render_repo_context, get_context_budget
from app.techniques.mappings import (
TastingNote,
Expand Down Expand Up @@ -98,6 +99,25 @@ async def evaluate(
self, state: EvaluationState, config: Optional[RunnableConfig] = None
) -> Dict[str, Any]:
started_at = datetime.now(timezone.utc).isoformat()
evaluation_id = state.get("evaluation_id", "")
category_id = self.category.value
progress_config = TASTING_NOTE_PROGRESS.get(
category_id, {"start": 0, "complete": 100}
)
event_channel = get_event_channel()

if evaluation_id:
event_channel.emit_sync(
evaluation_id,
create_sommelier_event(
evaluation_id=evaluation_id,
sommelier=category_id,
event_type="sommelier_start",
progress_percent=progress_config["start"],
message=f"{category_id} analysis starting...",
),
)
Comment on lines +102 to +119

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-high high

The introduction of event streaming based on evaluation_id creates a potential Insecure Direct Object Reference (IDOR) vulnerability. The evaluation_id is used to identify the event channel, but the corresponding subscription mechanism (likely in the API endpoint that calls event_channel.subscribe) may not be performing an authorization check to ensure the subscriber owns the evaluation. If an attacker can guess or obtain another user's evaluation_id, they could subscribe to the event stream and access potentially sensitive, real-time analysis data of a private repository. The EventChannel service itself does not seem to enforce any ownership checks, placing the full burden of security on the calling API endpoint.


configurable = (config or {}).get("configurable", {})
provider = configurable.get("provider", "vertex")
api_key = configurable.get("api_key")
Expand All @@ -117,15 +137,26 @@ async def evaluate(
techniques = self.get_techniques()[:3]

if not techniques:
logger.warning(f"{self.category.value}: no techniques available")
logger.warning(f"{category_id}: no techniques available")
if evaluation_id:
event_channel.emit_sync(
evaluation_id,
create_sommelier_event(
evaluation_id=evaluation_id,
sommelier=category_id,
event_type="sommelier_error",
progress_percent=progress_config["start"],
message=f"{category_id}: no techniques configured",
),
)
return {
"errors": [f"{self.category.value}: no techniques configured"],
f"{self.category.value}_result": None,
"completed_sommeliers": [self.category.value],
"token_usage": {self.category.value: {}},
"cost_usage": {self.category.value: None},
"errors": [f"{category_id}: no techniques configured"],
f"{category_id}_result": None,
"completed_sommeliers": [category_id],
"token_usage": {category_id: {}},
"cost_usage": {category_id: None},
"trace_metadata": {
self.category.value: {
category_id: {
"started_at": started_at,
"completed_at": datetime.now(timezone.utc).isoformat(),
}
Expand Down Expand Up @@ -194,11 +225,11 @@ async def evaluate(
rendered_context += code_section

observability = {
"completed_sommeliers": [self.category.value],
"token_usage": {self.category.value: {}},
"cost_usage": {self.category.value: None},
"completed_sommeliers": [category_id],
"token_usage": {category_id: {}},
"cost_usage": {category_id: None},
"trace_metadata": {
self.category.value: {
category_id: {
"started_at": started_at,
"completed_at": None,
"model": model or "default",
Expand All @@ -214,7 +245,18 @@ async def evaluate(
messages = prompt.format_messages(repo_context=rendered_context)

def on_retry(attempt: int, delay: float, msg: str) -> None:
logger.info(f"{self.category.value}: {msg}")
logger.info(f"{category_id}: {msg}")
if evaluation_id:
event_channel.emit_sync(
evaluation_id,
create_sommelier_event(
evaluation_id=evaluation_id,
sommelier=category_id,
event_type="sommelier_retry",
progress_percent=progress_config["start"],
message=f"{category_id} retrying ({attempt}/3)...",

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The retry message hardcodes the maximum number of attempts as 3. This value is also configured in RetryConfig in the invoke_with_policy call. If max_attempts in RetryConfig is changed, this message will become misleading and will need to be updated manually.

To improve maintainability, consider defining max_attempts in a variable that can be used in both places to keep them in sync.

),
)

invocation_result = await invoke_with_policy(
llm=llm,
Expand All @@ -225,21 +267,21 @@ def on_retry(attempt: int, delay: float, msg: str) -> None:
on_retry=on_retry,
)

observability["trace_metadata"][self.category.value]["completed_at"] = (
datetime.now(timezone.utc).isoformat()
)
observability["trace_metadata"][self.category.value]["attempts"] = (
observability["trace_metadata"][category_id]["completed_at"] = datetime.now(
timezone.utc
).isoformat()
observability["trace_metadata"][category_id]["attempts"] = (
invocation_result.attempts
)
observability["trace_metadata"][self.category.value]["total_wait_seconds"] = (
observability["trace_metadata"][category_id]["total_wait_seconds"] = (
invocation_result.total_wait_seconds
)

if invocation_result.success:
response = invocation_result.response
usage = getattr(response, "usage_metadata", {}) or {}
observability["token_usage"] = {
self.category.value: {
category_id: {
"input_tokens": usage.get("input_tokens"),
"output_tokens": usage.get("output_tokens"),
"total_tokens": usage.get("total_tokens"),
Expand All @@ -250,23 +292,56 @@ def on_retry(attempt: int, delay: float, msg: str) -> None:
text_content = extract_text_content(response.content)
result = self.parser.parse(text_content)
except Exception as parse_error:
logger.error(
f"{self.category.value} failed to parse response: {parse_error!s}"
)
logger.error(f"{category_id} failed to parse response: {parse_error!s}")
if evaluation_id:
event_channel.emit_sync(
evaluation_id,
create_sommelier_event(
evaluation_id=evaluation_id,
sommelier=category_id,
event_type="sommelier_error",
progress_percent=progress_config["start"],
message=f"{category_id} analysis failed (parse error)",
),
)
return {
"errors": [f"{self.category.value} parse error: {parse_error!s}"],
f"{self.category.value}_result": None,
"errors": [f"{category_id} parse error: {parse_error!s}"],
f"{category_id}_result": None,
**observability,
}

if evaluation_id:
event_channel.emit_sync(
evaluation_id,
create_sommelier_event(
evaluation_id=evaluation_id,
sommelier=category_id,
event_type="sommelier_complete",
progress_percent=progress_config["complete"],
message=f"{category_id} analysis complete",
tokens_used=usage.get("total_tokens", 0),
),
)
if category_id == "cellar":
event_channel.emit_sync(
evaluation_id,
create_sommelier_event(
evaluation_id=evaluation_id,
sommelier="system",
event_type="evaluation_complete",
progress_percent=100,
message="Grand tasting evaluation complete",
),
)

return {
f"{self.category.value}_result": result.model_dump(),
f"{category_id}_result": result.model_dump(),
**observability,
}

error_category = invocation_result.error_category
error_msg = (
f"{self.category.value} evaluation failed after "
f"{category_id} evaluation failed after "
f"{invocation_result.attempts} attempts"
)
if error_category:
Expand All @@ -276,8 +351,20 @@ def on_retry(attempt: int, delay: float, msg: str) -> None:

logger.error(error_msg)

if evaluation_id:
event_channel.emit_sync(
evaluation_id,
create_sommelier_event(
evaluation_id=evaluation_id,
sommelier=category_id,
event_type="sommelier_error",
progress_percent=progress_config["start"],
message=f"{category_id} analysis failed",
),
)

return {
"errors": [error_msg],
f"{self.category.value}_result": None,
f"{category_id}_result": None,
**observability,
}
28 changes: 23 additions & 5 deletions frontend/src/hooks/useEvaluationStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,18 @@ export const useEvaluationStream = (evaluationId: string): UseEvaluationStreamRe
setCurrentSommelier(event.sommelier);
}
setStatus('processing');
setCompletedSommeliers((prev) => {
updateProgressFromCompleted(prev.length, true);
return prev;
});
if (event.progress_percent != null && event.progress_percent >= 0) {
const newProgress = event.progress_percent;
if (newProgress > progressRef.current) {
progressRef.current = newProgress;
setProgress(newProgress);
}
} else {
setCompletedSommeliers((prev) => {
updateProgressFromCompleted(prev.length, true);
return prev;
});
}
break;

case 'sommelier_complete':
Expand All @@ -108,7 +116,15 @@ export const useEvaluationStream = (evaluationId: string): UseEvaluationStreamRe
feedback: event.message || `${sommelierInfo.name} analysis complete`,
},
];
updateProgressFromCompleted(newList.length, false);
if (event.progress_percent != null && event.progress_percent >= 0) {
const newProgress = event.progress_percent;
if (newProgress > progressRef.current) {
progressRef.current = newProgress;
setProgress(newProgress);
}
} else {
updateProgressFromCompleted(newList.length, false);
}
return newList;
});
setCurrentSommelier(null);
Expand All @@ -128,6 +144,7 @@ export const useEvaluationStream = (evaluationId: string): UseEvaluationStreamRe
break;

case 'evaluation_complete':
isCompleteRef.current = true;
setIsComplete(true);
setStatus('completed');
progressRef.current = 100;
Expand All @@ -136,6 +153,7 @@ export const useEvaluationStream = (evaluationId: string): UseEvaluationStreamRe
break;

case 'evaluation_error':
isCompleteRef.current = true;
setIsComplete(true);
setStatus('failed');
setCurrentSommelier(null);
Expand Down