"""
GenAI-specific redaction processor for PII/sensitive content in LLM message structures.

This processor targets GenAI semantic convention attributes:
- gen_ai.input.messages
- gen_ai.output.messages

It parses the message arrays and applies redaction rules to:
- Text content in messages
- Tool call arguments
- Tool response results

Example configuration:

    config = GenAIRedactionConfig(
        # Regex patterns to redact in message content
        content_patterns=[
            r'\\b\\d{3}-\\d{2}-\\d{4}\\b',  # SSN
            r'\\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}\\b',  # Email
        ],

        # Specific fields to always redact in tool calls
        redact_tool_fields=["password", "api_key", "secret", "token"],

        # Tool names whose arguments should be fully redacted
        redact_tool_arguments=["login", "authenticate", "set_password"],

        # Replacement text for redacted content
        replacement="[REDACTED]",

        # Or use hashing
        hash_function="sha256",

        # Add summary attributes: "silent", "info", or "debug"
        summary="info",
    )

    processor = GenAIRedactionProcessor(config)
    processor.process_spans(resource_spans)
"""

import hashlib
import json
import re
from typing import List, Optional, Set, Dict, Any, Tuple

from rotel_sdk.open_telemetry.common.v1 import KeyValue, AnyValue
from rotel_sdk.open_telemetry.trace.v1 import ResourceSpans


# GenAI semantic convention attribute keys
GENAI_INPUT_MESSAGES = "gen_ai.input.messages"
GENAI_OUTPUT_MESSAGES = "gen_ai.output.messages"


class GenAIRedactionConfig:
    """Configuration for GenAI-specific content redaction.

    Args:
        content_patterns: Regex patterns (compiled case-insensitively) whose
            matches are redacted wherever free text is found.
        redact_tool_fields: Field names (matched case-insensitively) whose
            values are always redacted inside tool arguments/results.
        redact_tool_arguments: Tool names (matched case-insensitively) whose
            arguments are replaced wholesale.
        replacement: Replacement text, used when ``hash_function`` is None.
        hash_function: Name of a ``hashlib`` algorithm; when set, redacted
            values are replaced by their hex digest instead of ``replacement``.
        summary: Summary level, one of "silent", "info" or "debug".
        redact_input_messages: Whether ``gen_ai.input.messages`` is processed.
        redact_output_messages: Whether ``gen_ai.output.messages`` is processed.

    Raises:
        ValueError: If ``hash_function`` is unknown or unusable, or ``summary``
            is not one of the accepted levels.
    """

    def __init__(
        self,
        # Patterns to redact in text content
        content_patterns: Optional[List[str]] = None,

        # Field names in tool calls/responses to always redact
        redact_tool_fields: Optional[List[str]] = None,

        # Tool names whose entire arguments should be redacted
        redact_tool_arguments: Optional[List[str]] = None,

        # Replacement text (used if hash_function is None)
        replacement: str = "[REDACTED]",

        # Hash function for redaction (None = use replacement text)
        hash_function: Optional[str] = None,

        # Summary level: "silent", "info", "debug"
        summary: str = "info",

        # Whether to redact input messages
        redact_input_messages: bool = True,

        # Whether to redact output messages
        redact_output_messages: bool = True,
    ):
        self.content_patterns = [
            re.compile(p, re.IGNORECASE) for p in (content_patterns or [])
        ]
        self.redact_tool_fields = set(f.lower() for f in (redact_tool_fields or []))
        self.redact_tool_arguments = set(t.lower() for t in (redact_tool_arguments or []))
        self.replacement = replacement

        if hash_function:
            if hash_function not in hashlib.algorithms_available:
                raise ValueError(
                    f"Hash function '{hash_function}' not available. "
                    f"Available: {hashlib.algorithms_available}"
                )
            # Variable-length digests (shake_*) need an explicit length for
            # hexdigest(); reject them here instead of failing per span later.
            try:
                hashlib.new(hash_function).hexdigest()
            except TypeError as exc:
                raise ValueError(
                    f"Hash function '{hash_function}' is not supported: "
                    f"it requires an explicit digest length"
                ) from exc
        self.hash_function = hash_function

        if summary not in ("silent", "info", "debug"):
            raise ValueError(f"Summary must be 'silent', 'info', or 'debug', got '{summary}'")
        self.summary = summary

        self.redact_input_messages = redact_input_messages
        self.redact_output_messages = redact_output_messages


class RedactionStats:
    """Tracks redaction statistics for summary attributes."""

    def __init__(self):
        self.patterns_matched: int = 0
        self.fields_redacted: int = 0
        self.tool_args_redacted: int = 0
        self.messages_processed: int = 0

    def to_dict(self) -> Dict[str, int]:
        """Return the counters as a plain dictionary."""
        return {
            "patterns_matched": self.patterns_matched,
            "fields_redacted": self.fields_redacted,
            "tool_args_redacted": self.tool_args_redacted,
            "messages_processed": self.messages_processed,
        }


