diff --git a/README.md b/README.md index feaab16..41c9c98 100644 --- a/README.md +++ b/README.md @@ -8,15 +8,15 @@ A chatbot that provides semantic search and conversational AI capabilities for C ## Key Features ### Core RAG Capabilities -- **Semantic RAG Pipeline**: Advanced semantic understanding with intent classification and query expansion +- **Semantic RAG Pipeline**: Advanced semantic understanding with query reformulation - **Dual Pipeline Architecture**: Both standard and semantic-enhanced RAG processing - **Intelligent Query Processing**: Handles misleading keywords and provides contextually accurate responses -- **Multi-Query Retrieval**: Semantic query expansion with domain-specific mappings +- **Vector Retrieval**: Direct vector search with optional semantic re-ranking - **Semantic Re-ranking**: Re-ranks results based on semantic similarity and intent matching ### Advanced Semantic Features - **Intent Classification**: Automatically detects query intent (team_info, project_info, technical_info, feature_info) -- **Query Expansion**: Generates semantic variations using domain-specific terminology +- **Query Reformulation**: LLM-based coreference resolution and context clarification - **Multilingual Support**: Optimized for French and English content with synonym handling - **Contextual Understanding**: Maintains document hierarchy awareness for numerical and structured queries @@ -75,7 +75,6 @@ A chatbot that provides semantic search and conversational AI capabilities for C # Optional - Semantic Features USE_SEMANTIC_FEATURES=true - SEMANTIC_EXPANSION_ENABLED=true SEMANTIC_RERANKING_ENABLED=true # Optional - Azure Integration (for production) @@ -213,7 +212,7 @@ Isschat/ │ ├── rag/ # RAG pipeline implementation │ │ ├── pipeline.py # Standard RAG pipeline │ │ ├── semantic_pipeline.py # Semantic-enhanced RAG pipeline -│ │ ├── query_processor.py # Advanced query processing +│ │ ├── reformulation_service.py # LLM-based query reformulation │ │ └── tools/ # RAG tools (retrieval, generation) │ ├── storage/ # Storage abstraction │ │ ├── storage_factory.py # Storage factory (local/Azure) @@ -255,7 +254,7 @@ Isschat/ ### Semantic Intelligence - **Intent Classification**: Automatically detects and routes queries based on intent (team_info, project_info, technical_info, feature_info) -- **Query Expansion**: Semantic expansion using domain-specific mappings and synonym handling +- **Query Reformulation**: LLM-based coreference resolution using conversation context - **Context-Aware Retrieval**: Maintains document hierarchy awareness for complex queries - **Multilingual Processing**: Optimized for French and English content with cross-language understanding diff --git a/flux_donnees.mmd b/data_flow.mmd similarity index 92% rename from flux_donnees.mmd rename to data_flow.mmd index bed8096..be101b3 100644 --- a/flux_donnees.mmd +++ b/data_flow.mmd @@ -13,8 +13,10 @@ graph TD ChatInterface -->|Query| SemanticPipeline[Semantic RAG Pipeline] %% Semantic Processing Pipeline - SemanticPipeline -->|Analyze query| QueryProcessor[Query Processor] - QueryProcessor -->|Intent & expansion| SemanticRetrieval[Semantic Retrieval Tool] + SemanticPipeline -->|Reformulate query| ReformulationService[Reformulation Service] + ReformulationService -->|LLM call| LLM + LLM -->|Reformulated query| ReformulationService + ReformulationService -->|Resolved query| SemanticRetrieval[Vector Retrieval Tool] SemanticRetrieval -->|Vector search| WeaviateDB[(Weaviate Vector DB
Collection: isschat_docs)] WeaviateDB -->|Relevant documents| SemanticRetrieval SemanticRetrieval -->|Ranked results| GenerationTool[Generation Tool] @@ -74,7 +76,7 @@ graph TD subgraph "RAG Processing Engine" SemanticPipeline - QueryProcessor + ReformulationService SemanticRetrieval GenerationTool LLM @@ -121,7 +123,7 @@ graph TD class User,WebApp,CLI,ChatInterface,AdminInterface,InteractiveCLI interface class AzureAuth,KeyVault auth - class SemanticPipeline,QueryProcessor,SemanticRetrieval,GenerationTool,LLM processing + class SemanticPipeline,ReformulationService,SemanticRetrieval,GenerationTool,LLM processing class WeaviateDB,DataManager,StorageSystem,LocalStorage,AzureStorage storage class FeaturesManager,HistoryManager,PerformanceDashboard components class IngestionPipeline,ConfluenceConnector,ConfluenceAPI,DocumentProcessor,DocumentChunker,EmbeddingService ingestion diff --git a/rag_evaluation/core/llm_judge.py b/rag_evaluation/core/llm_judge.py index 349bccc..7975113 100644 --- a/rag_evaluation/core/llm_judge.py +++ b/rag_evaluation/core/llm_judge.py @@ -36,7 +36,7 @@ def __init__(self, config: Any): temperature=config.judge_temperature, max_tokens=config.judge_max_tokens, openai_api_key=api_key, - openai_api_base="https://openrouter.ai/api/v1", + openai_api_base=config.openrouter_base_url, ) def evaluate_conversational(self, question: str, response: str, expected: str, context: str = "") -> Dict[str, Any]: diff --git a/src/config/settings.py b/src/config/settings.py index d24548f..d97bf00 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -32,16 +32,24 @@ class IsschatConfig: # Semantic understanding configuration use_semantic_features: bool = True - semantic_expansion_enabled: bool = True semantic_reranking_enabled: bool = True semantic_similarity_threshold: float = 0.7 - query_expansion_max_variations: int = 5 intent_classification_enabled: bool = True + # OpenRouter API configuration + openrouter_base_url: str = "https://openrouter.ai/api/v1" + openrouter_timeout: int = 30 + + # Query reformulation configuration + force_reformulate_all_queries: bool = True + reformulation_timeout: int = 15 # Shorter for reformulation + reformulation_max_tokens: int = 150 + # Source filtering configuration source_filtering_enabled: bool = True - min_source_score_threshold: float = 0.4 - min_source_relevance_threshold: float = 0.3 + min_source_score_threshold: float = 0.3 # Réduit de 0.4 → 0.3 + min_source_relevance_threshold: float = 0.2 # Réduit de 0.3 → 0.2 + use_flexible_filtering: bool = True # Enable flexible multi-criteria filtering confluence_api_key: str = "" confluence_space_key: str = "" @@ -83,10 +91,6 @@ def from_env(cls, env_file: str = ".env") -> "IsschatConfig": search_fetch_k=int(os.getenv("SEARCH_FETCH_K", str(defaults.search_fetch_k))), use_semantic_features=os.getenv("USE_SEMANTIC_FEATURES", str(defaults.use_semantic_features)).lower() == "true", - semantic_expansion_enabled=os.getenv( - "SEMANTIC_EXPANSION_ENABLED", str(defaults.semantic_expansion_enabled) - ).lower() - == "true", semantic_reranking_enabled=os.getenv( "SEMANTIC_RERANKING_ENABLED", str(defaults.semantic_reranking_enabled) ).lower() @@ -94,13 +98,16 @@ def from_env(cls, env_file: str = ".env") -> "IsschatConfig": semantic_similarity_threshold=float( os.getenv("SEMANTIC_SIMILARITY_THRESHOLD", str(defaults.semantic_similarity_threshold)) ), - query_expansion_max_variations=int( - os.getenv("QUERY_EXPANSION_MAX_VARIATIONS", str(defaults.query_expansion_max_variations)) - ), 
intent_classification_enabled=os.getenv( "INTENT_CLASSIFICATION_ENABLED", str(defaults.intent_classification_enabled) ).lower() == "true", + force_reformulate_all_queries=os.getenv( + "FORCE_REFORMULATE_ALL_QUERIES", str(defaults.force_reformulate_all_queries) + ).lower() + == "true", + reformulation_timeout=int(os.getenv("REFORMULATION_TIMEOUT", str(defaults.reformulation_timeout))), + reformulation_max_tokens=int(os.getenv("REFORMULATION_MAX_TOKENS", str(defaults.reformulation_max_tokens))), source_filtering_enabled=os.getenv( "SOURCE_FILTERING_ENABLED", str(defaults.source_filtering_enabled) ).lower() @@ -111,6 +118,8 @@ def from_env(cls, env_file: str = ".env") -> "IsschatConfig": min_source_relevance_threshold=float( os.getenv("MIN_SOURCE_RELEVANCE_THRESHOLD", str(defaults.min_source_relevance_threshold)) ), + use_flexible_filtering=os.getenv("USE_FLEXIBLE_FILTERING", str(defaults.use_flexible_filtering)).lower() + == "true", confluence_api_key=secrets.get_confluence_api_key() or defaults.confluence_api_key, confluence_space_key=secrets.get_confluence_space_key() or defaults.confluence_space_key, confluence_space_name=secrets.get_confluence_space_name() or defaults.confluence_space_name, @@ -173,10 +182,8 @@ def get_debug_info() -> dict: "data_dir": str(config.data_dir), "semantic_features": { "use_semantic_features": config.use_semantic_features, - "semantic_expansion_enabled": config.semantic_expansion_enabled, "semantic_reranking_enabled": config.semantic_reranking_enabled, "semantic_similarity_threshold": config.semantic_similarity_threshold, - "query_expansion_max_variations": config.query_expansion_max_variations, "intent_classification_enabled": config.intent_classification_enabled, }, } diff --git a/src/rag/__init__.py b/src/rag/__init__.py index 004874b..a63d065 100644 --- a/src/rag/__init__.py +++ b/src/rag/__init__.py @@ -4,5 +4,11 @@ """ from .pipeline import RAGPipeline, RAGPipelineFactory +from .semantic_pipeline import SemanticRAGPipeline, SemanticRAGPipelineFactory -__all__ = ["RAGPipeline", "RAGPipelineFactory"] +__all__ = [ + "RAGPipeline", + "RAGPipelineFactory", # Legacy pipeline + "SemanticRAGPipeline", + "SemanticRAGPipelineFactory", # Modern pipeline +] diff --git a/src/rag/query_processor.py b/src/rag/query_processor.py deleted file mode 100644 index 06b3bc6..0000000 --- a/src/rag/query_processor.py +++ /dev/null @@ -1,416 +0,0 @@ -""" -Query processor for semantic understanding and intent detection. -Handles query expansion, synonym matching, and intent classification. -""" - -import logging -from typing import List -import re -from dataclasses import dataclass - -from ..config import get_config -from ..embeddings import get_embedding_service - - -@dataclass -class QueryProcessingResult: - """Result of query processing with expanded queries and intent""" - - original_query: str - expanded_queries: List[str] - intent: str - keywords: List[str] - semantic_variations: List[str] - confidence: float - - -class QueryProcessor: - """ - Advanced query processor with semantic understanding capabilities. - Handles misleading keywords by expanding queries with semantic variations. 
- """ - - def __init__(self): - self.config = get_config() - self.logger = logging.getLogger(self.__class__.__name__) - self._embedding_service = None - - # Domain-specific synonym mapping for Isschat context - self.semantic_mappings = { - # Team/collaboration terms - "collaborateurs": ["équipe", "team", "membres", "développeurs", "participants"], - "équipe": ["collaborateurs", "team", "membres", "développeurs", "participants"], - "team": ["équipe", "collaborateurs", "membres", "développeurs", "participants"], - "membres": ["équipe", "collaborateurs", "team", "développeurs", "participants"], - "développeurs": ["équipe", "collaborateurs", "team", "membres", "participants"], - # Project/product terms - "projet": ["application", "produit", "système", "plateforme", "solution"], - "application": ["projet", "produit", "système", "plateforme", "solution"], - "produit": ["projet", "application", "système", "plateforme", "solution"], - "système": ["projet", "application", "produit", "plateforme", "solution"], - "plateforme": ["projet", "application", "produit", "système", "solution"], - # Technical terms - "configuration": ["config", "paramètres", "réglages", "settings"], - "config": ["configuration", "paramètres", "réglages", "settings"], - "paramètres": ["configuration", "config", "réglages", "settings"], - "réglages": ["configuration", "config", "paramètres", "settings"], - # Documentation terms - "documentation": ["docs", "guide", "manuel", "aide"], - "docs": ["documentation", "guide", "manuel", "aide"], - "guide": ["documentation", "docs", "manuel", "aide"], - "manuel": ["documentation", "docs", "guide", "aide"], - # Common French/English variations - "fonctionnalités": ["features", "capacités", "options"], - "features": ["fonctionnalités", "capacités", "options"], - "utilisation": ["usage", "use", "utiliser"], - "usage": ["utilisation", "use", "utiliser"], - } - - # Intent patterns for better classification - self.intent_patterns = { - "team_info": [ - r"qui\s+sont\s+les\s+(collaborateurs|équipe|membres|développeurs)", - r"(team|équipe|collaborateurs|membres)\s+(sur|de|du|dans)", - r"(composition|responsabilités)\s+(équipe|team)", - r"(vincent|nicolas|emin|fraillon|lambropoulos|calyaka)", - r"(équipe|team|collaborateurs|membres|développeurs)\s+(développement|project|isschat)", - r"(développement|project)\s+(équipe|team)", - r"(développeurs|developers)\s+(du|of|on)\s+(système|project|isschat)", - ], - "project_info": [ - r"(qu est-ce que|what is|c est quoi)\s+(isschat|le projet)", - r"(description|présentation|overview)\s+(du projet|d isschat)", - r"(objectif|but|goal)\s+(du projet|d isschat)", - ], - "technical_info": [ - r"(comment|how)\s+(utiliser|use|configurer|configure)", - r"(installation|setup|configuration)", - r"(problème|error|erreur|bug)", - ], - "feature_info": [ - r"(fonctionnalités|features|capacités)", - r"(peut|can)\s+(faire|do)", - r"(options|paramètres|settings)", - ], - } - - @property - def embedding_service(self): - """Lazy loading of embedding service""" - if self._embedding_service is None: - self._embedding_service = get_embedding_service() - return self._embedding_service - - def process_query(self, query: str) -> QueryProcessingResult: - """ - Process a query with semantic understanding and expansion. 
- - Args: - query: Original user query - - Returns: - QueryProcessingResult with expanded queries and intent - """ - try: - # Clean and normalize query - normalized_query = self._normalize_query(query) - - # Extract keywords - keywords = self._extract_keywords(normalized_query) - - # Detect intent - intent = self._classify_intent(normalized_query) - - # Generate semantic variations - semantic_variations = self._generate_semantic_variations(normalized_query, keywords) - - # Create expanded queries - expanded_queries = self._create_expanded_queries(normalized_query, semantic_variations) - - # Calculate confidence based on intent detection and semantic coverage - confidence = self._calculate_confidence(intent, keywords, semantic_variations) - - result = QueryProcessingResult( - original_query=query, - expanded_queries=expanded_queries, - intent=intent, - keywords=keywords, - semantic_variations=semantic_variations, - confidence=confidence, - ) - - self.logger.debug(f"Query processed: '{query}' -> {len(expanded_queries)} variations, intent: {intent}") - return result - - except Exception as e: - self.logger.error(f"Query processing failed: {e}") - # Return fallback result - return QueryProcessingResult( - original_query=query, - expanded_queries=[query], - intent="general", - keywords=query.split(), - semantic_variations=[], - confidence=0.5, - ) - - def _normalize_query(self, query: str) -> str: - """Normalize query text""" - # Convert to lowercase - query = query.lower().strip() - - # Remove extra whitespace - query = re.sub(r"\s+", " ", query) - - # Remove punctuation but keep accents - query = re.sub(r"[^\w\s\-àâäéèêëïîôöùûüÿñç]", " ", query) - - return query.strip() - - def _extract_keywords(self, query: str) -> List[str]: - """Extract meaningful keywords from query""" - # Stop words in French and English - stop_words = { - "le", - "la", - "les", - "un", - "une", - "des", - "du", - "de", - "da", - "et", - "ou", - "est", - "sont", - "avec", - "sur", - "dans", - "pour", - "par", - "qui", - "que", - "quoi", - "comment", - "où", - "quand", - "pourquoi", - "the", - "a", - "an", - "and", - "or", - "is", - "are", - "with", - "on", - "in", - "for", - "by", - "who", - "what", - "where", - "when", - "why", - "how", - "this", - "that", - "these", - "those", - "can", - "could", - "should", - "nous", - "vous", - "ils", - "elles", - "je", - "tu", - "il", - "elle", - "me", - "te", - "se", - "nous", - "vous", - "moi", - "toi", - "lui", - "elle", - "eux", - "elles", - "mon", - "ton", - "son", - "ma", - "ta", - "sa", - "mes", - "tes", - "ses", - "notre", - "votre", - "leur", - "nos", - "vos", - "leurs", - } - - words = query.split() - keywords = [word for word in words if word not in stop_words and len(word) > 2] - - return keywords - - def _classify_intent(self, query: str) -> str: - """Classify query intent based on patterns with priority""" - # Score each intent based on pattern matches - intent_scores = {} - - for intent, patterns in self.intent_patterns.items(): - score = 0 - for pattern in patterns: - if re.search(pattern, query, re.IGNORECASE): - score += 1 - intent_scores[intent] = score - - # Special handling for conflicting keywords - # If query contains both team and technical terms, analyze context - if intent_scores.get("team_info", 0) > 0 and intent_scores.get("technical_info", 0) > 0: - # Check if team terms are more prominent - team_terms = ["équipe", "team", "collaborateurs", "membres", "développeurs"] - tech_terms = ["configuration", "installation", "problème", "erreur"] - - team_count = 
sum(1 for term in team_terms if term in query) - tech_count = sum(1 for term in tech_terms if term in query) - - # If team terms are more prominent or equal, prefer team_info - if team_count >= tech_count: - return "team_info" - - # Special case: if only technical_info matched but we have team terms, check context - elif intent_scores.get("technical_info", 0) > 0 and intent_scores.get("team_info", 0) == 0: - team_terms = ["équipe", "team", "collaborateurs", "membres", "développeurs"] - if any(term in query for term in team_terms): - # Check if it's about team configuration rather than system configuration - if "configuration" in query and any(term in query for term in team_terms): - return "team_info" - - # Return intent with highest score - if intent_scores: - best_intent = max(intent_scores, key=intent_scores.get) - if intent_scores[best_intent] > 0: - return best_intent - - return "general" - - def _generate_semantic_variations(self, query: str, keywords: List[str]) -> List[str]: - """Generate semantic variations of the query""" - variations = [] - - # Generate variations based on synonym mappings - for keyword in keywords: - if keyword in self.semantic_mappings: - synonyms = self.semantic_mappings[keyword] - for synonym in synonyms: - # Replace keyword with synonym in query - variation = query.replace(keyword, synonym) - if variation != query and variation not in variations: - variations.append(variation) - - # Add context-specific variations based on intent - intent_variations = self._get_intent_variations(query) - variations.extend(intent_variations) - - return variations - - def _get_intent_variations(self, query: str) -> List[str]: - """Generate intent-specific query variations""" - variations = [] - - # Team info variations - if any(word in query for word in ["collaborateurs", "équipe", "team", "membres"]): - variations.extend( - [ - "équipe composition responsabilités", - "membres développeurs isschat", - "vincent nicolas emin fraillon lambropoulos calyaka", - "team composition isschat project", - "collaborateurs projet isschat", - ] - ) - - # Project info variations - if any(word in query for word in ["projet", "isschat", "application"]): - variations.extend( - [ - "isschat description présentation", - "projet objectif but", - "application fonctionnalités", - "système isschat overview", - ] - ) - - return variations - - def _create_expanded_queries(self, original_query: str, semantic_variations: List[str]) -> List[str]: - """Create final list of expanded queries""" - expanded_queries = [original_query] - - # Add semantic variations - for variation in semantic_variations: - if variation not in expanded_queries: - expanded_queries.append(variation) - - # Limit to reasonable number of queries - return expanded_queries[:5] - - def _calculate_confidence(self, intent: str, keywords: List[str], variations: List[str]) -> float: - """Calculate confidence score for query processing""" - confidence = 0.5 # Base confidence - - # Boost confidence for recognized intent - if intent != "general": - confidence += 0.2 - - # Boost confidence for meaningful keywords - if len(keywords) > 0: - confidence += min(0.2, len(keywords) * 0.05) - - # Boost confidence for semantic variations found - if len(variations) > 0: - confidence += min(0.1, len(variations) * 0.02) - - return min(1.0, confidence) - - def get_semantic_similarity(self, text1: str, text2: str) -> float: - """Calculate semantic similarity between two texts""" - try: - embedding1 = self.embedding_service.encode_single(text1) - embedding2 = 
self.embedding_service.encode_single(text2) - return self.embedding_service.similarity(embedding1, embedding2) - except Exception as e: - self.logger.error(f"Semantic similarity calculation failed: {e}") - return 0.0 - - def expand_query_with_embeddings(self, query: str, candidate_texts: List[str], threshold: float = 0.7) -> List[str]: - """ - Expand query based on semantic similarity with candidate texts. - Useful for finding semantically similar content even with different keywords. - """ - try: - query_embedding = self.embedding_service.encode_single(query) - expanded_terms = [] - - for text in candidate_texts: - text_embedding = self.embedding_service.encode_single(text) - similarity = self.embedding_service.similarity(query_embedding, text_embedding) - - if similarity >= threshold: - expanded_terms.append(text) - - return expanded_terms - - except Exception as e: - self.logger.error(f"Embedding-based query expansion failed: {e}") - return [] diff --git a/src/rag/reformulation_service.py b/src/rag/reformulation_service.py new file mode 100644 index 0000000..e3ccaca --- /dev/null +++ b/src/rag/reformulation_service.py @@ -0,0 +1,371 @@ +""" +Query reformulation service for resolving coreferences and clarifying implicit references. +Uses LLM to reformulate user queries with context from recent conversation exchanges. +""" + +import logging +import requests +from typing import List, Dict, Any, Optional +from dataclasses import dataclass + +from ..config import get_config + + +@dataclass +class ConversationExchange: + """Represents a single exchange in the conversation""" + + user_message: str + assistant_message: str + + +class ReformulationService: + """ + Service for reformulating user queries to resolve coreferences and clarify implicit references. + Takes the user query and recent conversation context to produce a clear, autonomous query. + """ + + def __init__(self): + self.config = get_config() + self.logger = logging.getLogger(self.__class__.__name__) + + if not self.config.openrouter_api_key: + raise ValueError("OPENROUTER_API_KEY required for reformulation service") + + def reformulate_query( + self, user_query: str, recent_exchanges: List[ConversationExchange], max_exchanges: int = 3 + ) -> str: + """ + Reformulate a user query using recent conversation context. 
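For orientation, here is a minimal usage sketch of the service introduced in this file. It is illustrative only: it assumes `OPENROUTER_API_KEY` is available to the project config, and the conversation content is hypothetical.

```python
from src.rag.reformulation_service import ReformulationService, ConversationExchange

# Hypothetical prior exchange; the service uses it to resolve the pronoun in the follow-up.
history = [
    ConversationExchange(
        user_message="Which vector database does Isschat use?",
        assistant_message="Isschat stores document embeddings in a Weaviate collection.",
    )
]

service = ReformulationService()  # raises ValueError if OPENROUTER_API_KEY is not configured
resolved = service.reformulate_query("How do I configure it?", history)
print(resolved)  # e.g. "How do I configure Weaviate for Isschat?" (actual LLM output will vary)
```

Because `force_reformulate_all_queries` defaults to true in this diff, even queries that look self-contained are sent through reformulation whenever conversation context exists.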
+ + Args: + user_query: The original user query + recent_exchanges: List of recent conversation exchanges + max_exchanges: Maximum number of recent exchanges to consider + + Returns: + Reformulated query that is autonomous and clear + """ + try: + self.logger.info(f"🔄 REFORMULATION START: '{user_query}'") + self.logger.info(f"📝 Recent exchanges count: {len(recent_exchanges)}") + + # Log the exchanges for debugging + for i, exchange in enumerate(recent_exchanges[-3:], 1): + assistant_preview = exchange.assistant_message[:50] + self.logger.info( + f" Exchange {i}: User='{exchange.user_message}' -> Assistant='{assistant_preview}...'" + ) + + # Check autonomy + is_autonomous = self._is_query_autonomous(user_query) + self.logger.info(f"🔍 Query autonomy check: {is_autonomous}") + + # If no context or query is already clear, return original + if not recent_exchanges: + self.logger.info("❌ No recent exchanges - skipping reformulation") + return user_query + + if is_autonomous: + self.logger.info("✅ Query is autonomous - skipping reformulation") + return user_query + + # Limit to recent exchanges + context_exchanges = recent_exchanges[-max_exchanges:] if recent_exchanges else [] + self.logger.info(f"📚 Using {len(context_exchanges)} exchanges for context") + + # Build reformulation prompt + prompt = self._build_reformulation_prompt(user_query, context_exchanges) + self.logger.info(f"📝 Reformulation prompt built (length: {len(prompt)} chars)") + + # Call LLM for reformulation + self.logger.info("🤖 Calling LLM for reformulation...") + reformulated = self._call_llm_for_reformulation(prompt) + + # Validate and return reformulated query + if reformulated and reformulated.strip(): + self.logger.info(f"✅ REFORMULATION SUCCESS: '{user_query}' -> '{reformulated}'") + return reformulated.strip() + else: + self.logger.warning("⚠️ Reformulation returned empty result, using original query") + return user_query + + except Exception as e: + self.logger.error(f"❌ Reformulation failed: {e}") + import traceback + + self.logger.error(f"Full traceback: {traceback.format_exc()}") + # Fallback to original query on any error + return user_query + + def _is_query_autonomous(self, query: str) -> bool: + """ + Check if a query is already autonomous and doesn't need reformulation. 
+ + Args: + query: The user query to check + + Returns: + True if query is autonomous, False if it needs reformulation + """ + query_lower = query.lower().strip() + import re + + query_normalized = re.sub(r"[^\w\s]", " ", query_lower).strip() + words = query_normalized.split() + + self.logger.debug(f"🔍 Autonomy check for: '{query}'") + self.logger.debug(f" Normalized: '{query_normalized}'") + self.logger.debug(f" Words: {words}") + + # Check various autonomy indicators + if self._has_coreference_indicators(words): + return False + + if self._has_implicit_reference_patterns(query_lower): + return False + + if self._has_ambiguous_references(words): + return False + + if self._has_french_question_patterns(query_lower): + return False + + if self._has_demonstrative_references(query_lower): + return False + + if self._should_force_reformulation(): + return False + + self.logger.debug(" ✅ Query appears autonomous") + return True + + def _has_coreference_indicators(self, words: list) -> bool: + """Check for coreference indicators at the beginning of the query""" + coreference_indicators = [ + "il", + "elle", + "ils", + "elles", # French pronouns + "he", + "she", + "they", + "it", # English pronouns + "ça", + "cela", + "that", + "this", # Demonstratives + "le", + "la", + "les", # French definite articles (when ambiguous) + ] + + if words and words[0] in coreference_indicators: + self.logger.debug(f" ❌ Found coreference indicator at start: '{words[0]}'") + return True + return False + + def _has_implicit_reference_patterns(self, query_lower: str) -> bool: + """Check for implicit reference patterns""" + import re + + implicit_indicators = [ + "comment faire", + "how to do", + "how to", + "comment utiliser", + "how to use", + "comment installer", + "how to install", + "où est", + "where is", + "where are", + "qu'est-ce que c'est", + "what is it", + "comment ça marche", + "how does it work", + "pourquoi", + "why", + ] + + for indicator in implicit_indicators: + if indicator in query_lower: + remaining_text = query_lower.split(indicator, 1)[-1] if len(query_lower.split(indicator, 1)) > 1 else "" + remaining_normalized = re.sub(r"[^\w\s]", " ", remaining_text).strip() + remaining_words = remaining_normalized.split() + if any(word in remaining_words for word in ["it", "that", "ça", "cela"]): + self.logger.debug(f" ❌ Found implicit reference pattern: '{indicator}' with ambiguous reference") + return True + return False + + def _has_ambiguous_references(self, words: list) -> bool: + """Check for standalone ambiguous references""" + ambiguous_words = ["it", "that", "this", "they", "them", "ça", "cela"] + for word in ambiguous_words: + if word in words: + self.logger.debug(f" ❌ Found ambiguous word: '{word}'") + return True + return False + + def _has_french_question_patterns(self, query_lower: str) -> bool: + """Check for French questioning patterns that need context""" + french_question_patterns = [ + "et qui", + "et quoi", + "et où", + "et quand", + "et comment", + "qui l'utilise", + "qui utilise", + "qui fait", + "qui peut", + "qui en sont", + "qui en est", + "qu'en est", + "en sont", + "en est", + ] + + for pattern in french_question_patterns: + if pattern in query_lower: + self.logger.debug(f" ❌ Found French question pattern needing context: '{pattern}'") + return True + return False + + def _has_demonstrative_references(self, query_lower: str) -> bool: + """Check for demonstrative pronouns that need context resolution""" + demonstrative_references = [ + "ce projet", + "cette application", + "ce 
système", + "cette solution", + "cet outil", + "ces outils", + "ce document", + "cette documentation", + "ce processus", + "cette méthode", + "ce code", + "cette fonction", + ] + + for ref in demonstrative_references: + if ref in query_lower: + self.logger.debug(f" ❌ Found demonstrative reference needing context: '{ref}'") + return True + return False + + def _should_force_reformulation(self) -> bool: + """Check if configuration forces reformulation for all queries""" + if self.config.force_reformulate_all_queries: + self.logger.debug(" 🔄 Configuration forces reformulation for all queries") + return True + return False + + def _build_reformulation_prompt(self, user_query: str, context_exchanges: List[ConversationExchange]) -> str: + """ + Build the prompt for LLM reformulation. + + Args: + user_query: The original user query + context_exchanges: Recent conversation exchanges for context + + Returns: + Formatted prompt for the LLM + """ + # Build conversation context + context_text = "" + if context_exchanges: + context_text = "Recent conversation context:\n" + for i, exchange in enumerate(context_exchanges, 1): + context_text += f"Exchange {i}:\n" + context_text += f"User: {exchange.user_message}\n" + context_text += f"Assistant: {exchange.assistant_message}\n\n" + + prompt = f"""You are a query reformulation assistant. Your task is to reformulate user queries to make them +autonomous and clear by resolving coreferences and clarifying implicit references. + +Here is the recent conversation context: + +{context_text} + +Current user query: "{user_query}" + +Instructions: +1. Analyze the current query for pronouns (he, she, they, it, il, elle, ils, elles, ça, cela) and implicit references +2. Use the conversation context to resolve what these pronouns and references refer to +3. Reformulate the query to be completely autonomous - someone reading only the reformulated query +should understand exactly what is being asked +4. Keep the reformulated query concise but specific +5. Maintain the original intent and question type +6. If the query is already autonomous, return it unchanged + +Examples: +- "How do I install it?" → "How do I install Docker?" (if Docker was mentioned in context) +- "What can they do?" → "What can Vincent and Nicolas do?" (if Vincent and Nicolas were mentioned in context) +- "Tell me more about that" → "Tell me more about the authentication system" (if auth system was mentioned in context) + +Reformulated query:""" + + return prompt + + def _call_llm_for_reformulation(self, prompt: str) -> Optional[str]: + """ + Call the LLM to reformulate the query. 
+ + Args: + prompt: The reformulation prompt + + Returns: + Reformulated query or None if failed + """ + headers = {"Authorization": f"Bearer {self.config.openrouter_api_key}", "Content-Type": "application/json"} + + payload = { + "model": self.config.llm_model, + "messages": [{"role": "user", "content": prompt}], + "temperature": 0.1, # Low temperature for consistent reformulation + "max_tokens": self.config.reformulation_max_tokens, + } + + try: + response = requests.post( + f"{self.config.openrouter_base_url}/chat/completions", + headers=headers, + json=payload, + timeout=self.config.reformulation_timeout, + ) + response.raise_for_status() + + data = response.json() + reformulated = data["choices"][0]["message"]["content"].strip() + + # Clean up the response (remove quotes, extra formatting) + reformulated = reformulated.strip("\"'") + + return reformulated + + except requests.RequestException as e: + self.logger.error(f"LLM API error during reformulation: {e}") + return None + except (KeyError, IndexError) as e: + self.logger.error(f"Invalid LLM response format during reformulation: {e}") + return None + + def is_ready(self) -> bool: + """Check if the reformulation service is ready to use""" + return bool(self.config.openrouter_api_key) + + def get_stats(self) -> Dict[str, Any]: + """Get service statistics""" + return { + "type": "reformulation_service", + "ready": self.is_ready(), + "config": { + "llm_model": self.config.llm_model, + "api_key_configured": bool(self.config.openrouter_api_key), + "base_url": self.config.openrouter_base_url, + "timeout": self.config.reformulation_timeout, + "max_tokens": self.config.reformulation_max_tokens, + }, + } diff --git a/src/rag/semantic_pipeline.py b/src/rag/semantic_pipeline.py index 20f47aa..dc03543 100644 --- a/src/rag/semantic_pipeline.py +++ b/src/rag/semantic_pipeline.py @@ -5,13 +5,13 @@ import logging import time -from typing import Tuple, Dict, Any, Optional +from typing import Tuple, Dict, Any, Optional, List from ..config import get_config from ..storage.data_manager import get_data_manager from .tools.semantic_retrieval_tool import SemanticRetrievalTool from .tools.generation_tool import GenerationTool -from .query_processor import QueryProcessor +from .reformulation_service import ReformulationService, ConversationExchange class SemanticRAGPipeline: @@ -29,7 +29,7 @@ def __init__(self, use_semantic_features: bool = True): # Initialize tools self.semantic_retrieval_tool = SemanticRetrievalTool() self.generation_tool = GenerationTool() - self.query_processor = QueryProcessor() + self.reformulation_service = ReformulationService() self.logger.info("✅ Semantic RAG pipeline initialized") @@ -40,7 +40,6 @@ def process_query( user_id: str = "anonymous", conversation_id: Optional[str] = None, verbose: bool = False, - use_semantic_expansion: bool = True, use_semantic_reranking: bool = True, ) -> Tuple[str, str]: """ @@ -52,7 +51,6 @@ def process_query( user_id: User ID for logs conversation_id: Conversation ID for logs verbose: Detailed display - use_semantic_expansion: Enable semantic query expansion use_semantic_reranking: Enable semantic re-ranking Returns: @@ -64,30 +62,58 @@ def process_query( if verbose: self.logger.info(f"🔍 Processing query with semantic understanding: '{query[:100]}...'") - # Step 1: Process query for semantic understanding - query_result = None - if self.use_semantic_features and use_semantic_expansion: - if verbose: - self.logger.info("🧠 Step 1: Semantic query processing") + # Step 1: Query reformulation (new 
step) + reformulated_query = query + self.logger.info(f"🚀 PIPELINE START: Processing query: '{query}'") + self.logger.info( + f"📜 History provided: {bool(history and history.strip())} (length: {len(history) if history else 0})" + ) - query_result = self.query_processor.process_query(query) + # Debug output that should be visible in Streamlit logs + print(f"🚀 SEMANTIC PIPELINE: Processing '{query}'") + print(f"📜 History length: {len(history) if history else 0}") + if history and history.strip(): if verbose: - self.logger.info( - f"📝 Intent: {query_result.intent}, " - f"Variations: {len(query_result.expanded_queries)}, " - f"Confidence: {query_result.confidence:.2f}" - ) - - # Step 2: Semantic retrieval + self.logger.info("🔄 Step 1: Query reformulation") + + self.logger.info("📥 Extracting recent exchanges from history...") + # Extract recent exchanges from history + recent_exchanges = self._extract_exchanges_from_history(history) + self.logger.info(f"📚 Extracted {len(recent_exchanges)} exchanges from history") + + # Log the history format for debugging + self.logger.debug(f"📜 Raw history format: {repr(history[:200])}...") + + # Reformulate query to resolve coreferences + self.logger.info("🔄 Calling ReformulationService...") + print(f"🔄 CALLING REFORMULATION for: '{query}'") + reformulated_query = self.reformulation_service.reformulate_query(query, recent_exchanges) + print(f"🔄 REFORMULATION RESULT: '{reformulated_query}'") + + if reformulated_query != query: + self.logger.info(f"✅ QUERY REFORMULATED: '{query}' -> '{reformulated_query}'") + if verbose: + self.logger.info(f"📝 Query reformulated: '{query}' -> '{reformulated_query}'") + # For debugging - this should be visible in logs + print(f"🔄 REFORMULATION: '{query}' -> '{reformulated_query}'") + else: + self.logger.info(f"⚪ Query unchanged after reformulation: '{query}'") + else: + self.logger.info("⚪ No history provided - skipping reformulation step") + print("⚪ NO HISTORY - skipping reformulation") + + # Step 2: Document retrieval if verbose: - self.logger.info("📥 Step 2: Semantic document retrieval") + self.logger.info("📥 Step 2: Document retrieval") + # Use reformulated query for retrieval + print(f"🔍 VECTOR SEARCH: Using query '{reformulated_query}' for retrieval") search_results = self.semantic_retrieval_tool.retrieve( - query=query, - use_semantic_expansion=use_semantic_expansion and self.use_semantic_features, + query=reformulated_query, use_semantic_reranking=use_semantic_reranking and self.use_semantic_features, ) + print(f"📄 VECTOR SEARCH: Found {len(search_results)} results") if verbose: self.logger.info(f"📄 {len(search_results)} documents retrieved") @@ -100,7 +126,10 @@ def process_query( if verbose: self.logger.info("🤖 Step 3: Generating response") - generation_result = self.generation_tool.generate(query=query, documents=search_results, history=history) + # Use reformulated query for generation to ensure consistent filtering + generation_result = self.generation_tool.generate( + query=reformulated_query, documents=search_results, history="" + ) answer = generation_result["answer"] sources = generation_result["sources"] @@ -118,21 +147,11 @@ def process_query( "num_retrieved_docs": len(search_results), "generation_success": generation_result["success"], "semantic_features_enabled": self.use_semantic_features, - "semantic_expansion_used": use_semantic_expansion, "semantic_reranking_used": use_semantic_reranking, + "query_reformulated": reformulated_query != query, + "reformulated_query": reformulated_query if 
reformulated_query != query else None, } - # Add query processing metadata if available - if query_result: - metadata.update( - { - "query_intent": query_result.intent, - "query_confidence": query_result.confidence, - "num_query_variations": len(query_result.expanded_queries), - "semantic_keywords": query_result.keywords, - } - ) - self.data_manager.save_conversation( user_id=user_id, conversation_id=conv_id, @@ -142,6 +161,7 @@ def process_query( sources=self._format_sources_for_storage(search_results), metadata=metadata, ) + print(f"💾 SAVED CONVERSATION: '{query}' to conversation_id={conv_id}") except Exception as e: self.logger.warning(f"Failed to save conversation: {e}") @@ -180,35 +200,22 @@ def compare_with_basic_retrieval(self, query: str, k: int = 5) -> Dict[str, Any] Comparison results """ try: - # Semantic retrieval - semantic_results = self.semantic_retrieval_tool.retrieve( - query=query, k=k, use_semantic_expansion=True, use_semantic_reranking=True - ) - - # Basic retrieval (no semantic features) - basic_results = self.semantic_retrieval_tool.retrieve( - query=query, k=k, use_semantic_expansion=False, use_semantic_reranking=False - ) + # Retrieval with reranking + reranked_results = self.semantic_retrieval_tool.retrieve(query=query, k=k, use_semantic_reranking=True) - # Query processing info - query_result = self.query_processor.process_query(query) + # Basic retrieval (no reranking) + basic_results = self.semantic_retrieval_tool.retrieve(query=query, k=k, use_semantic_reranking=False) return { "query": query, - "query_processing": { - "intent": query_result.intent, - "confidence": query_result.confidence, - "keywords": query_result.keywords, - "expanded_queries": query_result.expanded_queries, - }, - "semantic_retrieval": { - "count": len(semantic_results), - "scores": [r.score for r in semantic_results], - "top_content": semantic_results[0].content[:200] + "..." if semantic_results else None, + "reranked_retrieval": { + "count": len(reranked_results), + "scores": [r.score for r in reranked_results], + "top_content": reranked_results[0].content[:200] + "..." 
if reranked_results else None, "avg_score": ( - sum(r.score for r in semantic_results) / len(semantic_results) if semantic_results else 0 + sum(r.score for r in reranked_results) / len(reranked_results) if reranked_results else 0 ), - "metadata_sample": semantic_results[0].metadata if semantic_results else None, + "metadata_sample": reranked_results[0].metadata if reranked_results else None, }, "basic_retrieval": { "count": len(basic_results), @@ -218,19 +225,19 @@ def compare_with_basic_retrieval(self, query: str, k: int = 5) -> Dict[str, Any] }, "improvement_metrics": { "score_improvement": ( - (semantic_results[0].score - basic_results[0].score) - if semantic_results and basic_results + (reranked_results[0].score - basic_results[0].score) + if reranked_results and basic_results else 0 ), "avg_score_improvement": ( - (sum(r.score for r in semantic_results) / len(semantic_results)) + (sum(r.score for r in reranked_results) / len(reranked_results)) - (sum(r.score for r in basic_results) / len(basic_results)) - if semantic_results and basic_results + if reranked_results and basic_results else 0 ), - "semantic_advantage": ( - semantic_results[0].score > basic_results[0].score - if semantic_results and basic_results + "reranking_advantage": ( + reranked_results[0].score > basic_results[0].score + if reranked_results and basic_results else False ), }, @@ -263,6 +270,51 @@ def _format_sources_for_storage(self, formatted_docs) -> list[dict]: return sources + def _extract_exchanges_from_history(self, history: str) -> List[ConversationExchange]: + """ + Extract conversation exchanges from history string. + + Args: + history: Formatted history string from format_chat_history + + Returns: + List of ConversationExchange objects + """ + exchanges = [] + + if not history or not history.strip(): + self.logger.debug("📭 Empty history provided") + return exchanges + + # Split history into lines and process + lines = [line.strip() for line in history.split("\n") if line.strip()] + self.logger.debug(f"📄 Processing {len(lines)} lines from history") + + current_user_msg = None + + for i, line in enumerate(lines): + self.logger.debug(f" Line {i + 1}: '{line[:100]}...'") + + if line.startswith("User: "): + current_user_msg = line[6:] # Remove 'User: ' + self.logger.debug(f" -> Found user message: '{current_user_msg}'") + elif line.startswith("Assistant: ") and current_user_msg: + assistant_msg = line[11:] # Remove 'Assistant: ' + exchange = ConversationExchange(user_message=current_user_msg, assistant_message=assistant_msg) + exchanges.append(exchange) + self.logger.debug( + f" -> Created exchange: User='{current_user_msg}' Assistant='{assistant_msg[:50]}...'" + ) + current_user_msg = None + + self.logger.info(f"📑 Extracted {len(exchanges)} exchanges total") + + # Return most recent exchanges (limit to avoid too much context) + final_exchanges = exchanges[-5:] if len(exchanges) > 5 else exchanges + self.logger.info(f"📋 Using {len(final_exchanges)} most recent exchanges") + + return final_exchanges + def is_ready(self) -> bool: """Check if the pipeline is ready""" try: @@ -295,7 +347,7 @@ def get_status(self) -> Dict[str, Any]: "generation_tool": generation_stats, "data_manager": self.data_manager.get_info(), "capabilities": { - "semantic_query_expansion": True, + "query_reformulation": True, "semantic_reranking": True, "intent_classification": True, "multilingual_support": True, diff --git a/src/rag/tools/generation_tool.py b/src/rag/tools/generation_tool.py index 83f9f2f..9df73a4 100644 --- 
a/src/rag/tools/generation_tool.py +++ b/src/rag/tools/generation_tool.py @@ -33,7 +33,9 @@ def generate( """ try: # Filter documents based on relevance + print(f"🔍 GENERATION: Received {len(documents)} documents for query '{query}'") relevant_documents = self._filter_relevant_documents(query, documents) + print(f"📄 GENERATION: After filtering, {len(relevant_documents)} documents remain") # Prepare context from relevant documents context = self._prepare_context(relevant_documents) @@ -127,7 +129,10 @@ def _call_openrouter(self, prompt: str) -> Dict[str, Any]: try: response = requests.post( - "https://openrouter.ai/api/v1/chat/completions", headers=headers, json=payload, timeout=30 + f"{self.config.openrouter_base_url}/chat/completions", + headers=headers, + json=payload, + timeout=self.config.openrouter_timeout, ) response.raise_for_status() @@ -179,7 +184,7 @@ def get_stats(self) -> Dict[str, Any]: def _filter_relevant_documents(self, query: str, documents: List[RetrievalDocument]) -> List[RetrievalDocument]: """ Filter documents based on relevance to the query. - Returns only documents that are truly relevant to avoid showing irrelevant sources. + Uses a more flexible approach to avoid missing important documents. """ if not documents: return [] @@ -234,9 +239,6 @@ def _filter_relevant_documents(self, query: str, documents: List[RetrievalDocume # For greetings and generic terms, don't show any sources return [] - # Calculate relevance scores - relevant_documents = [] - # Extract meaningful keywords from query (excluding stop words) stop_words = { "le", @@ -294,21 +296,76 @@ def _filter_relevant_documents(self, query: str, documents: List[RetrievalDocume if not query_keywords: return [] - # Calculate relevance for each document - for doc in documents: - relevance_score = self._calculate_document_relevance(query_keywords, doc) + # More flexible filtering approach + relevant_documents = [] + min_score_threshold = self.config.min_source_score_threshold + min_relevance_threshold = self.config.min_source_relevance_threshold - # Apply thresholds from configuration - min_score_threshold = self.config.min_source_score_threshold - min_relevance_threshold = self.config.min_source_relevance_threshold + print(f"🔍 FILTERING: Keywords extracted: {query_keywords}") - # Document is relevant if it meets both criteria - if doc.score >= min_score_threshold and relevance_score >= min_relevance_threshold: - relevant_documents.append(doc) + for i, doc in enumerate(documents): + relevance_score = self._calculate_document_relevance(query_keywords, doc) + + print(f"📄 DOC {i + 1}: score={doc.score:.3f}, relevance={relevance_score:.3f}") + + # Choose filtering approach based on configuration + if self.config.use_flexible_filtering: + # Flexible multi-criteria approach: document is relevant if it meets EITHER: + # 1. High vector similarity (even if keyword matching is poor) - semantic relevance + # 2. Good vector similarity AND decent keyword relevance - traditional approach + # 3. 
Exceptional keyword relevance (even if vector similarity is lower) - exact matches + + high_vector_threshold = min_score_threshold + 0.2 # e.g., 0.5 if min is 0.3 + exceptional_relevance_threshold = min_relevance_threshold + 0.3 # e.g., 0.5 if min is 0.2 + + is_highly_similar = doc.score >= high_vector_threshold + is_traditionally_relevant = ( + doc.score >= min_score_threshold and relevance_score >= min_relevance_threshold + ) + is_exceptionally_relevant = relevance_score >= exceptional_relevance_threshold + + print( + f" Flexible thresholds: high_vector={high_vector_threshold:.3f}, " + f"traditional=({min_score_threshold:.3f},{min_relevance_threshold:.3f}), " + f"exceptional={exceptional_relevance_threshold:.3f}" + ) + + if is_highly_similar: + relevant_documents.append(doc) + print(f" ✅ DOC {i + 1}: ACCEPTED (high vector similarity)") + elif is_traditionally_relevant: + relevant_documents.append(doc) + print(f" ✅ DOC {i + 1}: ACCEPTED (traditional criteria)") + elif is_exceptionally_relevant: + relevant_documents.append(doc) + print(f" ✅ DOC {i + 1}: ACCEPTED (exceptional keyword relevance)") + else: + print(f" ❌ DOC {i + 1}: REJECTED") + else: + # Traditional strict approach: both criteria must be met + print( + f" Traditional thresholds: vector>={min_score_threshold:.3f}, " + f"relevance>={min_relevance_threshold:.3f}" + ) + + if doc.score >= min_score_threshold and relevance_score >= min_relevance_threshold: + relevant_documents.append(doc) + print(f" ✅ DOC {i + 1}: ACCEPTED (traditional criteria)") + else: + print(f" ❌ DOC {i + 1}: REJECTED") # Sort by combined score (similarity + relevance) relevant_documents.sort(key=lambda doc: doc.score, reverse=True) + # Ensure we don't filter out all documents if we have good candidates + if not relevant_documents and documents: + # If no documents pass the flexible criteria, take the top scoring document + # This prevents scenarios where important information is completely filtered out + top_doc = max(documents, key=lambda d: d.score) + if top_doc.score >= (min_score_threshold - 0.1): # Slightly more lenient + relevant_documents = [top_doc] + print(f"🔄 FALLBACK: Taking top document with score {top_doc.score:.3f}") + # Log filtering results self.logger.debug(f"Filtered {len(documents)} documents to {len(relevant_documents)} relevant ones") @@ -317,6 +374,7 @@ def _filter_relevant_documents(self, query: str, documents: List[RetrievalDocume def _calculate_document_relevance(self, query_keywords: List[str], document: RetrievalDocument) -> float: """ Calculate how relevant a document is to the query keywords. + Enhanced with French language support and synonym matching. Returns a score between 0 and 1. 
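The three-way acceptance rule above is easier to follow with concrete numbers. Below is a self-contained sketch using the new default thresholds (0.3 minimum vector score, 0.2 minimum keyword relevance); the helper function is illustrative only and not part of the codebase.

```python
def is_accepted(vector_score: float, relevance: float,
                min_score: float = 0.3, min_relevance: float = 0.2) -> bool:
    high_vector = vector_score >= min_score + 0.2           # 0.5: strong semantic match alone is enough
    traditional = vector_score >= min_score and relevance >= min_relevance
    exceptional = relevance >= min_relevance + 0.3           # 0.5: strong keyword match alone is enough
    return high_vector or traditional or exceptional

print(is_accepted(0.52, 0.05))  # True  (high vector similarity)
print(is_accepted(0.35, 0.25))  # True  (traditional criteria)
print(is_accepted(0.20, 0.55))  # True  (exceptional keyword relevance)
print(is_accepted(0.25, 0.10))  # False (rejected)
```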
""" if not query_keywords: @@ -326,13 +384,29 @@ def _calculate_document_relevance(self, query_keywords: List[str], document: Ret content = document.content.lower() title = document.metadata.get("title", "").lower() - # Count keyword matches - content_matches = sum(1 for keyword in query_keywords if keyword in content) - title_matches = sum(1 for keyword in query_keywords if keyword in title) - - # Calculate relevance score + # Enhanced keyword matching with French variations and common synonyms + content_matches = 0 + title_matches = 0 + + for keyword in query_keywords: + # Direct match + if keyword in content: + content_matches += 1 + if keyword in title: + title_matches += 1 + + # French variations and common synonyms + keyword_variations = self._get_keyword_variations(keyword) + for variation in keyword_variations: + if variation in content: + content_matches += 0.8 # Slightly lower weight for variations + if variation in title: + title_matches += 0.8 + + # Calculate base relevance score # Title matches are weighted higher than content matches - relevance_score = (title_matches * 2 + content_matches) / (len(query_keywords) * 2) + max_possible_matches = len(query_keywords) * 2 # 2 for title weight + relevance_score = (title_matches * 2 + content_matches) / max_possible_matches # Additional boost for exact phrase matches query_phrase = " ".join(query_keywords) @@ -342,4 +416,71 @@ def _calculate_document_relevance(self, query_keywords: List[str], document: Ret if query_phrase in title: relevance_score += 0.3 + # Boost for partial phrase matches (useful for reformulated queries) + if len(query_keywords) > 1: + for i in range(len(query_keywords) - 1): + partial_phrase = " ".join(query_keywords[i : i + 2]) + if partial_phrase in content: + relevance_score += 0.1 + if partial_phrase in title: + relevance_score += 0.15 + return min(1.0, relevance_score) + + def _get_keyword_variations(self, keyword: str) -> List[str]: + """ + Get variations of a keyword for better matching. + Handles French conjugations, plurals, and common synonyms. 
+ """ + variations = [] + + # Common French and English synonyms/variations + synonym_map = { + # Team/people related + "équipe": ["team", "groupe", "collaborateurs", "membres"], + "team": ["équipe", "groupe", "collaborateurs", "membres"], + "collaborateurs": ["équipe", "team", "membres", "personnes"], + "membres": ["équipe", "team", "collaborateurs", "personnes"], + # Project related + "projet": ["project", "application", "app", "système"], + "project": ["projet", "application", "app", "système"], + "application": ["app", "projet", "project", "système"], + # Technical terms + "configuration": ["config", "paramètres", "settings", "setup"], + "installation": ["install", "setup", "configuration"], + "utilisation": ["usage", "use", "emploi"], + "fonctionnalités": ["features", "capacités", "options"], + "features": ["fonctionnalités", "capacités", "options"], + # Common verbs and their variations + "utiliser": ["use", "employer", "utilise", "utilisent"], + "installer": ["install", "installe", "installent", "setup"], + "configurer": ["configure", "config", "configure", "setup"], + } + + # Add direct synonyms + if keyword in synonym_map: + variations.extend(synonym_map[keyword]) + + # Add common French plural/singular variations + if keyword.endswith("s") and len(keyword) > 3: + variations.append(keyword[:-1]) # Remove 's' for singular + elif not keyword.endswith("s"): + variations.append(keyword + "s") # Add 's' for plural + + # Add common French verb conjugations + verb_endings = { + "er": ["e", "es", "ent", "ez", "ons"], # aimer -> aime, aimes, etc. + "ir": ["is", "it", "issent", "issez", "issons"], # finir -> finis, finit, etc. + "re": ["", "s", "ent", "ez", "ons"], # prendre -> prend, prends, etc. + } + + for ending, conjugations in verb_endings.items(): + if keyword.endswith(ending): + root = keyword[: -len(ending)] + for conj in conjugations: + variations.append(root + conj) + + # Remove duplicates and the original keyword + variations = list(set(v for v in variations if v != keyword and len(v) > 2)) + + return variations diff --git a/src/rag/tools/semantic_retrieval_tool.py b/src/rag/tools/semantic_retrieval_tool.py index 2eb6e10..665bb06 100644 --- a/src/rag/tools/semantic_retrieval_tool.py +++ b/src/rag/tools/semantic_retrieval_tool.py @@ -10,13 +10,12 @@ from ...embeddings import get_embedding_service from ...vectordb import VectorDBFactory from ...core.documents import RetrievalDocument -from ..query_processor import QueryProcessor, QueryProcessingResult class SemanticRetrievalTool: """ - Enhanced retrieval tool with semantic understanding capabilities. - Handles misleading keywords through query expansion and semantic re-ranking. + Modernized vector retrieval tool that works with reformulated queries. + Provides optional semantic re-ranking for improved relevance. 
""" def __init__(self): @@ -26,7 +25,6 @@ def __init__(self): # Centralized services - lazy loading self._embedding_service = None self._vector_db = None - self._query_processor = None self._initialized = False def _initialize(self): @@ -38,10 +36,9 @@ def _initialize(self): # Initialize services self._embedding_service = get_embedding_service() self._vector_db = VectorDBFactory.create_from_config() - self._query_processor = QueryProcessor() self._initialized = True - self.logger.debug("Semantic retrieval tool initialized") + self.logger.debug("Vector retrieval tool initialized") except Exception as e: raise RuntimeError(f"Failed to initialize semantic retrieval tool: {e}") @@ -51,163 +48,99 @@ def retrieve( query: str, k: Optional[int] = None, filter_conditions: Optional[Dict[str, Any]] = None, - use_semantic_expansion: bool = True, use_semantic_reranking: bool = True, ) -> List[RetrievalDocument]: """ - Retrieve relevant documents with semantic understanding. + Retrieve relevant documents using vector similarity. Args: query: User query k: Number of results to return filter_conditions: Filter conditions - use_semantic_expansion: Whether to use query expansion use_semantic_reranking: Whether to use semantic re-ranking Returns: - List of semantically relevant documents + List of relevant documents """ self._initialize() try: - # Process query for semantic understanding - if use_semantic_expansion: - query_result = self._query_processor.process_query(query) - self.logger.debug( - f"Query processed: intent={query_result.intent}, variations={len(query_result.expanded_queries)}" - ) - else: - query_result = QueryProcessingResult( - original_query=query, - expanded_queries=[query], - intent="general", - keywords=query.split(), - semantic_variations=[], - confidence=0.5, - ) + # Debug output to verify what query is received + print(f"🔍 RETRIEVAL TOOL: Received query '{query}'") - # Retrieve documents using multi-query approach - all_results = self._multi_query_retrieval(query_result, filter_conditions) + all_results = self._direct_vector_retrieval(query, filter_conditions) - # Apply semantic re-ranking if enabled if use_semantic_reranking and len(all_results) > 1: - all_results = self._semantic_rerank(query_result, all_results) + all_results = self._semantic_rerank_simple(query, all_results) - # Determine number of results to return return_k = k if k is not None else self.config.search_k final_results = all_results[:return_k] - self.logger.debug(f"Semantic retrieval: {len(final_results)} results for '{query[:50]}...'") + self.logger.debug(f"Vector retrieval: {len(final_results)} results for '{query[:50]}...'") return final_results except Exception as e: - self.logger.error(f"Semantic retrieval failed: {e}") + self.logger.error(f"Vector retrieval failed: {e}") # Fallback to basic retrieval return self._basic_retrieval(query, k, filter_conditions) - def _multi_query_retrieval( - self, query_result: QueryProcessingResult, filter_conditions: Optional[Dict[str, Any]] = None + def _direct_vector_retrieval( + self, query: str, filter_conditions: Optional[Dict[str, Any]] = None ) -> List[RetrievalDocument]: """ - Perform retrieval using multiple query variations and merge results. + Perform direct vector retrieval. 
""" - all_candidates = [] - seen_content = set() - - # Weight queries based on their origin - query_weights = { - query_result.original_query: 1.0, # Original query has highest weight - } - - # Lower weights for expanded queries - for i, expanded_query in enumerate(query_result.expanded_queries[1:], 1): - query_weights[expanded_query] = max(0.3, 1.0 - (i * 0.1)) - - # Retrieve for each query variation - for query_text in query_result.expanded_queries: - try: - # Generate embedding for this query variation - query_embedding = self._embedding_service.encode_query(query_text) - - # Search in vector database - search_results = self._vector_db.search( - query_embedding=query_embedding, k=self.config.search_fetch_k, filter_conditions=filter_conditions - ) - - # Convert to retrieval documents with weighted scores - query_weight = query_weights.get(query_text, 0.5) - for result in search_results: - # Avoid duplicates based on content - content_key = result.document.content[:200] # First 200 chars as key - if content_key not in seen_content: - seen_content.add(content_key) - - # Adjust score based on query weight - weighted_score = result.score * query_weight - - retrieval_doc = RetrievalDocument( - content=result.document.content, - metadata=result.document.metadata or {}, - score=weighted_score, - ) + try: + query_embedding = self._embedding_service.encode_query(query) - # Add query information to metadata - retrieval_doc.metadata["matched_query"] = query_text - retrieval_doc.metadata["query_weight"] = query_weight - retrieval_doc.metadata["original_score"] = result.score + search_results = self._vector_db.search( + query_embedding=query_embedding, k=self.config.search_fetch_k, filter_conditions=filter_conditions + ) - all_candidates.append(retrieval_doc) + retrieval_docs = [] + for result in search_results: + retrieval_doc = RetrievalDocument( + content=result.document.content, + metadata=result.document.metadata or {}, + score=result.score, + ) + retrieval_docs.append(retrieval_doc) - except Exception as e: - self.logger.warning(f"Failed to retrieve for query '{query_text}': {e}") - continue + retrieval_docs.sort(key=lambda x: x.score, reverse=True) - # Sort by weighted score - all_candidates.sort(key=lambda x: x.score, reverse=True) + self.logger.debug(f"Vector retrieval: {len(retrieval_docs)} documents found") + return retrieval_docs - # Return top candidates (more than final k to allow for re-ranking) - max_candidates = min(len(all_candidates), self.config.search_fetch_k * 2) - return all_candidates[:max_candidates] + except Exception as e: + self.logger.error(f"Vector retrieval failed: {e}") + return [] - def _semantic_rerank( - self, query_result: QueryProcessingResult, candidates: List[RetrievalDocument] - ) -> List[RetrievalDocument]: + def _semantic_rerank_simple(self, query: str, candidates: List[RetrievalDocument]) -> List[RetrievalDocument]: """ - Re-rank candidates using semantic similarity and intent matching. + Semantic re-ranking based on content similarity with the query. 
""" if not candidates: return candidates try: - # Generate embedding for original query - original_query_embedding = self._embedding_service.encode_single(query_result.original_query) + query_embedding = self._embedding_service.encode_single(query) - # Calculate semantic scores for each candidate for candidate in candidates: - # Semantic similarity with original query content_embedding = self._embedding_service.encode_single(candidate.content) - semantic_score = self._embedding_service.similarity(original_query_embedding, content_embedding) - - # Intent matching bonus - intent_bonus = self._calculate_intent_bonus(query_result.intent, candidate) + semantic_score = self._embedding_service.similarity(query_embedding, content_embedding) - # Keyword matching bonus - keyword_bonus = self._calculate_keyword_bonus(query_result.keywords, candidate) + query_keywords = [word.lower() for word in query.split() if len(word) > 2] + content_lower = candidate.content.lower() + keyword_matches = sum(1 for keyword in query_keywords if keyword in content_lower) + keyword_bonus = min(0.3, keyword_matches / len(query_keywords)) if query_keywords else 0 - # Combine scores - # 60% original score, 25% semantic similarity, 10% intent, 5% keywords - combined_score = ( - 0.6 * candidate.score + 0.25 * semantic_score + 0.1 * intent_bonus + 0.05 * keyword_bonus - ) + combined_score = 0.7 * candidate.score + 0.2 * semantic_score + 0.1 * keyword_bonus - # Update candidate score and add debugging info candidate.score = combined_score candidate.metadata["semantic_score"] = semantic_score - candidate.metadata["intent_bonus"] = intent_bonus candidate.metadata["keyword_bonus"] = keyword_bonus candidate.metadata["combined_score"] = combined_score - # Sort by combined score candidates.sort(key=lambda x: x.score, reverse=True) self.logger.debug(f"Semantic re-ranking applied to {len(candidates)} candidates") @@ -217,46 +150,6 @@ def _semantic_rerank( self.logger.error(f"Semantic re-ranking failed: {e}") return candidates - def _calculate_intent_bonus(self, intent: str, candidate: RetrievalDocument) -> float: - """Calculate bonus score based on intent matching""" - content_lower = candidate.content.lower() - - intent_keywords = { - "team_info": [ - "équipe", - "team", - "collaborateurs", - "membres", - "vincent", - "nicolas", - "emin", - "fraillon", - "lambropoulos", - "calyaka", - "composition", - "responsabilités", - ], - "project_info": ["projet", "isschat", "application", "description", "objectif", "but"], - "technical_info": ["configuration", "installation", "utilisation", "problème", "erreur"], - "feature_info": ["fonctionnalités", "features", "capacités", "options", "paramètres"], - } - - if intent in intent_keywords: - keywords = intent_keywords[intent] - matches = sum(1 for keyword in keywords if keyword in content_lower) - return min(1.0, matches / len(keywords)) - - return 0.0 - - def _calculate_keyword_bonus(self, keywords: List[str], candidate: RetrievalDocument) -> float: - """Calculate bonus score based on keyword matching""" - if not keywords: - return 0.0 - - content_lower = candidate.content.lower() - matches = sum(1 for keyword in keywords if keyword in content_lower) - return min(1.0, matches / len(keywords)) - def _basic_retrieval( self, query: str, k: Optional[int] = None, filter_conditions: Optional[Dict[str, Any]] = None ) -> List[RetrievalDocument]: @@ -299,7 +192,7 @@ def get_stats(self) -> Dict[str, Any]: embedding_info = self._embedding_service.get_info() return { - "type": "semantic_retrieval_tool", + 
"type": "vector_retrieval_tool", "ready": self.is_ready(), "config": { "search_k": self.config.search_k, @@ -310,53 +203,52 @@ def get_stats(self) -> Dict[str, Any]: "vector_db": db_info, "embedding_service": embedding_info, "features": { - "semantic_expansion": True, + "direct_vector_retrieval": True, "semantic_reranking": True, - "intent_classification": True, - "multi_query_retrieval": True, + "query_reformulation_compatible": True, + "french_language_support": True, }, } except Exception as e: - return {"type": "semantic_retrieval_tool", "ready": False, "error": str(e)} + return {"type": "vector_retrieval_tool", "ready": False, "error": str(e)} - def test_semantic_retrieval(self, test_query: str = "qui sont les collaborateurs sur Isschat") -> Dict[str, Any]: - """Test semantic retrieval with the specific problematic query""" + def test_vector_retrieval(self, test_query: str = "qui sont les collaborateurs sur Isschat") -> Dict[str, Any]: + """Test vector retrieval with optional semantic re-ranking""" try: self._initialize() if not self.is_ready(): return {"success": False, "error": "Vector DB empty or not accessible"} - # Test with semantic features enabled - semantic_results = self.retrieve(test_query, k=5, use_semantic_expansion=True, use_semantic_reranking=True) + # Test with semantic re-ranking enabled + reranked_results = self.retrieve(test_query, k=5, use_semantic_reranking=True) - # Test with semantic features disabled (basic retrieval) - basic_results = self.retrieve(test_query, k=5, use_semantic_expansion=False, use_semantic_reranking=False) + # Test with semantic re-ranking disabled (direct vector retrieval) + direct_results = self.retrieve(test_query, k=5, use_semantic_reranking=False) return { "success": True, "query": test_query, - "semantic_results": { - "count": len(semantic_results), - "scores": [r.score for r in semantic_results], - "sample_content": semantic_results[0].content[:200] + "..." if semantic_results else None, - "matched_queries": [r.metadata.get("matched_query") for r in semantic_results[:3]], + "reranked_results": { + "count": len(reranked_results), + "scores": [r.score for r in reranked_results], + "sample_content": reranked_results[0].content[:200] + "..." if reranked_results else None, }, - "basic_results": { - "count": len(basic_results), - "scores": [r.score for r in basic_results], - "sample_content": basic_results[0].content[:200] + "..." if basic_results else None, + "direct_results": { + "count": len(direct_results), + "scores": [r.score for r in direct_results], + "sample_content": direct_results[0].content[:200] + "..." 
                 },
                 "improvement": {
                     "score_improvement": (
-                        (semantic_results[0].score - basic_results[0].score)
-                        if semantic_results and basic_results
+                        (reranked_results[0].score - direct_results[0].score)
+                        if reranked_results and direct_results
                         else 0
                     ),
-                    "semantic_features_help": (
-                        len(semantic_results) > 0
-                        and (not basic_results or semantic_results[0].score > basic_results[0].score)
+                    "reranking_helps": (
+                        len(reranked_results) > 0
+                        and (not direct_results or reranked_results[0].score > direct_results[0].score)
                     ),
                 },
             }
diff --git a/src/webapp/app.py b/src/webapp/app.py
index d25520c..1fda614 100644
--- a/src/webapp/app.py
+++ b/src/webapp/app.py
@@ -6,8 +6,7 @@
 import asyncio
 from pathlib import Path
 import traceback
-import pandas as pd
-from datetime import datetime, timedelta
+from datetime import datetime
 import uuid
 from typing import Optional

@@ -329,14 +328,15 @@
     ]

     # Helper to format chat history for prompt
-    def format_chat_history(conversation_id: str, max_turns=10):
+    def format_chat_history(conversation_id: str):
         from src.storage.data_manager import get_data_manager

         data_manager = get_data_manager()
-        # Fetch entries for the current conversation_id
-        conversation_entries = data_manager.get_conversation_history(
-            conversation_id=conversation_id, limit=max_turns * 2
-        )
+        # Fetch entries for the current conversation_id (no artificial limit)
+        conversation_entries = data_manager.get_conversation_history(conversation_id=conversation_id)
+
+        # Debug output
+        print(f"📚 HISTORY: Found {len(conversation_entries)} entries for conversation {conversation_id}")

         conversation_entries.sort(key=lambda x: x.get("timestamp", ""))

@@ -345,7 +345,11 @@ def format_chat_history(conversation_id: str, max_turns=10):
             if entry.get("question") and entry.get("answer"):
                 history.append(f"User: {entry['question']}")
                 history.append(f"Assistant: {entry['answer']}")
-        return "\n".join(history)
+                print(f"📝 HISTORY ENTRY: User='{entry['question']}' Assistant='{entry['answer'][:50]}...'")
+
+        formatted_history = "\n".join(history)
+        print(f"📄 FORMATTED HISTORY ({len(formatted_history)} chars): {repr(formatted_history[:200])}...")
+        return formatted_history

     # Display message history with feedback widgets
     for i, msg in enumerate(st.session_state.messages):
@@ -408,6 +412,7 @@ def format_chat_history(conversation_id: str, max_turns=10):
         st.chat_message("user", avatar=IMAGES["user"]).write(prompt)

         # Prepare chat history for context from the data manager
+        # Note: History is now used for query reformulation within the pipeline, not for generation
         chat_history = format_chat_history(st.session_state["current_conversation_id"])

         # Process the question with all features
@@ -474,6 +479,7 @@ def handle_prompt_click(prompt_text):
         st.chat_message("user", avatar=IMAGES["user"]).write(prompt)

         # Prepare chat history for context from the data manager
+        # Note: History is now used for query reformulation within the pipeline, not for generation
         chat_history = format_chat_history(st.session_state["current_conversation_id"])

         # Process the question with all features
@@ -622,149 +628,7 @@ def dashboard_page():
         st.rerun()


-def render_performance_tracking():
-    """Render performance tracking section"""
-    # Period selection
-    days = st.slider("Analysis period (days)", 1, 30, 7, key="performance_days")
-
-    try:
-        # Get real performance data
-        perf_data = get_real_performance_data()
-
-        if perf_data:
-            # Display main metrics
-            col1, col2, col3 = st.columns(3)
-            with col1:
-                st.metric("Average Response Time", f"{perf_data.get('avg_response_time', 0):.0f} ms")
-            with col2:
-                st.metric("Total Conversations", perf_data.get("total_conversations", 0))
-            with col3:
-                st.metric("Conversations Today", perf_data.get("conversations_today", 0))
-
-            # Get conversation data for charts
-            from src.storage.data_manager import get_data_manager
-
-            data_manager = get_data_manager()
-            conversations = data_manager.get_conversation_history(limit=200)
-
-            if conversations:
-                # Convert to DataFrame for analysis
-                df = pd.DataFrame(conversations)
-                df["timestamp"] = pd.to_datetime(df["timestamp"])
-                df["date"] = df["timestamp"].dt.date
-
-                # Filter by selected period
-                cutoff_date = datetime.now() - timedelta(days=days)
-                recent_df = df[df["timestamp"] >= cutoff_date]
-
-                if not recent_df.empty:
-                    # Daily conversation count
-                    daily_counts = recent_df.groupby("date").size().reset_index(name="count")
-                    st.subheader("Daily Conversation Volume")
-                    st.bar_chart(daily_counts.set_index("date")["count"])
-
-                    # Response time evolution
-                    if "response_time_ms" in recent_df.columns:
-                        daily_response_times = recent_df.groupby("date")["response_time_ms"].mean().reset_index()
-                        st.subheader("Average Response Time Evolution")
-                        st.line_chart(daily_response_times.set_index("date")["response_time_ms"])
-                else:
-                    st.info(f"No conversations found in the last {days} days")
-            else:
-                st.info("No conversation data available")
-
-        else:
-            st.warning("No performance data available for the selected period")
-
-    except Exception as e:
-        st.error(f"Error displaying performance metrics: {str(e)}")
-
-
-def render_conversation_analysis():
-    """Render conversation analysis section"""
-    try:
-        # Get real conversation data
-        from src.storage.data_manager import get_data_manager
-
-        data_manager = get_data_manager()
-        conversations = data_manager.get_conversation_history(limit=200)
-
-        if conversations:
-            # Display basic metrics
-            col1, col2, col3 = st.columns(3)
-            with col1:
-                st.metric("Total Conversations", len(conversations))
-            with col2:
-                avg_response_time = sum(c.get("response_time_ms", 0) for c in conversations) / len(conversations)
-                st.metric("Average Response Time", f"{avg_response_time:.0f}ms")
-            with col3:
-                avg_length = sum(c.get("answer_length", 0) for c in conversations) / len(conversations)
-                st.metric("Average Answer Length", f"{avg_length:.0f} chars")
-
-            # Convert to DataFrame for analysis
-            df = pd.DataFrame(conversations)
-            df["timestamp"] = pd.to_datetime(df["timestamp"])
-            df["hour"] = df["timestamp"].dt.hour
-
-            # Hourly distribution
-            st.subheader("Question Distribution by Hour")
-            hourly_counts = df.groupby("hour").size().reset_index(name="count")
-
-            # Fill missing hours with 0
-            all_hours = pd.DataFrame({"hour": range(24)})
-            hourly_counts = all_hours.merge(hourly_counts, on="hour", how="left").fillna(0)
-
-            st.bar_chart(hourly_counts.set_index("hour")["count"])
-
-            # User activity if multiple users
-            unique_users = df["user_id"].nunique()
-            if unique_users > 1:
-                st.subheader("User Activity")
-                user_counts = df["user_id"].value_counts().head(10)
-                st.bar_chart(user_counts)
-        else:
-            st.info("No conversation data available")
-
-    except Exception as e:
-        st.error(f"Error displaying conversation analysis: {str(e)}")
-
-
-def render_general_statistics():
-    """Render general statistics section"""
-    try:
-        # Get statistics from various sources
-        if "features_manager" in st.session_state and st.session_state["features_manager"]:
-            features = st.session_state["features_manager"]
-
-            # Get feedback statistics
-            feedback_stats = features.get_feedback_statistics(days=30)
-
-            # Display statistics in columns
-            col1, col2, col3 = st.columns(3)
-
-            with col1:
-                st.metric("Total conversations", feedback_stats.get("total_feedback", 0))
-
-            with col2:
-                st.metric("Average response time", "1200 ms")  # Sample data
-
-            with col3:
-                st.metric("Satisfaction rate", f"{feedback_stats.get('satisfaction_rate', 0):.1f}%")
-
-            # Additional metrics
-            col4, col5 = st.columns(2)
-
-            with col4:
-                st.metric("Positive feedback", feedback_stats.get("positive_feedback", 0))
-
-            with col5:
-                st.metric("Negative feedback", feedback_stats.get("negative_feedback", 0))
-
-        else:
-            st.warning("Statistics not available")
-
-    except Exception as e:
-        st.error(f"Error collecting statistics: {str(e)}")
+# Note: Unused dashboard functions removed - functionality moved to components/performance_dashboard.py


 @admin_required