From 85df4af0a8a37e034588c2049a87b2fb67ea961c Mon Sep 17 00:00:00 2001 From: Roy Zhu Date: Thu, 6 Nov 2025 23:15:05 -0500 Subject: [PATCH 1/4] Refactor Neo4j knowledge query pipeline --- README.md | 4 +- README_CN.md | 4 +- docs/api/rest.md | 47 +- docs/architecture/components.md | 40 +- docs/architecture/dataflow.md | 269 ++--- docs/guide/knowledge/query.md | 39 +- src/codebase_rag/api/routes.py | 14 +- .../knowledge/neo4j_knowledge_service.py | 993 ++++++++++++------ 8 files changed, 852 insertions(+), 558 deletions(-) diff --git a/README.md b/README.md index 29f79b3..6e1a830 100644 --- a/README.md +++ b/README.md @@ -157,7 +157,9 @@ response = httpx.post("http://localhost:8000/api/v1/documents/directory", json={ # Query the knowledge base response = httpx.post("http://localhost:8000/api/v1/knowledge/query", json={ "question": "How does the authentication system work?", - "mode": "hybrid" # or "graph_only", "vector_only" + "mode": "hybrid", # or "graph_only", "vector_only" + "use_tools": False, + "top_k": 5 }) # Search similar documents diff --git a/README_CN.md b/README_CN.md index 9638322..f72e9b1 100644 --- a/README_CN.md +++ b/README_CN.md @@ -144,7 +144,9 @@ response = httpx.post("http://localhost:8000/api/v1/documents/directory", json={ # 查询知识库 response = httpx.post("http://localhost:8000/api/v1/knowledge/query", json={ "question": "认证系统是如何工作的?", - "mode": "hybrid" # 或 "graph_only", "vector_only" + "mode": "hybrid", # 或 "graph_only", "vector_only" + "use_tools": False, + "top_k": 5 }) # 搜索相似文档 diff --git a/docs/api/rest.md b/docs/api/rest.md index 3184249..3bc391a 100644 --- a/docs/api/rest.md +++ b/docs/api/rest.md @@ -135,16 +135,26 @@ Query the knowledge base using GraphRAG. ```json { "question": "How does authentication work in this system?", - "mode": "hybrid" + "mode": "hybrid", + "use_graph": true, + "use_vector": true, + "use_tools": false, + "top_k": 5, + "graph_depth": 2 } ``` **Parameters**: - `question` (string, required): Question to ask - `mode` (string, optional): Query mode - - `hybrid` (default): Graph traversal + vector search - - `graph_only`: Only graph relationships - - `vector_only`: Only vector similarity + - `hybrid` (default): Run graph + vector retrieval sequentially + - `graph_only`: Only run graph retrieval + - `vector_only`: Only run vector retrieval +- `use_graph` / `use_vector` (boolean, optional): Override mode defaults +- `use_tools` (boolean, optional): Execute registered workflow tools (default: `false`) +- `top_k` (integer, optional): Override vector retrieval `top_k` (default: global setting) +- `graph_depth` (integer, optional): Override graph traversal depth (default: `2`) +- `tool_kwargs` (object, optional): Extra parameters passed to workflow tools **Response**: ```json @@ -153,6 +163,7 @@ Query the knowledge base using GraphRAG. "answer": "The system uses JWT-based authentication...", "source_nodes": [ { + "node_id": "node-123", "text": "JWT implementation details...", "score": 0.92, "metadata": { @@ -161,7 +172,33 @@ Query the knowledge base using GraphRAG. 
} } ], - "mode": "hybrid" + "retrieved_nodes": [...], + "pipeline_steps": [ + { + "step": "graph_retrieval", + "node_count": 3, + "config": { + "graph_traversal_depth": 2, + "max_knowledge_sequence": 30 + } + }, + { + "step": "vector_retrieval", + "node_count": 5, + "config": { + "top_k": 5 + } + } + ], + "tool_outputs": [], + "query_mode": "hybrid", + "config": { + "graph": true, + "vector": true, + "tools": false, + "top_k": 5, + "graph_depth": 2 + } } ``` diff --git a/docs/architecture/components.md b/docs/architecture/components.md index 8bdafb8..295fc5f 100644 --- a/docs/architecture/components.md +++ b/docs/architecture/components.md @@ -367,9 +367,14 @@ async def lifespan(app: FastAPI): ```python class Neo4jKnowledgeService: def __init__(self): - self.graph_store = None # Neo4j graph store - self.knowledge_index = None # LlamaIndex KnowledgeGraphIndex - self.query_engine = None # RAG query engine + self.graph_store = None # Neo4j graph store + self.storage_context = None # Shared storage context + self.knowledge_index = None # KnowledgeGraphIndex + self.vector_index = None # VectorStoreIndex for similarity search + self.response_synthesizer = None # LLM-backed synthesizer + self.query_pipeline = None # Graph/Vector pipeline + self.function_tools = [] # Workflow tools + self.tool_node = None # Optional ToolNode self._initialized = False ``` @@ -389,7 +394,7 @@ sequenceDiagram KnowServ->>LlamaIndex: Configure Settings KnowServ->>LlamaIndex: Create KnowledgeGraphIndex LlamaIndex-->>KnowServ: Index ready - KnowServ->>KnowServ: Create query engine + KnowServ->>KnowServ: Build QueryPipeline (graph + vector + synth) KnowServ-->>Client: Initialized ``` @@ -419,24 +424,29 @@ async def add_document( async def query( self, question: str, - top_k: int = 5 + *, + mode: str = "hybrid", + use_tools: bool = False ) -> Dict[str, Any]: - """Query knowledge base with RAG""" - # 1. Use query engine to retrieve relevant context - # 2. Generate answer using LLM with context - response = await asyncio.to_thread( - self.query_engine.query, - question - ) + """Run the QueryPipeline composed of graph/vector retrievers and a synthesizer.""" + config = self._resolve_pipeline_config(mode, use_tools=use_tools) + result = await asyncio.to_thread(self.query_pipeline.run, question, config) - # 3. Return answer with source nodes return { "success": True, - "answer": str(response), - "sources": [node.metadata for node in response.source_nodes] + "answer": str(result["response"]), + "source_nodes": format_sources(result["source_nodes"]), + "pipeline_steps": result["steps"], + "tool_outputs": result["tool_outputs"] } ``` +**Pipeline Components**: +1. `KnowledgeGraphRAGRetriever` — extracts entities and traverses the property graph. +2. `VectorIndexRetriever` — performs vector similarity search over the Neo4j vector index. +3. `ResponseSynthesizer` — merges retrieved context and generates the final answer. +4. `FunctionTool` / `ToolNode` (optional) — exposes the query as a workflow tool for multi-turn agents. + #### 3. Semantic Search ```python async def search_similar( diff --git a/docs/architecture/dataflow.md b/docs/architecture/dataflow.md index e4ce465..adb0316 100644 --- a/docs/architecture/dataflow.md +++ b/docs/architecture/dataflow.md @@ -361,252 +361,111 @@ except TimeoutError as e: ### Overview -Query processing combines vector search, graph traversal, and LLM generation: +The knowledge query path is orchestrated by a `Neo4jRAGPipeline` that is constructed during service initialization. 
The pipeline runs a deterministic sequence of components: + +1. Resolve the query configuration (mode, graph/vector toggles, tool options). +2. Run `KnowledgeGraphRAGRetriever` for structured graph traversal. +3. Run `VectorIndexRetriever` against the Neo4j vector index. +4. Merge and deduplicate nodes before handing them to the `ResponseSynthesizer`. +5. Optionally execute registered `FunctionTool`/`ToolNode` hooks. +6. Return the synthesized answer together with pipeline traces and sources. ```mermaid -graph TB - Query([User asks question]) --> Embed[Generate Query Embedding] - Embed --> VectorSearch[Vector Similarity Search] - VectorSearch --> TopK[Retrieve Top-K Chunks] - TopK --> GraphExpand[Expand via Graph Relationships] - GraphExpand --> Rerank[Rerank by Relevance] - Rerank --> BuildContext[Build Context Window] - BuildContext --> LLMPrompt[Create LLM Prompt] - LLMPrompt --> LLMGenerate[LLM Generate Answer] - LLMGenerate --> PostProcess[Post-process Response] - PostProcess --> Response([Return Answer + Sources]) - - style VectorSearch fill:#E6F3FF - style GraphExpand fill:#FFF4E6 - style LLMGenerate fill:#F0E6FF +graph TD + Q[User Question] --> Config[Resolve Pipeline Config] + Config --> Graph[KnowledgeGraphRAGRetriever] + Config --> Vector[VectorIndexRetriever] + Graph --> Merge[Merge & Deduplicate] + Vector --> Merge + Merge --> Synthesize[ResponseSynthesizer] + Synthesize --> Tools{use_tools?} + Tools -->|yes| Execute[FunctionTool / ToolNode] + Tools -->|no| Response([API Response]) + Execute --> Response ``` ### Step-by-Step Flow -#### 1. Query Embedding +#### 1. Pipeline Configuration **API Endpoint**: `POST /api/v1/knowledge/query` -```python -# Request +```json { "question": "How does authentication work in the system?", - "top_k": 5 + "mode": "hybrid", + "use_graph": true, + "use_vector": true, + "use_tools": false, + "top_k": 5, + "graph_depth": 2 } ``` -**Generate embedding**: ```python -query_embedding = await embed_model.get_query_embedding(question) -# Same embedding model as documents for consistency -``` - -#### 2. Vector Similarity Search - -**Neo4j Vector Query**: - -```cypher -CALL db.index.vector.queryNodes( - 'knowledge_vectors', // Index name - $top_k, // Number of results - $query_embedding // Query vector +config = service._resolve_pipeline_config( + mode=payload.get("mode", "hybrid"), + use_graph=payload.get("use_graph"), + use_vector=payload.get("use_vector"), + use_tools=payload.get("use_tools", False), + top_k=payload.get("top_k"), + graph_depth=payload.get("graph_depth"), ) -YIELD node, score - -MATCH (node)-[:BELONGS_TO]->(doc:Document) -RETURN node.text as text, - doc.title as source, - score as similarity -ORDER BY score DESC -``` - -**Result**: -```python -[ - { - "text": "The system uses JWT tokens for authentication...", - "source": "Authentication Guide", - "similarity": 0.89 - }, - { - "text": "Users authenticate via POST /api/auth/login...", - "source": "API Documentation", - "similarity": 0.84 - }, - # ... more results -] -``` - -#### 3. 
Graph-Based Expansion - -**Expand context via relationships**: - -```cypher -// Get related chunks via entity relationships -MATCH (chunk:Chunk)-[:MENTIONS]->(e:Entity)<-[:MENTIONS]-(related:Chunk) -WHERE chunk.id IN $initial_chunk_ids - AND related.id NOT IN $initial_chunk_ids -RETURN related.text as text, - COUNT(*) as connection_strength -ORDER BY connection_strength DESC -LIMIT 3 ``` -**Why expand?** -- Capture related context -- Find transitively related information -- Improve answer completeness - -#### 4. Result Reranking +#### 2. Graph Retrieval -**Combine scores**: +`KnowledgeGraphRAGRetriever` performs entity extraction and multi-hop traversal to fetch relevant triples and document chunks. Depth defaults to `2` but can be overridden per request. ```python -def rerank_results(results: List[dict]) -> List[dict]: - """Rerank by multiple factors""" - for result in results: - score = ( - result['similarity'] * 0.6 + # Vector similarity - result['graph_score'] * 0.2 + # Graph connectivity - result['recency_score'] * 0.1 + # Document age - result['metadata_match'] * 0.1 # Metadata relevance - ) - result['final_score'] = score - - return sorted(results, key=lambda x: x['final_score'], reverse=True) +graph_nodes = KnowledgeGraphRAGRetriever( + storage_context=storage_context, + llm=Settings.llm, + graph_traversal_depth=config.graph_depth, +).retrieve(QueryBundle(question)) ``` -#### 5. Context Window Building +#### 3. Vector Retrieval -**Create prompt context**: +`VectorIndexRetriever` queries the Neo4j vector index (loaded through `VectorStoreIndex.from_vector_store`) to find semantically similar chunks. ```python -def build_context(chunks: List[dict], max_tokens: int = 2000) -> str: - """Build context staying within token limit""" - context_parts = [] - total_tokens = 0 - - for chunk in chunks: - chunk_tokens = estimate_tokens(chunk['text']) - if total_tokens + chunk_tokens > max_tokens: - break - - context_parts.append(f"[Source: {chunk['source']}]\n{chunk['text']}") - total_tokens += chunk_tokens - - return "\n\n".join(context_parts) +vector_nodes = VectorIndexRetriever( + vector_index, + similarity_top_k=config.top_k, +).retrieve(QueryBundle(question)) ``` -#### 6. LLM Prompt Construction - -**Prompt Template**: +#### 4. Response Synthesis -```python -prompt = f"""You are a helpful assistant. Answer the question based on the provided context. - -Context: -{context} - -Question: {question} - -Answer: Provide a comprehensive answer based on the context above. If the context doesn't contain enough information to fully answer the question, say so. -""" -``` +Retrieved nodes are merged and passed to the `ResponseSynthesizer`, which uses the configured LLM to generate the final response while keeping provenance. -**Advanced Prompting**: ```python -# With instructions -prompt = f"""You are a technical documentation expert. - -Context from knowledge base: -{context} - -User question: {question} - -Instructions: -1. Answer based on the provided context -2. Cite sources when possible -3. If information is incomplete, state what's missing -4. Use technical accuracy - -Answer:""" +response = response_synthesizer.synthesize(QueryBundle(question), merged_nodes) ``` -#### 7. LLM Generation - -**Call LLM**: - -```python -# OpenAI example -response = await llm.acomplete(prompt) -answer = response.text - -# Ollama example -response = await llm.acomplete(prompt) -answer = response.text -``` +#### 5. 
Optional Tool Execution -**Streaming Support**: -```python -async for token in llm.astream_complete(prompt): - yield token.text # Stream to client -``` +If `use_tools` is `true`, the pipeline invokes registered tools to extend the answer (for example, to run follow-up analytics). Tool execution results are attached to the API response. -#### 8. Response Assembly +#### 6. Response Packaging -**Final Response**: +The API returns the synthesized answer along with full pipeline traces: -```python +```json { - "success": true, - "answer": "The system uses JWT token-based authentication...", - "sources": [ - { - "title": "Authentication Guide", - "relevance": 0.89, - "excerpt": "JWT tokens are used..." - }, - { - "title": "API Documentation", - "relevance": 0.84, - "excerpt": "Login endpoint returns..." - } - ], - "metadata": { - "chunks_retrieved": 5, - "chunks_used": 3, - "processing_time": 1.2, - "model": "gpt-3.5-turbo" - } + "answer": "The system uses JWT-based authentication...", + "source_nodes": [...], + "retrieved_nodes": [...], + "pipeline_steps": [ + {"step": "graph_retrieval", "node_count": 3}, + {"step": "vector_retrieval", "node_count": 5} + ], + "tool_outputs": [], + "config": {"graph": true, "vector": true, "tools": false} } ``` -### Performance Optimization - -**Caching**: -```python -# Cache query embeddings -@lru_cache(maxsize=1000) -async def get_cached_embedding(query: str) -> List[float]: - return await embed_model.get_query_embedding(query) - -# Cache common queries -query_cache = {} -cache_key = f"{question}:{top_k}" -if cache_key in query_cache: - return query_cache[cache_key] -``` - -**Parallel Processing**: -```python -# Parallel embedding and metadata lookup -embedding_task = asyncio.create_task(generate_embedding(question)) -metadata_task = asyncio.create_task(get_metadata_filters(question)) - -embedding = await embedding_task -metadata = await metadata_task -``` - ## Memory Management Pipeline ### Overview diff --git a/docs/guide/knowledge/query.md b/docs/guide/knowledge/query.md index bd03155..1a5cefb 100644 --- a/docs/guide/knowledge/query.md +++ b/docs/guide/knowledge/query.md @@ -229,6 +229,17 @@ Control the number of source documents retrieved: **Default**: `TOP_K=5` (from `.env` configuration) +### Pipeline Step Mapping + +| Mode / Flags | Graph Retrieval | Vector Retrieval | Tool Hooks | +| --- | --- | --- | --- | +| `hybrid` (default) | ✅ | ✅ | Optional (`use_tools`) | +| `graph_only` | ✅ | ❌ | Optional (`use_tools`) | +| `vector_only` | ❌ | ✅ | Optional (`use_tools`) | +| Custom (`use_graph` / `use_vector`) | As configured | As configured | Optional (`use_tools`) | + +When `use_tools` is set to `true`, the service invokes registered `FunctionTool`/`ToolNode` instances after retrieval. 
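+
+A minimal sketch of a custom flag combination over the HTTP API (hypothetical question text; the flags mirror the `QueryRequest` model):
+
+```python
+import httpx
+
+# Graph-only retrieval with tool hooks enabled: the explicit use_graph /
+# use_vector flags override the defaults implied by "mode".
+result = httpx.post(
+    "http://localhost:8000/api/v1/knowledge/query",
+    json={
+        "question": "Which modules depend on the auth service?",
+        "mode": "hybrid",
+        "use_graph": True,
+        "use_vector": False,
+        "use_tools": True,
+    },
+    timeout=30.0,
+).json()
+print([step["step"] for step in result["pipeline_steps"]])  # ["graph_retrieval"]
+```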
+
 
 ### Similarity Search
 
 Find similar documents without LLM generation:
@@ -398,9 +409,10 @@ result = query_knowledge("How does RAG work?")
 print(f"Answer: {result['answer']}\n")
 
 print("Sources:")
-for source in result['sources']:
-    print(f"  - {source['metadata']['title']} (score: {source['score']:.2f})")
-    print(f"    {source['content'][:100]}...")
+for source in result['source_nodes']:
+    title = source['metadata'].get('title', 'unknown')
+    print(f"  - {title} (score: {source['score']:.2f})")
+    print(f"    {source['text'][:100]}...")
 ```
 
 ## HTTP API Usage
@@ -413,10 +425,12 @@ curl -X POST http://localhost:8000/api/v1/knowledge/query \
   -d '{
     "question": "What is the deployment architecture?",
     "mode": "hybrid",
-    "top_k": 5
+    "use_tools": false,
+    "top_k": 5,
+    "graph_depth": 2
   }'
 ```
 
 ### Similarity Search
 
 ```bash
 curl -X POST http://localhost:8000/api/v1/knowledge/search \
@@ -434,14 +448,15 @@ curl -X POST http://localhost:8000/api/v1/knowledge/search \
 import httpx
 import asyncio
 
-async def query_knowledge(question: str, mode: str = "hybrid"):
+async def query_knowledge(question: str, mode: str = "hybrid", top_k: int = 5, use_tools: bool = False):
     async with httpx.AsyncClient() as client:
         response = await client.post(
             "http://localhost:8000/api/v1/knowledge/query",
             json={
                 "question": question,
                 "mode": mode,
-                "top_k": 5
+                "use_tools": use_tools,
+                "top_k": top_k
             },
             timeout=30.0
         )
@@ -452,6 +467,7 @@ result = asyncio.run(query_knowledge(
     "How do I configure the system?"
)) print(result['answer']) +print(result['pipeline_steps']) ``` ## Query Performance diff --git a/src/codebase_rag/api/routes.py b/src/codebase_rag/api/routes.py index 2bdc710..a6d99d8 100644 --- a/src/codebase_rag/api/routes.py +++ b/src/codebase_rag/api/routes.py @@ -46,6 +46,12 @@ class DirectoryProcessRequest(BaseModel): class QueryRequest(BaseModel): question: str mode: str = "hybrid" # hybrid, graph_only, vector_only + use_graph: Optional[bool] = None + use_vector: Optional[bool] = None + use_tools: bool = False + top_k: Optional[int] = None + graph_depth: Optional[int] = None + tool_kwargs: Optional[Dict[str, Any]] = None class SearchRequest(BaseModel): query: str @@ -181,7 +187,13 @@ async def query_knowledge(query_request: QueryRequest): try: result = await knowledge_service.query( question=query_request.question, - mode=query_request.mode + mode=query_request.mode, + use_graph=query_request.use_graph, + use_vector=query_request.use_vector, + use_tools=query_request.use_tools, + top_k=query_request.top_k, + graph_depth=query_request.graph_depth, + tool_kwargs=query_request.tool_kwargs, ) if result.get("success"): diff --git a/src/codebase_rag/services/knowledge/neo4j_knowledge_service.py b/src/codebase_rag/services/knowledge/neo4j_knowledge_service.py index 31184f6..2da60a2 100644 --- a/src/codebase_rag/services/knowledge/neo4j_knowledge_service.py +++ b/src/codebase_rag/services/knowledge/neo4j_knowledge_service.py @@ -4,6 +4,7 @@ supports multiple LLM and embedding model providers """ +from dataclasses import dataclass, field from typing import List, Dict, Any, Optional, Union from pathlib import Path import asyncio @@ -11,12 +12,17 @@ import time from llama_index.core import ( - KnowledgeGraphIndex, - Document, + KnowledgeGraphIndex, + Document, Settings, StorageContext, - SimpleDirectoryReader + SimpleDirectoryReader, + VectorStoreIndex, ) +from llama_index.core.indices.knowledge_graph import KnowledgeGraphRAGRetriever +from llama_index.core.retrievers import VectorIndexRetriever +from llama_index.core.response_synthesizers import get_response_synthesizer +from llama_index.core.schema import QueryBundle, NodeWithScore # LLM Providers from llama_index.llms.ollama import Ollama @@ -33,37 +39,228 @@ # Graph Store from llama_index.graph_stores.neo4j import Neo4jGraphStore -# Core components -from llama_index.core.node_parser import SimpleNodeParser +from llama_index.core.tools import FunctionTool + +try: # Optional dependency for advanced workflow integration + from llama_index.core.workflow.tool_node import ToolNode +except Exception: # pragma: no cover - optional runtime dependency + ToolNode = None # type: ignore from codebase_rag.config import settings + +@dataclass +class PipelineConfig: + """Configuration for running a retrieval pipeline.""" + + run_graph: bool = True + run_vector: bool = True + run_tools: bool = False + top_k: int = 5 + graph_depth: int = 2 + tool_kwargs: Dict[str, Any] = field(default_factory=dict) + + +class Neo4jRAGPipeline: + """Lightweight query pipeline that orchestrates graph/vector retrieval and synthesis.""" + + def __init__( + self, + storage_context: StorageContext, + response_synthesizer, + llm, + *, + default_top_k: int = 5, + default_graph_depth: int = 2, + max_knowledge_sequence: int = 30, + verbose: bool = False, + vector_index: Optional[VectorStoreIndex] = None, + function_tools: Optional[List[FunctionTool]] = None, + tool_node: Optional["ToolNode"] = None, + ) -> None: + self.storage_context = storage_context + self.response_synthesizer = 
response_synthesizer + self.llm = llm + self.default_top_k = default_top_k + self.default_graph_depth = default_graph_depth + self.max_knowledge_sequence = max_knowledge_sequence + self.verbose = verbose + self.vector_index = vector_index + self.function_tools = function_tools or [] + self.tool_node = tool_node + + def _merge_nodes( + self, + aggregated: Dict[str, NodeWithScore], + nodes: List[NodeWithScore], + ) -> None: + """Merge retrieved nodes by keeping the highest scoring entry per node id.""" + + for node in nodes: + node_id = node.node.node_id if node.node else node.node_id + if node_id not in aggregated or ( + aggregated[node_id].score or 0 + ) < (node.score or 0): + aggregated[node_id] = node + + @staticmethod + def _summarize_nodes(nodes: List[NodeWithScore]) -> List[Dict[str, Any]]: + """Return a lightweight representation of retrieved nodes for tracing.""" + + summaries: List[Dict[str, Any]] = [] + for node in nodes: + text = node.get_text() or "" + if len(text) > 200: + text = text[:200] + "..." + summaries.append( + { + "node_id": node.node.node_id if node.node else node.node_id, + "score": node.score, + "metadata": dict(getattr(node.node, "metadata", {})), + "text": text, + } + ) + return summaries + + def run(self, question: str, config: PipelineConfig) -> Dict[str, Any]: + """Execute the pipeline synchronously.""" + + query_bundle = QueryBundle(query_str=question) + aggregated_nodes: Dict[str, NodeWithScore] = {} + pipeline_steps: List[Dict[str, Any]] = [] + + # Graph retrieval + if config.run_graph: + graph_retriever = KnowledgeGraphRAGRetriever( + storage_context=self.storage_context, + llm=self.llm, + graph_traversal_depth=config.graph_depth or self.default_graph_depth, + max_knowledge_sequence=self.max_knowledge_sequence, + verbose=self.verbose, + ) + graph_nodes = graph_retriever.retrieve(query_bundle) + self._merge_nodes(aggregated_nodes, graph_nodes) + pipeline_steps.append( + { + "step": "graph_retrieval", + "node_count": len(graph_nodes), + "config": { + "graph_traversal_depth": config.graph_depth or self.default_graph_depth, + "max_knowledge_sequence": self.max_knowledge_sequence, + }, + "nodes": self._summarize_nodes(graph_nodes), + } + ) + + # Vector retrieval + if config.run_vector and self.vector_index is not None: + vector_retriever = VectorIndexRetriever( + self.vector_index, + similarity_top_k=config.top_k or self.default_top_k, + ) + vector_nodes = vector_retriever.retrieve(query_bundle) + self._merge_nodes(aggregated_nodes, vector_nodes) + pipeline_steps.append( + { + "step": "vector_retrieval", + "node_count": len(vector_nodes), + "config": {"top_k": config.top_k or self.default_top_k}, + "nodes": self._summarize_nodes(vector_nodes), + } + ) + + aggregated_list = list(aggregated_nodes.values()) + response = self.response_synthesizer.synthesize(query_bundle, aggregated_list) + source_nodes = getattr(response, "source_nodes", aggregated_list) + + tool_outputs: List[Dict[str, Any]] = [] + if config.run_tools: + if self.tool_node is not None: + try: + payload = {"input": question, **(config.tool_kwargs or {})} + if hasattr(self.tool_node, "invoke"): + result = self.tool_node.invoke(payload) + elif callable(self.tool_node): + result = self.tool_node(payload) + else: + raise RuntimeError("Unsupported ToolNode interface") + tool_outputs.append( + { + "tool": getattr(result, "tool_name", "tool_node"), + "output": getattr(result, "raw_output", str(result)), + "is_error": getattr(result, "is_error", False), + } + ) + except Exception as exc: # pragma: no cover 
- defensive logging + logger.warning(f"Tool node execution failed: {exc}") + tool_outputs.append( + { + "tool": "tool_node", + "error": str(exc), + "is_error": True, + } + ) + elif self.function_tools: + for tool in self.function_tools: + try: + result = tool(question=question, **(config.tool_kwargs or {})) + tool_outputs.append( + { + "tool": result.tool_name, + "output": result.raw_output, + "is_error": result.is_error, + } + ) + except Exception as exc: # pragma: no cover - defensive logging + logger.warning(f"Function tool execution failed: {exc}") + tool_outputs.append( + { + "tool": tool.metadata.name, + "error": str(exc), + "is_error": True, + } + ) + + return { + "response": response, + "source_nodes": source_nodes, + "retrieved_nodes": aggregated_list, + "steps": pipeline_steps, + "tool_outputs": tool_outputs, + } + + class Neo4jKnowledgeService: """knowledge graph service based on Neo4j's native vector index""" - + def __init__(self): self.graph_store = None + self.storage_context: Optional[StorageContext] = None self.knowledge_index = None - self.query_engine = None + self.vector_index: Optional[VectorStoreIndex] = None + self.response_synthesizer = None + self.query_pipeline: Optional[Neo4jRAGPipeline] = None + self.function_tools: List[FunctionTool] = [] + self.tool_node: Optional["ToolNode"] = None self._initialized = False - + # get timeout settings from config self.connection_timeout = settings.connection_timeout self.operation_timeout = settings.operation_timeout self.large_document_timeout = settings.large_document_timeout - + logger.info("Neo4j Knowledge Service created") - + def _create_llm(self): """create LLM instance based on config""" provider = settings.llm_provider.lower() - + if provider == "ollama": return Ollama( model=settings.ollama_model, base_url=settings.ollama_base_url, temperature=settings.temperature, - request_timeout=self.operation_timeout + request_timeout=self.operation_timeout, ) elif provider == "openai": if not settings.openai_api_key: @@ -74,7 +271,7 @@ def _create_llm(self): api_base=settings.openai_base_url, temperature=settings.temperature, max_tokens=settings.max_tokens, - timeout=self.operation_timeout + timeout=self.operation_timeout, ) elif provider == "gemini": if not settings.google_api_key: @@ -83,7 +280,7 @@ def _create_llm(self): model=settings.gemini_model, api_key=settings.google_api_key, temperature=settings.temperature, - max_tokens=settings.max_tokens + max_tokens=settings.max_tokens, ) elif provider == "openrouter": if not settings.openrouter_api_key: @@ -93,36 +290,40 @@ def _create_llm(self): api_key=settings.openrouter_api_key, temperature=settings.temperature, max_tokens=settings.openrouter_max_tokens, - timeout=self.operation_timeout + timeout=self.operation_timeout, ) else: raise ValueError(f"Unsupported LLM provider: {provider}") - + def _create_embedding_model(self): """create embedding model instance based on config""" provider = settings.embedding_provider.lower() - + if provider == "ollama": return OllamaEmbedding( model_name=settings.ollama_embedding_model, base_url=settings.ollama_base_url, - request_timeout=self.operation_timeout + request_timeout=self.operation_timeout, ) elif provider == "openai": if not settings.openai_api_key: - raise ValueError("OpenAI API key is required for OpenAI embedding provider") + raise ValueError( + "OpenAI API key is required for OpenAI embedding provider" + ) return OpenAIEmbedding( model=settings.openai_embedding_model, api_key=settings.openai_api_key, 
api_base=settings.openai_base_url, - timeout=self.operation_timeout + timeout=self.operation_timeout, ) elif provider == "gemini": if not settings.google_api_key: - raise ValueError("Google API key is required for Gemini embedding provider") + raise ValueError( + "Google API key is required for Gemini embedding provider" + ) return GeminiEmbedding( model_name=settings.gemini_embedding_model, - api_key=settings.google_api_key + api_key=settings.google_api_key, ) elif provider == "huggingface": return HuggingFaceEmbedding( @@ -130,95 +331,311 @@ def _create_embedding_model(self): ) elif provider == "openrouter": if not settings.openrouter_api_key: - raise ValueError("OpenRouter API key is required for OpenRouter embedding provider") + raise ValueError( + "OpenRouter API key is required for OpenRouter embedding provider" + ) return OpenAIEmbedding( model=settings.openrouter_embedding_model, api_key=settings.openrouter_api_key, api_base=settings.openrouter_base_url, - timeout=self.operation_timeout + timeout=self.operation_timeout, ) else: raise ValueError(f"Unsupported embedding provider: {provider}") - + + def _register_tools(self) -> None: + """Create FunctionTool/ToolNode hooks for workflow integration.""" + + self.function_tools = [] + try: + tool = FunctionTool.from_defaults( + fn=self._tool_query, + name="neo4j_knowledge_query", + description="Run a Neo4j knowledge service query via the retrieval pipeline.", + ) + self.function_tools.append(tool) + except Exception as exc: # pragma: no cover - tool registration is best-effort + logger.warning(f"Failed to register FunctionTool: {exc}") + + if ToolNode is not None and self.function_tools: + try: + self.tool_node = ToolNode(self.function_tools) + except Exception as exc: # pragma: no cover - optional dependency + logger.warning(f"Failed to initialize ToolNode: {exc}") + self.tool_node = None + + if self.query_pipeline is not None: + self.query_pipeline.function_tools = self.function_tools + self.query_pipeline.tool_node = self.tool_node + async def initialize(self) -> bool: """initialize service""" try: - logger.info(f"Initializing with LLM provider: {settings.llm_provider}, Embedding provider: {settings.embedding_provider}") - + logger.info( + f"Initializing with LLM provider: {settings.llm_provider}, Embedding provider: {settings.embedding_provider}" + ) + # set LlamaIndex global config Settings.llm = self._create_llm() Settings.embed_model = self._create_embedding_model() - + Settings.chunk_size = settings.chunk_size Settings.chunk_overlap = settings.chunk_overlap - - logger.info(f"LLM: {settings.llm_provider} - {getattr(settings, f'{settings.llm_provider}_model')}") - logger.info(f"Embedding: {settings.embedding_provider} - {getattr(settings, f'{settings.embedding_provider}_embedding_model')}") - + + logger.info( + f"LLM: {settings.llm_provider} - {getattr(settings, f'{settings.llm_provider}_model')}" + ) + logger.info( + f"Embedding: {settings.embedding_provider} - {getattr(settings, f'{settings.embedding_provider}_embedding_model')}" + ) + # initialize Neo4j graph store, add timeout config self.graph_store = Neo4jGraphStore( username=settings.neo4j_username, password=settings.neo4j_password, url=settings.neo4j_uri, database=settings.neo4j_database, - timeout=self.connection_timeout + timeout=self.connection_timeout, ) - + # create storage context - storage_context = StorageContext.from_defaults( + self.storage_context = StorageContext.from_defaults( graph_store=self.graph_store ) - + # try to load existing index, if not exists, 
create new one try: self.knowledge_index = await asyncio.wait_for( asyncio.to_thread( KnowledgeGraphIndex.from_existing, - storage_context=storage_context + storage_context=self.storage_context, ), - timeout=self.connection_timeout + timeout=self.connection_timeout, ) logger.info("Loaded existing knowledge graph index") except asyncio.TimeoutError: - logger.warning("Loading existing index timed out, creating new index") + logger.warning( + "Loading existing index timed out, creating new index" + ) self.knowledge_index = KnowledgeGraphIndex( nodes=[], - storage_context=storage_context, - show_progress=True + storage_context=self.storage_context, + show_progress=True, ) logger.info("Created new knowledge graph index") except Exception: # create empty knowledge graph index self.knowledge_index = KnowledgeGraphIndex( nodes=[], - storage_context=storage_context, - show_progress=True + storage_context=self.storage_context, + show_progress=True, ) logger.info("Created new knowledge graph index") - - # 创建查询引擎 - self.query_engine = self.knowledge_index.as_query_engine( - include_text=True, + + # build vector index and response synthesizer + self.vector_index = VectorStoreIndex.from_vector_store( + self.storage_context.vector_store, + embed_model=Settings.embed_model, + ) + self.response_synthesizer = get_response_synthesizer( response_mode="tree_summarize", - embedding_mode="hybrid" + llm=Settings.llm, ) - + + # Construct the retrieval pipeline + self.query_pipeline = Neo4jRAGPipeline( + storage_context=self.storage_context, + response_synthesizer=self.response_synthesizer, + llm=Settings.llm, + default_top_k=settings.top_k, + default_graph_depth=2, + max_knowledge_sequence=30, + verbose=settings.debug, + vector_index=self.vector_index, + ) + self._register_tools() + self._initialized = True logger.success("Neo4j Knowledge Service initialized successfully") return True - + except Exception as e: logger.error(f"Failed to initialize Neo4j Knowledge Service: {e}") return False - - async def add_document(self, - content: str, - title: str = None, - metadata: Dict[str, Any] = None) -> Dict[str, Any]: + + def _resolve_pipeline_config( + self, + mode: str, + *, + use_graph: Optional[bool] = None, + use_vector: Optional[bool] = None, + use_tools: bool = False, + top_k: Optional[int] = None, + graph_depth: Optional[int] = None, + tool_kwargs: Optional[Dict[str, Any]] = None, + ) -> PipelineConfig: + """Translate user configuration into a pipeline configuration.""" + + mode = (mode or "hybrid").lower() + run_graph = use_graph if use_graph is not None else mode in ( + "hybrid", + "graph_only", + "graph_first", + ) + run_vector = use_vector if use_vector is not None else mode in ( + "hybrid", + "vector_only", + "vector_first", + ) + + if not run_graph and not run_vector: + raise ValueError( + "At least one of graph or vector retrieval must be enabled" + ) + + config = PipelineConfig() + config.run_graph = run_graph + config.run_vector = run_vector + config.run_tools = use_tools + config.top_k = top_k or settings.top_k + config.graph_depth = graph_depth or 2 + config.tool_kwargs = tool_kwargs or {} + return config + + def _format_source_nodes( + self, nodes: List[NodeWithScore] + ) -> List[Dict[str, Any]]: + formatted: List[Dict[str, Any]] = [] + for node in nodes: + text = node.get_text() or "" + if len(text) > 200: + text = text[:200] + "..." 
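+            # Truncate long node text so API payloads stay compact; the
+            # full text remains available in the underlying index.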
+ formatted.append( + { + "node_id": node.node.node_id if node.node else node.node_id, + "text": text, + "metadata": dict(getattr(node.node, "metadata", {})), + "score": node.score, + } + ) + return formatted + + def _tool_query( + self, + query: str, + mode: str = "hybrid", + top_k: Optional[int] = None, + graph_depth: Optional[int] = None, + ) -> Dict[str, Any]: + """Synchronous query helper exposed to FunctionTool.""" + + if self.query_pipeline is None: + raise RuntimeError("Query pipeline is not initialized") + + config = self._resolve_pipeline_config( + mode, + top_k=top_k, + graph_depth=graph_depth, + use_tools=False, + ) + result = self.query_pipeline.run(query, config) + response = result["response"] + source_nodes = self._format_source_nodes(result["source_nodes"]) + return { + "answer": str(response), + "sources": source_nodes, + "pipeline_steps": result["steps"], + } + + async def query( + self, + question: str, + mode: str = "hybrid", + *, + use_graph: Optional[bool] = None, + use_vector: Optional[bool] = None, + use_tools: bool = False, + top_k: Optional[int] = None, + graph_depth: Optional[int] = None, + tool_kwargs: Optional[Dict[str, Any]] = None, + ) -> Dict[str, Any]: + """query knowledge graph""" + if not self._initialized: + raise Exception("Service not initialized") + if self.query_pipeline is None: + raise Exception("Query pipeline is not available") + + try: + config = self._resolve_pipeline_config( + mode, + use_graph=use_graph, + use_vector=use_vector, + use_tools=use_tools, + top_k=top_k, + graph_depth=graph_depth, + tool_kwargs=tool_kwargs, + ) + except ValueError as exc: + return {"success": False, "error": str(exc)} + + try: + pipeline_result = await asyncio.wait_for( + asyncio.to_thread( + self.query_pipeline.run, + question, + config, + ), + timeout=self.operation_timeout, + ) + except asyncio.TimeoutError: + error_msg = f"Query timed out after {self.operation_timeout}s" + logger.error(error_msg) + return { + "success": False, + "error": error_msg, + "timeout": self.operation_timeout, + } + except Exception as e: + logger.error(f"Failed to query: {e}") + return { + "success": False, + "error": str(e), + } + + response = pipeline_result["response"] + source_nodes = self._format_source_nodes(pipeline_result["source_nodes"]) + + logger.info(f"Successfully answered query: {question[:50]}...") + + return { + "success": True, + "answer": str(response), + "source_nodes": source_nodes, + "retrieved_nodes": self._format_source_nodes( + pipeline_result["retrieved_nodes"] + ), + "pipeline_steps": pipeline_result["steps"], + "tool_outputs": pipeline_result["tool_outputs"], + "query_mode": mode, + "config": { + "graph": config.run_graph, + "vector": config.run_vector, + "tools": config.run_tools, + "top_k": config.top_k, + "graph_depth": config.graph_depth, + }, + } + + async def add_document( + self, + content: str, + title: str = None, + metadata: Dict[str, Any] = None, + ) -> Dict[str, Any]: """add document to knowledge graph""" if not self._initialized: raise Exception("Service not initialized") - + try: # create document doc = Document( @@ -227,79 +644,91 @@ async def add_document(self, "title": title or "Untitled", "source": "manual_input", "timestamp": time.time(), - **(metadata or {}) - } + **(metadata or {}), + }, ) - + # select timeout based on document size content_size = len(content) - timeout = self.operation_timeout if content_size < 10000 else self.large_document_timeout - - logger.info(f"Adding document '{title}' (size: {content_size} chars, timeout: 
{timeout}s)") - + timeout = ( + self.operation_timeout + if content_size < 10000 + else self.large_document_timeout + ) + + logger.info( + f"Adding document '{title}' (size: {content_size} chars, timeout: {timeout}s)" + ) + # use async timeout control for insert operation await asyncio.wait_for( asyncio.to_thread(self.knowledge_index.insert, doc), - timeout=timeout + timeout=timeout, ) - + logger.info(f"Successfully added document: {title}") - + return { "success": True, "message": f"Document '{title}' added to knowledge graph", "document_id": doc.doc_id, - "content_size": content_size + "content_size": content_size, } - + except asyncio.TimeoutError: error_msg = f"Document insertion timed out after {timeout}s" logger.error(error_msg) return { "success": False, "error": error_msg, - "timeout": timeout + "timeout": timeout, } except Exception as e: logger.error(f"Failed to add document: {e}") return { "success": False, - "error": str(e) + "error": str(e), } - + async def add_file(self, file_path: str) -> Dict[str, Any]: """add file to knowledge graph""" if not self._initialized: raise Exception("Service not initialized") - + try: # read file documents = await asyncio.to_thread( lambda: SimpleDirectoryReader(input_files=[file_path]).load_data() ) - + if not documents: return { "success": False, - "error": "No documents loaded from file" + "error": "No documents loaded from file", } - + # batch insert, handle timeout for each document success_count = 0 errors = [] - + for i, doc in enumerate(documents): try: content_size = len(doc.text) - timeout = self.operation_timeout if content_size < 10000 else self.large_document_timeout - + timeout = ( + self.operation_timeout + if content_size < 10000 + else self.large_document_timeout + ) + await asyncio.wait_for( asyncio.to_thread(self.knowledge_index.insert, doc), - timeout=timeout + timeout=timeout, ) success_count += 1 - logger.debug(f"Added document {i+1}/{len(documents)} from {file_path}") - + logger.debug( + f"Added document {i+1}/{len(documents)} from {file_path}" + ) + except asyncio.TimeoutError: error_msg = f"Document {i+1} timed out" errors.append(error_msg) @@ -308,375 +737,291 @@ async def add_file(self, file_path: str) -> Dict[str, Any]: error_msg = f"Document {i+1} failed: {str(e)}" errors.append(error_msg) logger.warning(error_msg) - - logger.info(f"Added {success_count}/{len(documents)} documents from {file_path}") - + + logger.info( + f"Added {success_count}/{len(documents)} documents from {file_path}" + ) + return { "success": success_count > 0, "message": f"Added {success_count}/{len(documents)} documents from {file_path}", "documents_count": len(documents), "success_count": success_count, - "errors": errors + "errors": errors, } - + except Exception as e: logger.error(f"Failed to add file {file_path}: {e}") return { "success": False, - "error": str(e) + "error": str(e), } - - async def add_directory(self, - directory_path: str, - recursive: bool = True, - file_extensions: List[str] = None) -> Dict[str, Any]: + + async def add_directory( + self, + directory_path: str, + recursive: bool = True, + file_extensions: List[str] = None, + ) -> Dict[str, Any]: """batch add files in directory""" if not self._initialized: raise Exception("Service not initialized") - + try: # set file extension filter if file_extensions is None: - file_extensions = [".txt", ".md", ".py", ".js", ".ts", ".sql", ".json", ".yaml", ".yml"] - + file_extensions = [ + ".txt", + ".md", + ".py", + ".js", + ".ts", + ".sql", + ".json", + ".yaml", + ".yml", + ] + # read 
directory reader = SimpleDirectoryReader( input_dir=directory_path, recursive=recursive, - file_extractor={ext: None for ext in file_extensions} + file_extractor={ext: None for ext in file_extensions}, ) - + documents = await asyncio.to_thread(reader.load_data) - + if not documents: return { "success": False, - "error": "No documents found in directory" + "error": "No documents found in directory", } - + # batch insert, handle timeout for each document success_count = 0 errors = [] - - logger.info(f"Processing {len(documents)} documents from {directory_path}") - + + logger.info( + f"Processing {len(documents)} documents from {directory_path}" + ) + for i, doc in enumerate(documents): try: content_size = len(doc.text) - timeout = self.operation_timeout if content_size < 10000 else self.large_document_timeout - + timeout = ( + self.operation_timeout + if content_size < 10000 + else self.large_document_timeout + ) + await asyncio.wait_for( asyncio.to_thread(self.knowledge_index.insert, doc), - timeout=timeout + timeout=timeout, ) success_count += 1 - + if i % 10 == 0: # record progress every 10 documents - logger.info(f"Progress: {i+1}/{len(documents)} documents processed") - + logger.info( + f"Progress: {i+1}/{len(documents)} documents processed" + ) + except asyncio.TimeoutError: - error_msg = f"Document {i+1} ({doc.metadata.get('file_name', 'unknown')}) timed out" + error_msg = ( + f"Document {i+1} ({doc.metadata.get('file_name', 'unknown')}) timed out" + ) errors.append(error_msg) logger.warning(error_msg) except Exception as e: - error_msg = f"Document {i+1} ({doc.metadata.get('file_name', 'unknown')}) failed: {str(e)}" + error_msg = ( + f"Document {i+1} ({doc.metadata.get('file_name', 'unknown')}) failed: {str(e)}" + ) errors.append(error_msg) logger.warning(error_msg) - - logger.info(f"Successfully added {success_count}/{len(documents)} documents from {directory_path}") - + + logger.info( + f"Successfully added {success_count}/{len(documents)} documents from {directory_path}" + ) + return { "success": success_count > 0, "message": f"Added {success_count}/{len(documents)} documents from {directory_path}", "documents_count": len(documents), "success_count": success_count, - "errors": errors + "errors": errors, } - + except Exception as e: logger.error(f"Failed to add directory {directory_path}: {e}") return { "success": False, - "error": str(e) - } - - async def query(self, - question: str, - mode: str = "hybrid") -> Dict[str, Any]: - """query knowledge graph""" - if not self._initialized: - raise Exception("Service not initialized") - - try: - # create different query engines based on mode - if mode == "hybrid": - # hybrid mode: graph traversal + vector search - query_engine = self.knowledge_index.as_query_engine( - include_text=True, - response_mode="tree_summarize", - embedding_mode="hybrid" - ) - elif mode == "graph_only": - # graph only mode - query_engine = self.knowledge_index.as_query_engine( - include_text=False, - response_mode="tree_summarize" - ) - elif mode == "vector_only": - # vector only mode - query_engine = self.knowledge_index.as_query_engine( - include_text=True, - response_mode="compact", - embedding_mode="embedding" - ) - else: - query_engine = self.query_engine - - # execute query, add timeout control - response = await asyncio.wait_for( - asyncio.to_thread(query_engine.query, question), - timeout=self.operation_timeout - ) - - # extract source node information - source_nodes = [] - if hasattr(response, 'source_nodes'): - for node in response.source_nodes: - 
source_nodes.append({ - "node_id": node.node_id, - "text": node.text[:200] + "..." if len(node.text) > 200 else node.text, - "metadata": node.metadata, - "score": getattr(node, 'score', None) - }) - - logger.info(f"Successfully answered query: {question[:50]}...") - - return { - "success": True, - "answer": str(response), - "source_nodes": source_nodes, - "query_mode": mode - } - - except asyncio.TimeoutError: - error_msg = f"Query timed out after {self.operation_timeout}s" - logger.error(error_msg) - return { - "success": False, - "error": error_msg, - "timeout": self.operation_timeout - } - except Exception as e: - logger.error(f"Failed to query: {e}") - return { - "success": False, - "error": str(e) + "error": str(e), } - + async def get_graph_schema(self) -> Dict[str, Any]: """get graph schema information""" if not self._initialized: raise Exception("Service not initialized") - + try: # get graph statistics, add timeout control schema_info = await asyncio.wait_for( asyncio.to_thread(self.graph_store.get_schema), - timeout=self.connection_timeout + timeout=self.connection_timeout, ) - + return { "success": True, - "schema": schema_info + "schema": schema_info, } - + except asyncio.TimeoutError: - error_msg = f"Schema retrieval timed out after {self.connection_timeout}s" + error_msg = ( + f"Schema retrieval timed out after {self.connection_timeout}s" + ) logger.error(error_msg) return { "success": False, - "error": error_msg + "error": error_msg, } except Exception as e: logger.error(f"Failed to get graph schema: {e}") return { "success": False, - "error": str(e) + "error": str(e), } - - async def search_similar_nodes(self, - query: str, - top_k: int = 10) -> Dict[str, Any]: + + async def search_similar_nodes( + self, + query: str, + top_k: int = 10, + ) -> Dict[str, Any]: """search nodes by vector similarity""" if not self._initialized: raise Exception("Service not initialized") - + try: # use retriever for vector search, add timeout control retriever = self.knowledge_index.as_retriever( similarity_top_k=top_k, - include_text=True + include_text=True, ) - + nodes = await asyncio.wait_for( asyncio.to_thread(retriever.retrieve, query), - timeout=self.operation_timeout + timeout=self.operation_timeout, ) - - # format results + results = [] for node in nodes: - results.append({ - "node_id": node.node_id, - "text": node.text, - "metadata": node.metadata, - "score": getattr(node, 'score', None) - }) - + text = node.get_text() if hasattr(node, "get_text") else node.text + if text and len(text) > 200: + text = text[:200] + "..." 
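+                # Same 200-character preview as _format_source_nodes, so
+                # search results and query sources stay consistent.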
+ results.append( + { + "node_id": node.node.node_id if node.node else node.node_id, + "text": text, + "metadata": dict(getattr(node.node, "metadata", {})), + "score": node.score, + } + ) + return { "success": True, "results": results, - "total_count": len(results) + "query": query, } - + except asyncio.TimeoutError: - error_msg = f"Similar nodes search timed out after {self.operation_timeout}s" + error_msg = f"Search timed out after {self.operation_timeout}s" logger.error(error_msg) return { "success": False, "error": error_msg, - "timeout": self.operation_timeout + "timeout": self.operation_timeout, } except Exception as e: logger.error(f"Failed to search similar nodes: {e}") return { "success": False, - "error": str(e) + "error": str(e), } - - async def get_statistics(self) -> Dict[str, Any]: - """get knowledge graph statistics""" + + async def execute_cypher( + self, + query: str, + parameters: Optional[Dict[str, Any]] = None, + ) -> Dict[str, Any]: + """execute cypher query""" if not self._initialized: raise Exception("Service not initialized") - + try: - # try to get basic statistics, add timeout control - try: - # if graph store supports statistics query - stats = await asyncio.wait_for( - asyncio.to_thread(lambda: { - "index_type": "KnowledgeGraphIndex with Neo4j vector store", - "graph_store_type": type(self.graph_store).__name__, - "initialized": self._initialized - }), - timeout=self.connection_timeout - ) - - return { - "success": True, - "statistics": stats, - "message": "Knowledge graph is active" - } - - except asyncio.TimeoutError: - return { - "success": False, - "error": f"Statistics retrieval timed out after {self.connection_timeout}s" - } - + result = await asyncio.wait_for( + asyncio.to_thread( + self.graph_store.query, + query, + parameters or {}, + ), + timeout=self.operation_timeout, + ) + + return { + "success": True, + "result": result, + "query": query, + } + + except asyncio.TimeoutError: + error_msg = f"Cypher execution timed out after {self.operation_timeout}s" + logger.error(error_msg) + return { + "success": False, + "error": error_msg, + "timeout": self.operation_timeout, + } except Exception as e: - logger.error(f"Failed to get statistics: {e}") + logger.error(f"Failed to execute cypher query: {e}") return { "success": False, - "error": str(e) + "error": str(e), } - - async def clear_knowledge_base(self) -> Dict[str, Any]: - """clear knowledge base""" + + async def export_graph(self, output_path: Union[str, Path]) -> Dict[str, Any]: + """export knowledge graph to file""" if not self._initialized: raise Exception("Service not initialized") - + + output_path = Path(output_path) try: - # recreate empty index, add timeout control - storage_context = StorageContext.from_defaults( - graph_store=self.graph_store + export_result = await asyncio.wait_for( + asyncio.to_thread(self.graph_store.export_graph, str(output_path)), + timeout=self.operation_timeout, ) - - self.knowledge_index = await asyncio.wait_for( - asyncio.to_thread(lambda: KnowledgeGraphIndex( - nodes=[], - storage_context=storage_context, - show_progress=True - )), - timeout=self.connection_timeout - ) - - # recreate query engine - self.query_engine = self.knowledge_index.as_query_engine( - include_text=True, - response_mode="tree_summarize", - embedding_mode="hybrid" - ) - - logger.info("Knowledge base cleared successfully") - + return { "success": True, - "message": "Knowledge base cleared successfully" + "output_path": str(output_path), + "result": export_result, } - + except asyncio.TimeoutError: 
- error_msg = f"Clear operation timed out after {self.connection_timeout}s" + error_msg = f"Graph export timed out after {self.operation_timeout}s" logger.error(error_msg) return { "success": False, - "error": error_msg + "error": error_msg, + "timeout": self.operation_timeout, } except Exception as e: - logger.error(f"Failed to clear knowledge base: {e}") + logger.error(f"Failed to export graph: {e}") return { "success": False, - "error": str(e) + "error": str(e), } - - async def close(self): + + async def close(self) -> None: """close service""" - try: - if self.graph_store: - # if graph store has close method, call it - if hasattr(self.graph_store, 'close'): - await asyncio.wait_for( - asyncio.to_thread(self.graph_store.close), - timeout=self.connection_timeout - ) - elif hasattr(self.graph_store, '_driver') and self.graph_store._driver: - # close Neo4j driver connection - await asyncio.wait_for( - asyncio.to_thread(self.graph_store._driver.close), - timeout=self.connection_timeout - ) - - self._initialized = False - logger.info("Neo4j Knowledge Service closed") - - except asyncio.TimeoutError: - logger.warning(f"Service close timed out after {self.connection_timeout}s") - except Exception as e: - logger.error(f"Error closing service: {e}") - - def set_timeouts(self, connection_timeout: int = None, operation_timeout: int = None, large_document_timeout: int = None): - """dynamic set timeout parameters""" - if connection_timeout is not None: - self.connection_timeout = connection_timeout - logger.info(f"Connection timeout set to {connection_timeout}s") - - if operation_timeout is not None: - self.operation_timeout = operation_timeout - logger.info(f"Operation timeout set to {operation_timeout}s") - - if large_document_timeout is not None: - self.large_document_timeout = large_document_timeout - logger.info(f"Large document timeout set to {large_document_timeout}s") - -# global service instance -neo4j_knowledge_service = Neo4jKnowledgeService() + if self.graph_store: + await asyncio.to_thread(self.graph_store.close) + self._initialized = False + logger.info("Neo4j Knowledge Service closed") From 1efe13089fd030df9e6adac8ccc71e3113eab01d Mon Sep 17 00:00:00 2001 From: Roy Zhu Date: Thu, 6 Nov 2025 23:48:45 -0500 Subject: [PATCH 2/4] Restore statistics and clear APIs in Neo4j service --- .../knowledge/neo4j_knowledge_service.py | 169 ++++++++++++++++-- 1 file changed, 158 insertions(+), 11 deletions(-) diff --git a/src/codebase_rag/services/knowledge/neo4j_knowledge_service.py b/src/codebase_rag/services/knowledge/neo4j_knowledge_service.py index 2da60a2..2b0fe14 100644 --- a/src/codebase_rag/services/knowledge/neo4j_knowledge_service.py +++ b/src/codebase_rag/services/knowledge/neo4j_knowledge_service.py @@ -368,6 +368,26 @@ def _register_tools(self) -> None: self.query_pipeline.function_tools = self.function_tools self.query_pipeline.tool_node = self.tool_node + def _build_pipeline(self) -> None: + """(Re)build the retrieval pipeline and refresh tool bindings.""" + + if self.storage_context is None: + raise RuntimeError("Storage context is not available") + if self.response_synthesizer is None: + raise RuntimeError("Response synthesizer is not available") + + self.query_pipeline = Neo4jRAGPipeline( + storage_context=self.storage_context, + response_synthesizer=self.response_synthesizer, + llm=Settings.llm, + default_top_k=settings.top_k, + default_graph_depth=2, + max_knowledge_sequence=30, + verbose=settings.debug, + vector_index=self.vector_index, + ) + self._register_tools() + async def 
initialize(self) -> bool: """initialize service""" try: @@ -443,17 +463,7 @@ async def initialize(self) -> bool: ) # Construct the retrieval pipeline - self.query_pipeline = Neo4jRAGPipeline( - storage_context=self.storage_context, - response_synthesizer=self.response_synthesizer, - llm=Settings.llm, - default_top_k=settings.top_k, - default_graph_depth=2, - max_knowledge_sequence=30, - verbose=settings.debug, - vector_index=self.vector_index, - ) - self._register_tools() + self._build_pipeline() self._initialized = True logger.success("Neo4j Knowledge Service initialized successfully") @@ -946,6 +956,143 @@ async def search_similar_nodes( "error": str(e), } + async def get_statistics(self) -> Dict[str, Any]: + """Return lightweight service statistics for legacy API compatibility.""" + + if not self._initialized: + raise Exception("Service not initialized") + + def _collect_statistics() -> Dict[str, Any]: + base_stats: Dict[str, Any] = { + "initialized": self._initialized, + "graph_store_type": type(self.graph_store).__name__ + if self.graph_store + else None, + "vector_index_type": type(self.vector_index).__name__ + if self.vector_index + else None, + "pipeline": { + "default_top_k": getattr(self.query_pipeline, "default_top_k", None), + "default_graph_depth": getattr( + self.query_pipeline, "default_graph_depth", None + ), + "supports_tools": bool(self.function_tools), + }, + } + + if self.graph_store is None: + return base_stats + + try: + node_result = self.graph_store.query( + "MATCH (n) RETURN count(n) AS node_count" + ) + base_stats["node_count"] = ( + node_result[0].get("node_count", 0) if node_result else 0 + ) + except Exception as exc: + base_stats["node_count_error"] = str(exc) + + try: + rel_result = self.graph_store.query( + "MATCH ()-[r]->() RETURN count(r) AS relationship_count" + ) + base_stats["relationship_count"] = ( + rel_result[0].get("relationship_count", 0) + if rel_result + else 0 + ) + except Exception as exc: + base_stats["relationship_count_error"] = str(exc) + + return base_stats + + try: + statistics = await asyncio.wait_for( + asyncio.to_thread(_collect_statistics), + timeout=self.operation_timeout, + ) + return {"success": True, "statistics": statistics} + except asyncio.TimeoutError: + error_msg = f"Statistics retrieval timed out after {self.operation_timeout}s" + logger.error(error_msg) + return { + "success": False, + "error": error_msg, + "timeout": self.operation_timeout, + } + except Exception as exc: + logger.error(f"Failed to collect statistics: {exc}") + return {"success": False, "error": str(exc)} + + async def clear_knowledge_base(self) -> Dict[str, Any]: + """Clear Neo4j data and rebuild service indices for legacy API compatibility.""" + + if not self._initialized: + raise Exception("Service not initialized") + + async def _clear_graph() -> None: + def _clear_sync() -> None: + if self.graph_store is None: + raise RuntimeError("Graph store is not available") + + # Remove all nodes/relationships + self.graph_store.query("MATCH (n) DETACH DELETE n") + + # Best-effort vector store reset (depends on backend capabilities) + vector_store = getattr(self.storage_context, "vector_store", None) + if vector_store is not None: + delete_method = getattr(vector_store, "delete", None) + if callable(delete_method): + try: + delete_method(delete_all=True) + except TypeError: + delete_method() + except Exception as exc: # pragma: no cover - defensive logging + logger.warning( + f"Vector store clear failed: {exc}" + ) + + await asyncio.to_thread(_clear_sync) + + 
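+        # Run the destructive clear under the operation timeout, then
+        # rebuild the storage context, indices, and pipeline from scratch.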
try: + await asyncio.wait_for( + _clear_graph(), + timeout=self.operation_timeout, + ) + + # Recreate storage context and indexes to reflect cleared state + self.storage_context = StorageContext.from_defaults( + graph_store=self.graph_store + ) + self.knowledge_index = KnowledgeGraphIndex( + nodes=[], + storage_context=self.storage_context, + show_progress=True, + ) + self.vector_index = VectorStoreIndex.from_vector_store( + self.storage_context.vector_store, + embed_model=Settings.embed_model, + ) + self._build_pipeline() + + logger.info("Knowledge base cleared successfully") + return { + "success": True, + "message": "Knowledge base cleared successfully", + } + except asyncio.TimeoutError: + error_msg = f"Clear operation timed out after {self.operation_timeout}s" + logger.error(error_msg) + return { + "success": False, + "error": error_msg, + "timeout": self.operation_timeout, + } + except Exception as exc: + logger.error(f"Failed to clear knowledge base: {exc}") + return {"success": False, "error": str(exc)} + async def execute_cypher( self, query: str, From fc94ba21185b9bb8a32fffc4d2286ee691f002d0 Mon Sep 17 00:00:00 2001 From: Roy Zhu Date: Fri, 7 Nov 2025 13:14:32 +0800 Subject: [PATCH 3/4] Update docs/guide/knowledge/query.md Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- docs/guide/knowledge/query.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/guide/knowledge/query.md b/docs/guide/knowledge/query.md index 1a5cefb..07efcbf 100644 --- a/docs/guide/knowledge/query.md +++ b/docs/guide/knowledge/query.md @@ -411,7 +411,8 @@ print(f"Answer: {result['answer']}\n") print("Sources:") for source in result['source_nodes']: title = source['metadata'].get('title', 'unknown') - print(f" - {title} (score: {source['score']:.2f})") + score = source['score'] if source['score'] is not None else 0.0 + print(f" - {title} (score: {score:.2f})") print(f" {source['text'][:100]}...") ``` From 10296cda48e3f07f17ba8506f163b7ae92b920cb Mon Sep 17 00:00:00 2001 From: Roy Zhu Date: Fri, 7 Nov 2025 13:40:36 +0800 Subject: [PATCH 4/4] Update src/codebase_rag/services/knowledge/neo4j_knowledge_service.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/codebase_rag/services/knowledge/neo4j_knowledge_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/codebase_rag/services/knowledge/neo4j_knowledge_service.py b/src/codebase_rag/services/knowledge/neo4j_knowledge_service.py index f4a6b40..1f633c7 100644 --- a/src/codebase_rag/services/knowledge/neo4j_knowledge_service.py +++ b/src/codebase_rag/services/knowledge/neo4j_knowledge_service.py @@ -207,7 +207,7 @@ def run(self, question: str, config: PipelineConfig) -> Dict[str, Any]: elif self.function_tools: for tool in self.function_tools: try: - result = tool(question=question, **(config.tool_kwargs or {})) + result = tool(query=question, **(config.tool_kwargs or {})) tool_outputs.append( { "tool": result.tool_name,