class GenAIRedactionProcessor:
    """
    Processor that applies PII redaction specifically to GenAI message structures.

    This operates on the raw span attributes before they are transformed into
    the GenAI-optimized schema, allowing redaction at the OTEL level.

    Elements the processor does not recognise (non-dict messages or parts) are
    never dropped: strings are pattern-redacted and everything else is passed
    through, so the payload keeps its shape.
    """

    def __init__(self, config: GenAIRedactionConfig):
        self.config = config

    def _get_redacted_value(self, original: str) -> str:
        """Return the replacement for ``original``: its hex digest or the fixed text."""
        if self.config.hash_function:
            hasher = hashlib.new(self.config.hash_function)
            # surrogatepass: JSON may legally carry lone surrogates, which a
            # strict UTF-8 encode would reject.
            hasher.update(original.encode("utf-8", errors="surrogatepass"))
            return hasher.hexdigest()
        return self.config.replacement

    def _redact_text(self, text: str, stats: RedactionStats) -> str:
        """Apply every content pattern to ``text`` and count the replacements."""
        result = text
        for pattern in self.config.content_patterns:
            # subn replaces and counts in a single scan.
            result, count = pattern.subn(
                lambda m: self._get_redacted_value(m.group(0)),
                result,
            )
            stats.patterns_matched += count
        return result

    def _redact_value(self, value: Any, stats: RedactionStats, parent_key: str = "") -> Any:
        """Redact an arbitrary JSON-like value; non-container scalars pass through."""
        if isinstance(value, str):
            return self._redact_text(value, stats)
        if isinstance(value, dict):
            return self._redact_dict_fields(value, stats, parent_key)
        if isinstance(value, list):
            return self._redact_list(value, stats)
        return value

    def _redact_serialized(self, raw: str, stats: RedactionStats) -> str:
        """
        Redact a string that may contain JSON-encoded structured data.

        Objects and arrays are redacted structurally (field names and patterns)
        and re-serialized only if something changed, so untouched payloads stay
        byte-identical. Anything else is treated as plain text.
        """
        try:
            parsed = json.loads(raw)
        except ValueError:
            return self._redact_text(raw, stats)

        if isinstance(parsed, (dict, list)):
            redacted = self._redact_value(parsed, stats)
            if redacted == parsed:
                return raw
            return json.dumps(redacted, ensure_ascii=False)
        return self._redact_text(raw, stats)

    def _redact_dict_fields(
        self,
        obj: Dict[str, Any],
        stats: RedactionStats,
        parent_key: str = ""
    ) -> Dict[str, Any]:
        """Recursively redact sensitive fields in a dictionary."""
        result = {}
        for key, value in obj.items():
            full_key = f"{parent_key}.{key}" if parent_key else key

            if str(key).lower() in self.config.redact_tool_fields:
                # Redact this field entirely, whatever its type
                stats.fields_redacted += 1
                result[key] = self._get_redacted_value(str(value))
            else:
                result[key] = self._redact_value(value, stats, full_key)

        return result

    def _redact_list(self, items: List[Any], stats: RedactionStats) -> List[Any]:
        """Recursively redact items in a list."""
        return [self._redact_value(item, stats) for item in items]

    def _redact_message_part(
        self,
        part: Dict[str, Any],
        stats: RedactionStats
    ) -> Dict[str, Any]:
        """Redact a single message part based on its ``type``; returns a copy."""
        part_type = part.get("type", "")
        result = dict(part)

        if part_type == "text":
            if "content" in result:
                result["content"] = self._redact_value(result["content"], stats)

        elif part_type == "tool_call":
            # A missing or null name must not crash the pipeline
            name = result.get("name")
            tool_name = name.lower() if isinstance(name, str) else ""

            if tool_name in self.config.redact_tool_arguments:
                # Redact entire arguments for sensitive tools
                stats.tool_args_redacted += 1
                result["arguments"] = {"[REDACTED]": "Tool arguments redacted"}
            elif "arguments" in result:
                arguments = result["arguments"]
                if isinstance(arguments, str):
                    # Some SDKs emit arguments as a JSON-encoded string
                    result["arguments"] = self._redact_serialized(arguments, stats)
                else:
                    result["arguments"] = self._redact_value(arguments, stats)

        elif part_type == "tool_call_response":
            if "result" in result:
                tool_result = result["result"]
                if isinstance(tool_result, str):
                    result["result"] = self._redact_serialized(tool_result, stats)
                else:
                    result["result"] = self._redact_value(tool_result, stats)

        return result

    def _redact_message(
        self,
        message: Dict[str, Any],
        stats: RedactionStats
    ) -> Dict[str, Any]:
        """Redact a single message (input or output); returns a copy."""
        result = dict(message)
        stats.messages_processed += 1

        parts = result.get("parts")
        if isinstance(parts, list):
            # Keep every part: dropping unknown shapes would corrupt the message
            result["parts"] = [
                self._redact_message_part(part, stats)
                if isinstance(part, dict)
                else self._redact_value(part, stats)
                for part in parts
            ]

        return result

    def _process_json_attribute(
        self,
        value: str,
        processor_fn,
        stats: RedactionStats
    ) -> str:
        """
        Parse JSON, apply ``processor_fn(item, stats)`` to each dict, and re-serialize.

        Non-dict array items are kept (strings are pattern-redacted). Input that
        is not JSON, or is a bare JSON scalar, is pattern-redacted as plain text.
        """
        try:
            parsed = json.loads(value)
        except json.JSONDecodeError:
            # If not valid JSON, apply text redaction directly
            return self._redact_text(value, stats)

        if isinstance(parsed, list):
            processed = [
                processor_fn(item, stats)
                if isinstance(item, dict)
                else self._redact_value(item, stats)
                for item in parsed
            ]
        elif isinstance(parsed, dict):
            processed = processor_fn(parsed, stats)
        else:
            # Bare scalar (e.g. a quoted string or digit run): still scan it
            return self._redact_text(value, stats)

        return json.dumps(processed, ensure_ascii=False)

    def _add_summary_attributes(
        self,
        attributes: Dict[str, KeyValue],
        stats: RedactionStats
    ):
        """Add summary attributes about redaction performed (no-op when "silent")."""
        if self.config.summary == "silent":
            return

        prefix = "genai.redaction"
        counters = (
            ("patterns_matched", stats.patterns_matched),
            ("fields_redacted", stats.fields_redacted),
            ("tool_args_redacted", stats.tool_args_redacted),
        )
        for name, count in counters:
            if count > 0:
                key = f"{prefix}.{name}"
                attributes[key] = KeyValue.new_int_value(key, count)

        if self.config.summary == "debug":
            key = f"{prefix}.messages_processed"
            attributes[key] = KeyValue.new_int_value(key, stats.messages_processed)

    def process_spans(self, resource_spans: ResourceSpans):
        """
        Process spans and apply GenAI-specific redaction.

        This modifies the span attributes in place.
        """
        for scope_spans in resource_spans.scope_spans:
            for span in scope_spans.spans:
                self._process_span_attributes(span)

    def _process_messages_value(self, value: Any, stats: RedactionStats) -> Any:
        """
        Process a messages value which could be:
        - A JSON string containing an array of messages (or a single message)
        - A list of message dicts

        Returns the processed value in the same format.
        """
        if isinstance(value, str):
            return self._process_json_attribute(value, self._redact_message, stats)

        if isinstance(value, list):
            return [
                self._redact_message(msg, stats)
                if isinstance(msg, dict)
                else self._redact_value(msg, stats)
                for msg in value
            ]

        return value

    def _redact_messages_attribute(
        self,
        attr_map: Dict[str, KeyValue],
        key: str,
        stats: RedactionStats
    ):
        """Redact the messages attribute stored under ``key``, if present."""
        kv = attr_map.get(key)
        if kv is None:
            return

        value = kv.value.value
        if self.config.summary == "debug":
            print(f"[GenAI Redaction] Found {key}, type: {type(value).__name__}")

        new_value = self._process_messages_value(value, stats)

        # Strings are stored as-is; lists are serialized to a JSON string
        if isinstance(new_value, str):
            kv.value = AnyValue(new_value)
        elif isinstance(new_value, list):
            kv.value = AnyValue(json.dumps(new_value, ensure_ascii=False))

    def _process_span_attributes(self, span):
        """Process a single span's attributes."""
        stats = RedactionStats()
        attr_map: Dict[str, KeyValue] = {kv.key: kv for kv in span.attributes}

        # Log all attribute keys for debugging
        if self.config.summary == "debug":
            print(f"[GenAI Redaction] Processing span with {len(attr_map)} attributes")
            print(f"[GenAI Redaction] Attribute keys: {list(attr_map.keys())}")

        if self.config.redact_input_messages:
            self._redact_messages_attribute(attr_map, GENAI_INPUT_MESSAGES, stats)

        if self.config.redact_output_messages:
            self._redact_messages_attribute(attr_map, GENAI_OUTPUT_MESSAGES, stats)

        # Log redaction stats
        if self.config.summary == "debug":
            print(f"[GenAI Redaction] Stats: patterns={stats.patterns_matched}, fields={stats.fields_redacted}, tool_args={stats.tool_args_redacted}, messages={stats.messages_processed}")

        # Add summary attributes
        self._add_summary_attributes(attr_map, stats)

        # Update span attributes
        span.attributes = list(attr_map.values())


