Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions alphatrion/server/graphql/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
ArtifactTag,
CreateTeamInput,
CreateUserInput,
DailyTokenUsage,
Experiment,
GraphQLExperimentType,
GraphQLExperimentTypeEnum,
Expand Down Expand Up @@ -572,6 +573,7 @@ def list_spans(run_id: strawberry.ID) -> list[Span]:
parent_span_id=t["ParentSpanId"],
span_name=t["SpanName"],
span_kind=t["SpanKind"],
semantic_kind=t["SemanticKind"],
service_name=t["ServiceName"],
duration=t["Duration"],
status_code=t["StatusCode"],
Expand All @@ -593,6 +595,42 @@ def list_spans(run_id: strawberry.ID) -> list[Span]:
print(f"Failed to fetch traces: {e}")
return []

# Alias for list_spans
list_traces = list_spans

@staticmethod
def get_daily_token_usage(
    team_id: strawberry.ID, days: int = 7
) -> list[DailyTokenUsage]:
    """Return per-day LLM token usage for a team.

    Args:
        team_id: Team identifier; it must be parseable as a UUID.
        days: Size of the look-back window in days, forwarded to the
            trace store unchanged.

    Returns:
        One ``DailyTokenUsage`` per row returned by the trace store, or an
        empty list when tracing is disabled or any step fails.
    """
    from alphatrion import envs

    # Tracing is opt-in: nothing to report unless it is explicitly enabled.
    if os.getenv(envs.ENABLE_TRACING, "false").lower() != "true":
        return []

    try:
        # Parse the ID before touching the store so that a malformed value
        # never leaves an acquired store un-closed.
        team_uuid = uuid.UUID(team_id)

        trace_store = runtime.storage_runtime().tracestore
        try:
            daily_usage = trace_store.get_daily_token_usage(
                team_id=team_uuid, days=days
            )
        finally:
            # Release the store on every path, including when the query raises.
            trace_store.close()

        # Convert to GraphQL DailyTokenUsage objects
        return [
            DailyTokenUsage(
                date=item["date"],
                total_tokens=item["total_tokens"],
                input_tokens=item["input_tokens"],
                output_tokens=item["output_tokens"],
            )
            for item in daily_usage
        ]
    except Exception as e:
        # Log error and return empty list - don't fail the GraphQL query
        print(f"Failed to fetch daily token usage: {e}")
        return []


class GraphQLMutations:
@staticmethod
Expand Down
7 changes: 7 additions & 0 deletions alphatrion/server/graphql/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
ArtifactTag,
CreateTeamInput,
CreateUserInput,
DailyTokenUsage,
Experiment,
Project,
RemoveUserFromTeamInput,
Expand Down Expand Up @@ -90,6 +91,12 @@ def runs(
def traces(self, run_id: strawberry.ID) -> list[Span]:
return GraphQLResolvers.list_traces(run_id=run_id)

@strawberry.field
def daily_token_usage(
    self, team_id: strawberry.ID, days: int = 7
) -> list[DailyTokenUsage]:
    # Thin delegation: the resolver does the lookup and the conversion to
    # DailyTokenUsage objects; this field only forwards its arguments.
    usage = GraphQLResolvers.get_daily_token_usage(team_id=team_id, days=days)
    return usage

# Artifact queries
@strawberry.field
async def artifact_repos(self) -> list[ArtifactRepository]:
Expand Down
9 changes: 9 additions & 0 deletions alphatrion/server/graphql/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ class Span:
parent_span_id: str
span_name: str
span_kind: str
semantic_kind: str
service_name: str
duration: float # nanoseconds (using float to support large int64 values)
status_code: str
Expand All @@ -247,3 +248,11 @@ class Span:
resource_attributes: JSON
events: list[TraceEvent]
links: list[TraceLink]


# GraphQL object type carrying the token counts for a single calendar day.
@strawberry.type
class DailyTokenUsage:
    # Day the counts belong to, as a string.
    date: str

    # Token totals for that day.
    total_tokens: int
    input_tokens: int
    output_tokens: int
82 changes: 66 additions & 16 deletions alphatrion/storage/tracestore.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def _create_tables(self) -> None:
ParentSpanId String CODEC(ZSTD(1)),
SpanName LowCardinality(String) CODEC(ZSTD(1)),
SpanKind LowCardinality(String) CODEC(ZSTD(1)),
SemanticKind LowCardinality(String) CODEC(ZSTD(1)),
ServiceName LowCardinality(String) CODEC(ZSTD(1)),
Duration UInt64 CODEC(ZSTD(1)),
StatusCode LowCardinality(String) CODEC(ZSTD(1)),
Expand All @@ -104,6 +105,7 @@ def _create_tables(self) -> None:
INDEX idx_run_id RunId TYPE bloom_filter(0.001) GRANULARITY 1,
INDEX idx_project_id ProjectId TYPE bloom_filter(0.001) GRANULARITY 1,
INDEX idx_team_id TeamId TYPE bloom_filter(0.001) GRANULARITY 1,
INDEX idx_semantic_kind SemanticKind TYPE set(0) GRANULARITY 1,
INDEX idx_attr_keys mapKeys(SpanAttributes) TYPE bloom_filter(0.01) GRANULARITY 1
) ENGINE = MergeTree()
PARTITION BY toDate(Timestamp)
Expand Down Expand Up @@ -140,6 +142,7 @@ def insert_spans(self, spans: list[dict[str, Any]]) -> None:
span.get("ParentSpanId", ""),
span.get("SpanName", ""),
span.get("SpanKind", ""),
span.get("SemanticKind", ""),
span.get("ServiceName", ""),
span.get("Duration", 0),
span.get("StatusCode", ""),
Expand Down Expand Up @@ -170,6 +173,7 @@ def insert_spans(self, spans: list[dict[str, Any]]) -> None:
"ParentSpanId",
"SpanName",
"SpanKind",
"SemanticKind",
"ServiceName",
"Duration",
"StatusCode",
Expand Down Expand Up @@ -212,6 +216,7 @@ def get_spans_by_run_id(self, run_id: uuid.UUID) -> list[dict[str, Any]]:
ParentSpanId,
SpanName,
SpanKind,
SemanticKind,
ServiceName,
Duration,
StatusCode,
Expand Down Expand Up @@ -244,25 +249,26 @@ def get_spans_by_run_id(self, run_id: uuid.UUID) -> list[dict[str, Any]]:
"ParentSpanId": row[3],
"SpanName": row[4],
"SpanKind": row[5],
"ServiceName": row[6],
"Duration": row[7],
"StatusCode": row[8],
"StatusMessage": row[9],
"TeamId": row[10],
"ProjectId": row[11],
"RunId": row[12],
"ExperimentId": row[13],
"SpanAttributes": row[14],
"ResourceAttributes": row[15],
"SemanticKind": row[6],
"ServiceName": row[7],
"Duration": row[8],
"StatusCode": row[9],
"StatusMessage": row[10],
"TeamId": row[11],
"ProjectId": row[12],
"RunId": row[13],
"ExperimentId": row[14],
"SpanAttributes": row[15],
"ResourceAttributes": row[16],
"Events": {
"Timestamp": row[16],
"Name": row[17],
"Attributes": row[18],
"Timestamp": row[17],
"Name": row[18],
"Attributes": row[19],
},
"Links": {
"TraceId": row[19],
"SpanId": row[20],
"Attributes": row[21],
"TraceId": row[20],
"SpanId": row[21],
"Attributes": row[22],
},
}
)
Expand All @@ -271,6 +277,50 @@ def get_spans_by_run_id(self, run_id: uuid.UUID) -> list[dict[str, Any]]:
logger.error(f"Failed to get traces by run_id: {e}")
return []