# ============================================================================
# Convenience factory functions
# ============================================================================

def create_pii_redactor(
    patterns: Optional[List[str]] = None,
    use_hashing: bool = False,
    summary: str = "info"
) -> GenAIRedactionProcessor:
    """
    Create a processor with common PII patterns.

    Args:
        patterns: Additional custom patterns to redact
        use_hashing: If True, use SHA256 hashing instead of [REDACTED]
        summary: Summary level ("silent", "info", "debug")

    Returns:
        Configured GenAIRedactionProcessor
    """
    default_patterns = [
        # SSN (US)
        r'\b\d{3}-\d{2}-\d{4}\b',
        # Email addresses
        r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
        # Phone numbers (various formats)
        r'\b(?:\+?1[-.\s]?)?\(?[0-9]{3}\)?[-.\s]?[0-9]{3}[-.\s]?[0-9]{4}\b',
        # Credit card numbers (simple)
        r'\b(?:\d{4}[-\s]?){3}\d{4}\b',
        # IP addresses
        r'\b(?:\d{1,3}\.){3}\d{1,3}\b',
    ]

    all_patterns = default_patterns + (patterns or [])

    config = GenAIRedactionConfig(
        content_patterns=all_patterns,
        redact_tool_fields=["password", "api_key", "secret", "token", "key", "credential"],
        redact_tool_arguments=["login", "authenticate", "set_password", "reset_password"],
        hash_function="sha256" if use_hashing else None,
        summary=summary,
    )

    return GenAIRedactionProcessor(config)


def create_minimal_redactor(
    tool_fields: Optional[List[str]] = None,
    summary: str = "silent"
) -> GenAIRedactionProcessor:
    """
    Create a minimal processor that only redacts specific tool fields.

    Useful when you only want to redact known sensitive fields without
    pattern matching on all content.
    """
    config = GenAIRedactionConfig(
        content_patterns=[],
        redact_tool_fields=tool_fields or ["password", "api_key", "secret"],
        redact_tool_arguments=[],
        summary=summary,
    )

    return GenAIRedactionProcessor(config)
"""
Tests for GenAI-specific redaction processor.

These tests demonstrate how to configure and use the processor
for different redaction scenarios.
"""

import json
import pytest
from processors.genai_redaction_processor import (
    GenAIRedactionConfig,
    GenAIRedactionProcessor,
    RedactionStats,
    create_pii_redactor,
    create_minimal_redactor,
)


class TestGenAIRedactionConfig:
    """Tests for configuration validation."""

    def test_default_config(self):
        # Only attributes that GenAIRedactionConfig actually defines are checked;
        # there is no `redact_system_instructions` option on the config.
        config = GenAIRedactionConfig()
        assert config.replacement == "[REDACTED]"
        assert config.hash_function is None
        assert config.summary == "info"
        assert config.redact_input_messages is True
        assert config.redact_output_messages is True

    def test_invalid_hash_function(self):
        with pytest.raises(ValueError, match="not available"):
            GenAIRedactionConfig(hash_function="not_a_hash")

    def test_invalid_summary(self):
        with pytest.raises(ValueError, match="must be"):
            GenAIRedactionConfig(summary="verbose")


class TestTextRedaction:
    """Tests for content pattern redaction."""

    def test_email_redaction(self):
        config = GenAIRedactionConfig(
            content_patterns=[
                r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
            ]
        )
        processor = GenAIRedactionProcessor(config)
        stats = RedactionStats()

        result = processor._redact_text(
            "Contact me at john.doe@example.com for more info",
            stats
        )

        assert "john.doe@example.com" not in result
        assert "[REDACTED]" in result
        assert stats.patterns_matched == 1

    def test_ssn_redaction(self):
        config = GenAIRedactionConfig(
            content_patterns=[r'\b\d{3}-\d{2}-\d{4}\b']
        )
        processor = GenAIRedactionProcessor(config)
        stats = RedactionStats()

        result = processor._redact_text(
            "My SSN is 123-45-6789",
            stats
        )

        assert "123-45-6789" not in result
        assert "[REDACTED]" in result

    def test_multiple_patterns(self):
        config = GenAIRedactionConfig(
            content_patterns=[
                r'\b\d{3}-\d{2}-\d{4}\b',  # SSN
                r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',  # Email
            ]
        )
        processor = GenAIRedactionProcessor(config)
        stats = RedactionStats()

        result = processor._redact_text(
            "SSN: 123-45-6789, Email: test@example.com",
            stats
        )

        assert "123-45-6789" not in result
        assert "test@example.com" not in result
        assert stats.patterns_matched == 2

    def test_hash_redaction(self):
        config = GenAIRedactionConfig(
            content_patterns=[r'secret_\w+'],
            hash_function="sha256"
        )
        processor = GenAIRedactionProcessor(config)
        stats = RedactionStats()

        result = processor._redact_text(
            "The password is secret_abc123",
            stats
        )

        assert "secret_abc123" not in result
        assert "[REDACTED]" not in result  # Using hash instead
        assert len(result.split()[-1]) == 64  # SHA256 hex length


class TestToolFieldRedaction:
    """Tests for tool-specific field redaction."""

    def test_redact_password_field(self):
        config = GenAIRedactionConfig(
            redact_tool_fields=["password", "api_key"]
        )
        processor = GenAIRedactionProcessor(config)
        stats = RedactionStats()

        obj = {
            "username": "john",
            "password": "supersecret123",
            "email": "john@example.com"
        }

        result = processor._redact_dict_fields(obj, stats)

        assert result["username"] == "john"
        assert result["password"] == "[REDACTED]"
        assert result["email"] == "john@example.com"
        assert stats.fields_redacted == 1

    def test_redact_nested_fields(self):
        config = GenAIRedactionConfig(
            redact_tool_fields=["secret"]
        )
        processor = GenAIRedactionProcessor(config)
        stats = RedactionStats()

        obj = {
            "config": {
                "name": "test",
                "credentials": {
                    "secret": "my_secret_value"
                }
            }
        }

        result = processor._redact_dict_fields(obj, stats)

        assert result["config"]["name"] == "test"
        assert result["config"]["credentials"]["secret"] == "[REDACTED]"