def get_daily_token_usage(
    self, team_id: uuid.UUID, days: int = 30
) -> list[dict[str, Any]]:
    """Get daily token usage from LLM calls for a team.

    Only spans whose ``SemanticKind`` is ``'llm'`` are counted; token counts
    are read from the span attributes and summed per calendar day.

    Args:
        team_id: The team ID to filter by. Anything that is not already a
            ``uuid.UUID`` is re-parsed as one before it reaches the SQL text.
        days: Number of days to look back (default: 30). Values that are not
            positive yield an empty result without querying.

    Returns:
        List of dicts with keys: date (``YYYY-MM-DD`` string), total_tokens,
        input_tokens, output_tokens, ordered by date ascending. An empty list
        is returned when the inputs are invalid or the query fails.
    """
    # Both values are spliced into the SQL string below, so force them into
    # their canonical types first: a UUID renders only as hex digits and
    # hyphens, and an int renders only as digits, so neither can carry SQL.
    try:
        team_uuid = (
            team_id if isinstance(team_id, uuid.UUID) else uuid.UUID(str(team_id))
        )
        lookback_days = int(days)
    except (TypeError, ValueError) as e:
        logger.error(f"Failed to get daily token usage: {e}")
        return []

    # A window of zero or fewer days cannot be a meaningful look-back.
    if lookback_days <= 0:
        return []

    query = f"""
        SELECT
            toDate(Timestamp) as date,
            SUM(toInt64OrZero(SpanAttributes['llm.usage.total_tokens'])) as total_tokens,
            SUM(toInt64OrZero(SpanAttributes['gen_ai.usage.input_tokens'])) as input_tokens,
            SUM(toInt64OrZero(SpanAttributes['gen_ai.usage.output_tokens'])) as output_tokens
        FROM {self.database}.otel_spans
        WHERE TeamId = '{team_uuid}'
            AND Timestamp >= now() - INTERVAL {lookback_days} DAY
            AND SemanticKind = 'llm'
        GROUP BY date
        ORDER BY date ASC
    """

    with self._lock:
        try:
            result = self.client.query(query)
            return [
                {
                    "date": row[0].strftime("%Y-%m-%d"),
                    "total_tokens": int(row[1]),
                    "input_tokens": int(row[2]),
                    "output_tokens": int(row[3]),
                }
                for row in result.result_rows
            ]
        except Exception as e:
            logger.error(f"Failed to get daily token usage: {e}")
            return []

def close(self) -> None:
"""Close the ClickHouse connection."""
try:
Expand Down
11 changes: 11 additions & 0 deletions alphatrion/tracing/clickhouse_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,16 @@ def _convert_span(self, span: ReadableSpan) -> dict[str, Any]:
run_id = span_attributes.get("run_id", "")
experiment_id = span_attributes.get("experiment_id", "")

# Determine semantic kind (application-level span type)
# Priority: LLM calls > Traceloop span kind > unknown
if "llm.usage.total_tokens" in span_attributes:
    semantic_kind = "llm"
else:
    # Traceloop values: "workflow", "task", "agent", "tool", "unknown".
    # Spans that carry neither attribute are labelled "unknown" rather than
    # rejected, so converting them does not raise.
    semantic_kind = span_attributes.get("traceloop.span.kind", "unknown")

# Convert events to nested structure
event_timestamps = []
event_names = []
Expand Down Expand Up @@ -151,6 +161,7 @@ def _convert_span(self, span: ReadableSpan) -> dict[str, Any]:
"ParentSpanId": parent_span_id,
"SpanName": span.name,
"SpanKind": span_kind,
"SemanticKind": semantic_kind,
"ServiceName": service_name,
"Duration": duration,
"StatusCode": status_code,
Expand Down
Loading
Loading