class TestMessagePartRedaction:
    """Tests for message part redaction."""

    def test_text_part_redaction(self):
        config = GenAIRedactionConfig(
            content_patterns=[r'\b\d{3}-\d{2}-\d{4}\b']
        )
        processor = GenAIRedactionProcessor(config)
        stats = RedactionStats()

        part = {
            "type": "text",
            "content": "My SSN is 123-45-6789"
        }

        result = processor._redact_message_part(part, stats)

        assert "123-45-6789" not in result["content"]

    def test_tool_call_argument_redaction(self):
        config = GenAIRedactionConfig(
            redact_tool_fields=["password"]
        )
        processor = GenAIRedactionProcessor(config)
        stats = RedactionStats()

        part = {
            "type": "tool_call",
            "id": "call_123",
            "name": "login",
            "arguments": {
                "username": "john",
                "password": "secret123"
            }
        }

        result = processor._redact_message_part(part, stats)

        assert result["arguments"]["username"] == "john"
        assert result["arguments"]["password"] == "[REDACTED]"

    def test_sensitive_tool_full_redaction(self):
        config = GenAIRedactionConfig(
            redact_tool_arguments=["authenticate"]
        )
        processor = GenAIRedactionProcessor(config)
        stats = RedactionStats()

        part = {
            "type": "tool_call",
            "id": "call_123",
            "name": "authenticate",
            "arguments": {
                "username": "john",
                "password": "secret123",
                "mfa_token": "123456"
            }
        }

        result = processor._redact_message_part(part, stats)

        # Entire arguments should be redacted
        assert "[REDACTED]" in result["arguments"]
        assert "john" not in str(result["arguments"])
        assert stats.tool_args_redacted == 1

    def test_tool_response_redaction(self):
        config = GenAIRedactionConfig(
            content_patterns=[r'token_\w+']
        )
        processor = GenAIRedactionProcessor(config)
        stats = RedactionStats()

        part = {
            "type": "tool_call_response",
            "id": "call_123",
            "result": "Your token is token_abc123xyz"
        }

        result = processor._redact_message_part(part, stats)

        assert "token_abc123xyz" not in result["result"]


class TestFullMessageRedaction:
    """Tests for complete message redaction."""

    def test_input_message_redaction(self):
        config = GenAIRedactionConfig(
            content_patterns=[r'\b\d{3}-\d{2}-\d{4}\b'],
            redact_tool_fields=["password"]
        )
        processor = GenAIRedactionProcessor(config)
        stats = RedactionStats()

        message = {
            "role": "user",
            "parts": [
                {"type": "text", "content": "My SSN is 123-45-6789"},
                {
                    "type": "tool_call",
                    "id": "call_1",
                    "name": "login",
                    "arguments": {"user": "john", "password": "secret"}
                }
            ]
        }

        result = processor._redact_message(message, stats)

        assert result["role"] == "user"
        assert "123-45-6789" not in result["parts"][0]["content"]
        assert result["parts"][1]["arguments"]["password"] == "[REDACTED]"
        assert stats.messages_processed == 1


class TestJsonAttributeProcessing:
    """Tests for JSON attribute processing."""

    def test_process_json_messages(self):
        config = GenAIRedactionConfig(
            content_patterns=[r'secret_\w+']
        )
        processor = GenAIRedactionProcessor(config)
        stats = RedactionStats()

        json_str = json.dumps([
            {
                "role": "user",
                "parts": [
                    {"type": "text", "content": "The code is secret_abc123"}
                ]
            }
        ])

        result = processor._process_json_attribute(
            json_str,
            processor._redact_message,
            stats
        )

        parsed = json.loads(result)
        assert "secret_abc123" not in parsed[0]["parts"][0]["content"]


class TestConvenienceFactories:
    """Tests for factory functions."""

    def test_pii_redactor(self):
        processor = create_pii_redactor()
        stats = RedactionStats()

        # Test various PII patterns
        text = """
        Email: john@example.com
        Phone: (555) 123-4567
        SSN: 123-45-6789
        Credit Card: 4111-1111-1111-1111
        IP: 192.168.1.1
        """

        result = processor._redact_text(text, stats)

        assert "john@example.com" not in result
        assert "123-45-6789" not in result
        assert "4111-1111-1111-1111" not in result
        assert stats.patterns_matched >= 4

    def test_pii_redactor_with_hashing(self):
        processor = create_pii_redactor(use_hashing=True)
        stats = RedactionStats()

        result = processor._redact_text("test@example.com", stats)

        assert "[REDACTED]" not in result
        # Should contain a hash
        assert len(result) > 20

    def test_minimal_redactor(self):
        processor = create_minimal_redactor(
            tool_fields=["api_key", "secret"]
        )
        stats = RedactionStats()

        obj = {
            "api_key": "sk-12345",
            "name": "test",
            "secret": "my_secret"
        }

        result = processor._redact_dict_fields(obj, stats)

        assert result["api_key"] == "[REDACTED]"
        assert result["name"] == "test"
        assert result["secret"] == "[REDACTED]"


class TestIntegrationScenarios:
    """Integration tests showing real-world usage."""

    def test_llm_conversation_redaction(self):
        """Test redacting a full LLM conversation with PII."""
        processor = create_pii_redactor(
            patterns=[r'patient_id_\w+'],  # Custom pattern
            summary="debug"
        )

        # Simulate input messages attribute value
        input_messages = json.dumps([
            {
                "role": "user",
                "parts": [
                    {
                        "type": "text",
                        "content": "Look up patient patient_id_12345, email is john@hospital.com"
                    }
                ]
            },
            {
                "role": "assistant",
                "parts": [
                    {
                        "type": "tool_call",
                        "id": "call_1",
                        "name": "get_patient",
                        "arguments": {"patient_id": "patient_id_12345"}
                    }
                ]
            }
        ])

        stats = RedactionStats()
        result = processor._process_json_attribute(
            input_messages,
            processor._redact_message,
            stats
        )

        parsed = json.loads(result)

        # User message content should be redacted
        user_content = parsed[0]["parts"][0]["content"]
        assert "patient_id_12345" not in user_content
        assert "john@hospital.com" not in user_content

        # Tool call arguments should be redacted
        tool_args = parsed[1]["parts"][0]["arguments"]
        assert "patient_id_12345" not in str(tool_args)

        assert stats.messages_processed == 2

    def test_authentication_tool_redaction(self):
        """Test that authentication-related tools are fully redacted."""
        config = GenAIRedactionConfig(
            redact_tool_arguments=["login", "authenticate", "set_password"],
            summary="info"
        )
        processor = GenAIRedactionProcessor(config)

        input_messages = json.dumps([
            {
                "role": "assistant",
                "parts": [
                    {
                        "type": "tool_call",
                        "id": "call_auth",
                        "name": "login",
                        "arguments": {
                            "username": "admin",
                            "password": "super_secret_password",
                            "mfa_code": "123456"
                        }
                    }
                ]
            }
        ])

        stats = RedactionStats()
        result = processor._process_json_attribute(
            input_messages,
            processor._redact_message,
            stats
        )

        parsed = json.loads(result)
        tool_call = parsed[0]["parts"][0]

        # Entire arguments should be redacted
        assert "admin" not in str(tool_call["arguments"])
        assert "super_secret_password" not in str(tool_call["arguments"])
        assert stats.tool_args_redacted == 1


if __name__ == "__main__":
    pytest.main([__file__, "-v"])
 --partition-by-service-name ``` Full usage: @@ -26,17 +34,34 @@ Full usage: Usage: clickhouse-ddl create [OPTIONS] --endpoint <ENDPOINT> Options: - --endpoint <ENDPOINT> Clickhouse endpoint - --database <DATABASE> Database [default: otel] - --table-prefix <TABLE_PREFIX> Table prefix [default: otel] - --user <USER> User - --password <PASSWORD> Password - --engine <ENGINE> DB engine [default: MergeTree] - --cluster <CLUSTER> Cluster name - --traces Create trace spans tables - --logs Create logs tables - --metrics Create metrics tables - --ttl <TTL> TTL [default: 0s] - --enable-json Enable JSON column type - -h, --help Print help + --endpoint <ENDPOINT> Clickhouse endpoint + --database <DATABASE> Database [default: otel] + --table-prefix <TABLE_PREFIX> Table prefix [default: otel] + --user <USER> User + --password <PASSWORD> Password + --engine <ENGINE> DB engine [default: MergeTree] + --cluster <CLUSTER> Cluster name + --traces Create trace spans tables + --logs Create logs tables + --metrics Create metrics tables + --ttl <TTL> TTL [default: 0s] + --enable-json Enable JSON column type + --materialize-genai-fields Materialize GenAI fields from SpanAttributes (for traces) + --partition-by-service-name Partition by ServiceName for multi-tenant optimization (for traces) + -h, --help Print help ``` + +## GenAI/LLM Telemetry + +For GenAI/LLM observability workloads, use the `--materialize-genai-fields` flag to create MATERIALIZED columns +that extract GenAI semantic convention attributes from `SpanAttributes`. This enables efficient querying of: + +- `GenAIOperationName` - The operation type (e.g., "chat", "embeddings") +- `GenAIProviderName` - The LLM provider (e.g., "openai", "anthropic") +- `GenAIRequestModel` / `GenAIResponseModel` - Model names +- `GenAIInputTokens` / `GenAIOutputTokens` - Token usage +- `GenAILastInputMessage` / `GenAILastOutputMessage` - Last messages in conversation +- `GenAIConversationId`, `GenAIResponseId`, `GenAITemperature`, `GenAIMaxTokens`, `GenAIErrorType`, `GenAISystemInstructions`, `GenAIToolDefinitions` + +The `--partition-by-service-name` flag optimizes multi-tenant deployments by partitioning data by service name and day +(one partition per service per day) and ordering by `(ServiceName, Timestamp)`, enabling efficient per-tenant queries. 
diff --git a/src/bin/clickhouse-ddl/ddl_traces.rs b/src/bin/clickhouse-ddl/ddl_traces.rs index 215aab9f..b6468043 100644 --- a/src/bin/clickhouse-ddl/ddl_traces.rs +++ b/src/bin/clickhouse-ddl/ddl_traces.rs @@ -13,6 +13,8 @@ pub(crate) fn get_traces_ddl( engine: Engine, ttl: &Duration, use_json: bool, + materialize_genai_fields: bool, + partition_by_service_name: bool, ) -> Vec { let map_or_json = get_json_col_type(use_json); @@ -22,14 +24,37 @@ pub(crate) fn get_traces_ddl( "" }; - let indices = if engine != Engine::Null { - TRACES_TABLE_INDICES_SQL + // Build indices - add service name index if partitioning by service + let mut indices = String::new(); + if engine != Engine::Null { + indices.push_str(TRACES_TABLE_INDICES_SQL); + if partition_by_service_name { + indices.push_str(TRACES_SERVICE_INDEX_SQL); + } + } + + // Add GenAI materialized columns if requested + let genai_materialized = if materialize_genai_fields { + get_genai_materialized_columns(use_json) } else { "" }; let settings_str = get_settings(use_json, engine); + // Determine partition and order by based on options + let (partition_expr, order_expr) = if partition_by_service_name { + ( + "(ServiceName, toDate(Timestamp))", + "(ServiceName, Timestamp)", + ) + } else { + ( + "toDate(Timestamp)", + "(ServiceName, SpanName, toDateTime(Timestamp))", + ) + }; + let table_sql = replace_placeholders( TRACES_TABLE_SQL, &HashMap::from([ @@ -41,17 +66,12 @@ pub(crate) fn get_traces_ddl( ("MAP_OR_JSON", map_or_json), ("ENGINE", &engine.to_string()), ("MAP_INDICES", map_indices), - ("INDICES", indices), + ("INDICES", indices.as_str()), + ("GENAI_MATERIALIZED", genai_materialized), ("TTL_EXPR", &build_ttl_string(ttl, "toDateTime(Timestamp)")), ("SETTINGS", &settings_str), - ( - "PARTITION_BY", - &get_partition_by("toDate(Timestamp)", engine), - ), - ( - "ORDER_BY", - &get_order_by("(ServiceName, SpanName, toDateTime(Timestamp))", engine), - ), + ("PARTITION_BY", &get_partition_by(partition_expr, engine)), + 
("ORDER_BY", &get_order_by(order_expr, engine)), ]), ); @@ -96,6 +116,15 @@ pub(crate) fn get_traces_ddl( } } +/// Returns the appropriate GenAI MATERIALIZED columns based on whether JSON or Map type is used +fn get_genai_materialized_columns(use_json: bool) -> &'static str { + if use_json { + GENAI_MATERIALIZED_JSON_SQL + } else { + GENAI_MATERIALIZED_MAP_SQL + } +} + const TRACES_TABLE_SQL: &str = r#" CREATE TABLE IF NOT EXISTS %%TABLE%% %%CLUSTER%% ( Timestamp DateTime64(9) CODEC(Delta, ZSTD(1)), @@ -125,6 +154,8 @@ CREATE TABLE IF NOT EXISTS %%TABLE%% %%CLUSTER%% ( Attributes %%MAP_OR_JSON%% ) CODEC(ZSTD(1)), + %%GENAI_MATERIALIZED%% + %%MAP_INDICES%% %%INDICES%% @@ -141,6 +172,10 @@ const TRACES_TABLE_INDICES_SQL: &str = r#" INDEX idx_trace_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1, "#; +const TRACES_SERVICE_INDEX_SQL: &str = r#" + INDEX idx_service_date (ServiceName, toDate(Timestamp)) TYPE minmax GRANULARITY 1, +"#; + const TRACES_TABLE_MAP_INDICES_SQL: &str = r#" INDEX idx_res_attr_key mapKeys(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, INDEX idx_res_attr_value mapValues(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, @@ -148,6 +183,50 @@ const TRACES_TABLE_MAP_INDICES_SQL: &str = r#" INDEX idx_span_attr_value mapValues(SpanAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, "#; +// GenAI MATERIALIZED columns for Map type (default) +// Uses SpanAttributes['key'] syntax and casts for numeric values +const GENAI_MATERIALIZED_MAP_SQL: &str = r#" + -- GenAI MATERIALIZED columns (extracted from SpanAttributes Map) + GenAIConversationId String MATERIALIZED SpanAttributes['gen_ai.conversation.id'], + GenAIOperationName LowCardinality(String) MATERIALIZED SpanAttributes['gen_ai.operation.name'], + GenAIProviderName LowCardinality(String) MATERIALIZED SpanAttributes['gen_ai.provider.name'], + GenAIRequestModel LowCardinality(String) MATERIALIZED SpanAttributes['gen_ai.request.model'], + GenAIResponseModel LowCardinality(String) 
MATERIALIZED SpanAttributes['gen_ai.response.model'], + GenAIInputTokens UInt32 MATERIALIZED toUInt32OrZero(SpanAttributes['gen_ai.usage.input_tokens']), + GenAIOutputTokens UInt32 MATERIALIZED toUInt32OrZero(SpanAttributes['gen_ai.usage.output_tokens']), + GenAITemperature Float32 MATERIALIZED toFloat32OrZero(SpanAttributes['gen_ai.request.temperature']), + GenAIMaxTokens UInt32 MATERIALIZED toUInt32OrZero(SpanAttributes['gen_ai.request.max_tokens']), + GenAIResponseId String MATERIALIZED SpanAttributes['gen_ai.response.id'], + GenAIErrorType LowCardinality(String) MATERIALIZED SpanAttributes['error.type'], + -- Message content (stored as JSON strings in Map values) + GenAILastInputMessage String MATERIALIZED JSONExtractRaw(SpanAttributes['gen_ai.input.messages'], -1), + GenAILastOutputMessage String MATERIALIZED JSONExtractRaw(SpanAttributes['gen_ai.output.messages'], -1), + GenAISystemInstructions String MATERIALIZED SpanAttributes['gen_ai.system.instructions'], + GenAIToolDefinitions String MATERIALIZED SpanAttributes['gen_ai.tool.definitions'], +"#; + +// GenAI MATERIALIZED columns for JSON type (--enable-json) +// Uses JSON path syntax +const GENAI_MATERIALIZED_JSON_SQL: &str = r#" + -- GenAI MATERIALIZED columns (extracted from SpanAttributes JSON) + GenAIConversationId String MATERIALIZED SpanAttributes.'gen_ai.conversation.id', + GenAIOperationName LowCardinality(String) MATERIALIZED SpanAttributes.'gen_ai.operation.name', + GenAIProviderName LowCardinality(String) MATERIALIZED SpanAttributes.'gen_ai.provider.name', + GenAIRequestModel LowCardinality(String) MATERIALIZED SpanAttributes.'gen_ai.request.model', + GenAIResponseModel LowCardinality(String) MATERIALIZED SpanAttributes.'gen_ai.response.model', + GenAIInputTokens UInt32 MATERIALIZED SpanAttributes.'gen_ai.usage.input_tokens', + GenAIOutputTokens UInt32 MATERIALIZED SpanAttributes.'gen_ai.usage.output_tokens', + GenAITemperature Float32 MATERIALIZED SpanAttributes.'gen_ai.request.temperature', + 
GenAIMaxTokens UInt32 MATERIALIZED SpanAttributes.'gen_ai.request.max_tokens', + GenAIResponseId String MATERIALIZED SpanAttributes.'gen_ai.response.id', + GenAIErrorType LowCardinality(String) MATERIALIZED SpanAttributes.'error.type', + -- Message content + GenAILastInputMessage String MATERIALIZED toString(SpanAttributes.'gen_ai.input.messages'[-1]), + GenAILastOutputMessage String MATERIALIZED toString(SpanAttributes.'gen_ai.output.messages'[-1]), + GenAISystemInstructions String MATERIALIZED toString(SpanAttributes.'gen_ai.system.instructions'), + GenAIToolDefinitions String MATERIALIZED toString(SpanAttributes.'gen_ai.tool.definitions'), +"#; + const TRACES_TABLE_ID_TS_SQL: &str = r#" CREATE TABLE IF NOT EXISTS %%TABLE%% %%CLUSTER%% ( TraceId String CODEC(ZSTD(1)), diff --git a/src/bin/clickhouse-ddl/main.rs b/src/bin/clickhouse-ddl/main.rs index 5cc5c17f..9b33cea6 100644 --- a/src/bin/clickhouse-ddl/main.rs +++ b/src/bin/clickhouse-ddl/main.rs @@ -2,7 +2,6 @@ mod ddl; mod ddl_logs; mod ddl_metrics; mod ddl_traces; - use crate::ddl_logs::get_logs_ddl; use crate::ddl_metrics::get_metrics_ddl; use crate::ddl_traces::get_traces_ddl; @@ -71,6 +70,14 @@ pub struct CreateDDLArgs { /// Enable JSON column type #[arg(long, default_value = "false")] pub enable_json: bool, + + /// Materialize GenAI fields from SpanAttributes (for traces) + #[arg(long, default_value = "false")] + pub materialize_genai_fields: bool, + + /// Partition by ServiceName for multi-tenant optimization (for traces) + #[arg(long, default_value = "false")] + pub partition_by_service_name: bool, } #[derive(Clone, Debug, Copy, PartialEq, ValueEnum)] @@ -162,6 +169,8 @@ async fn main() -> ExitCode { ddl.engine, &ttl, ddl.enable_json, + ddl.materialize_genai_fields, + ddl.partition_by_service_name, ); for q in sql { diff --git a/src/exporters/clickhouse/mod.rs b/src/exporters/clickhouse/mod.rs index 95265b8b..1ec23e6f 100644 --- a/src/exporters/clickhouse/mod.rs +++ b/src/exporters/clickhouse/mod.rs @@ 
-1,12 +1,12 @@ mod api_request; mod ch_error; -mod compression; +pub(crate) mod compression; mod exception; -mod payload; +pub mod payload; mod request_builder; mod request_mapper; -mod rowbinary; -mod schema; +pub(crate) mod rowbinary; +pub(crate) mod schema; mod transform_logs; mod transform_metrics; mod transform_traces; diff --git a/src/init/args.rs b/src/init/args.rs index 76c2d0f6..af6cc382 100644 --- a/src/init/args.rs +++ b/src/init/args.rs @@ -127,6 +127,7 @@ pub struct AgentRun { #[command(flatten)] pub clickhouse_exporter: ClickhouseExporterArgs, + #[command(flatten)] pub aws_xray_exporter: XRayExporterArgs, @@ -240,6 +241,7 @@ pub enum Exporter { Datadog, Clickhouse, + #[clap(name = "awsxray")] AwsXray,