diff --git a/docs/weaviate-grpc-timeout-fix.md b/docs/weaviate-grpc-timeout-fix.md new file mode 100644 index 0000000..00e2c6e --- /dev/null +++ b/docs/weaviate-grpc-timeout-fix.md @@ -0,0 +1,71 @@ +# Weaviate gRPC Timeout Fix for Azure Deployment + +## Problem +The application experiences intermittent gRPC timeout errors when connecting to Weaviate Cloud from Azure: +``` +Query call with protocol GRPC search failed with message Deadline Exceeded +``` + +This error occurs in approximately 1 out of 5 requests on the Azure deployment but not locally. + +## Root Cause +- Weaviate Python client v4 uses gRPC by default for better performance +- Network latency between Azure and Weaviate Cloud can cause gRPC timeouts +- The default timeout settings may be too aggressive for cross-cloud connections + +## Solution Implemented + +### 1. Extended Timeout Configuration +- Increased connection timeout to 30 seconds +- Increased query timeout to 60 seconds +- Increased insert timeout to 60 seconds + +### 2. Retry Logic with Exponential Backoff +- Added automatic retry for connection initialization (3 attempts) +- Added automatic retry for search operations on timeout errors (3 attempts); other errors fail immediately +- Uses exponential backoff between attempts (2s, then 4s for connections; 1s, then 2s for searches) + +### 3. Custom Headers for Azure +- Added `X-Azure-Source: true` header to identify Azure requests +- Added `X-Timeout-Seconds: 60` header as a timeout hint + +### 4. REST-only Fallback Option +As a last resort, you can force the use of the Weaviate v3 client, which uses only the REST protocol: + +```bash +# Set this environment variable in Azure App Service +WEAVIATE_USE_V3_CLIENT=true +``` + +## Configuration Steps for Azure + +### Option 1: Use Enhanced v4 Client (Recommended) +No additional configuration needed. The improved timeout and retry logic should handle most cases. + +### Option 2: Force REST-only Protocol (If issues persist) +1. Go to Azure Portal > Your App Service > Configuration +2. 
Add new Application Setting: + - Name: `WEAVIATE_USE_V3_CLIENT` + - Value: `true` +3. Save and restart the app + +## Monitoring +The application will log: +- Connection attempts and successes +- Retry attempts with detailed error messages +- Which client version (v3 or v4) is being used + +Look for these log messages: +- `"Weaviate v4 client connected successfully (attempt X)"` +- `"Weaviate connection attempt X failed: ... Retrying in Xs..."` (connection) or `"Weaviate search timeout (attempt X/3): ... Retrying in Xs..."` (search) +- `"Using Weaviate v3 client (REST-only) as configured"` + +## Performance Considerations +- v4 client with gRPC is faster but may have connectivity issues +- v3 client with REST is more reliable but slightly slower +- The retry logic adds resilience but may increase response time for failed attempts + +## Next Steps if Issues Persist +1. Contact Weaviate support about gRPC connectivity from Azure regions +2. Consider using a Weaviate instance in the same cloud provider (Azure) +3. Implement a circuit breaker pattern for better fault tolerance \ No newline at end of file diff --git a/src/vectordb/weaviate_client.py b/src/vectordb/weaviate_client.py index aca7927..189352f 100644 --- a/src/vectordb/weaviate_client.py +++ b/src/vectordb/weaviate_client.py @@ -4,10 +4,13 @@ import logging from typing import List, Dict, Any, Optional +import time +import os import weaviate from weaviate.classes.config import Configure, Property, VectorDistances, DataType from weaviate.classes.query import Filter +from weaviate.classes.init import AdditionalConfig, Timeout from .interface import VectorDatabase from ..core.documents import VectorDocument, SearchResult @@ -46,10 +49,58 @@ def __init__(self, collection_name: Optional[str] = None, embedding_dim: Optiona raise ValueError("WEAVIATE_API_KEY and WEAVIATE_URL must be configured") auth_credentials = weaviate.auth.AuthApiKey(api_key=weaviate_api_key) - self.client = weaviate.connect_to_weaviate_cloud( - cluster_url=weaviate_url, auth_credentials=auth_credentials, skip_init_checks=True - 
) - self.logger.info(f"Weaviate client connected: localhost:{self.config.vectordb_port or 8080}") + + # Check if we should use v3 client (REST-only) as a workaround for gRPC issues + use_v3_client = os.getenv("WEAVIATE_USE_V3_CLIENT", "false").lower() == "true" + + if use_v3_client: + self.logger.info("Using Weaviate v3 client (REST-only) as configured") + # Use v3 client which only uses REST protocol + self.client = weaviate.Client( + url=weaviate_url, + auth_client_secret=auth_credentials, + timeout_config=(30, 60), # (connect_timeout, read_timeout) + additional_headers={"X-Azure-Source": "true", "X-Timeout-Seconds": "60"}, + ) + self.logger.info("Weaviate v3 client connected successfully using REST protocol") + else: + # Configure additional settings with extended timeouts for Azure + additional_config = AdditionalConfig( + timeout=Timeout( + init=30, # 30 seconds for initialization + query=60, # 60 seconds for queries (increase from default) + insert=60, # 60 seconds for inserts + ), + additional_headers={ + "X-Azure-Source": "true", # Custom header to identify Azure source + "X-Timeout-Seconds": "60", # Request timeout hint + }, + ) + + # Connect with retry logic + max_retries = 3 + retry_delay = 2 # seconds + + for attempt in range(max_retries): + try: + self.client = weaviate.connect_to_weaviate_cloud( + cluster_url=weaviate_url, + auth_credentials=auth_credentials, + additional_config=additional_config, + skip_init_checks=True, + ) + self.logger.info(f"Weaviate v4 client connected successfully (attempt {attempt + 1})") + break + except Exception as e: + if attempt < max_retries - 1: + self.logger.warning( + f"Weaviate connection attempt {attempt + 1} failed: {e}. Retrying in {retry_delay}s..." 
def search(
    self,
    query_embedding: List[float],
    k: int = 3,
    filter_conditions: Optional[Dict[str, Any]] = None,
    *,
    max_retries: int = 3,
    retry_delay: float = 1.0,
) -> "List[SearchResult]":
    """Run a vector-similarity search, retrying on timeout errors.

    Args:
        query_embedding: Query vector passed to ``near_vector``.
        k: Maximum number of objects to return.
        filter_conditions: Optional ``{property: value}`` equality filters.
            Multiple entries are combined with ``Filter.all_of`` (logical AND).
        max_retries: Total number of attempts made when the failure looks like
            a timeout. Values below 1 are treated as 1.
        retry_delay: Seconds to sleep before the second attempt; doubled after
            every further timeout (exponential backoff).

    Returns:
        One ``SearchResult`` per returned object, scored as ``1 - distance``.

    Raises:
        RuntimeError: If a non-timeout error occurs (raised without retrying)
            or every attempt timed out. The underlying exception is chained
            as ``__cause__``.

    NOTE(review): this method uses ``self.client.collections``, i.e. the v4
    collections API; presumably it cannot work when ``__init__`` selected the
    v3 REST-only client -- confirm before relying on that fallback.
    """
    # Substrings (lower-case) that mark an error message as a timeout.
    timeout_markers = ("deadline exceeded", "deadline_exceeded", "timeout", "timed out")

    attempts = max(1, max_retries)
    delay = retry_delay
    last_error: Optional[Exception] = None

    for attempt in range(1, attempts + 1):
        try:
            collection = self.client.collections.get(self.collection_name)

            # Translate the equality conditions into a single Weaviate filter.
            where_filter = None
            if filter_conditions:
                clauses = [Filter.by_property(key).equal(value) for key, value in filter_conditions.items()]
                where_filter = clauses[0] if len(clauses) == 1 else Filter.all_of(clauses)

            # The filter must be passed as `filters=`: near_vector() executes
            # the query and returns the response, so nothing can be chained
            # onto its result afterwards.
            response = collection.query.near_vector(
                near_vector=query_embedding,
                limit=k,
                distance=0.8,  # Equivalent to score_threshold=0.2 in cosine similarity
                filters=where_filter,
                return_metadata=["distance"],
            )

            results = []
            for obj in response.objects:
                # Work on a copy so the client's response object is not mutated.
                properties = dict(obj.properties)
                content = properties.pop("content", "")
                original_doc_id = properties.pop("original_doc_id", str(obj.uuid))

                # Whatever is left after removing the reserved keys is metadata.
                document = VectorDocument(id=original_doc_id, content=content, metadata=properties)

                # Convert distance to similarity score (1 - distance for cosine);
                # a missing or zero distance yields a score of 1.0.
                distance = obj.metadata.distance
                score = 1.0 - distance if distance else 1.0
                results.append(SearchResult(document=document, score=score))

            self.logger.debug(f"Search: {len(results)} results found (attempt {attempt})")
            return results

        except Exception as exc:
            last_error = exc
            message = str(exc).lower()

            if not any(marker in message for marker in timeout_markers):
                # For non-timeout errors, fail immediately.
                self.logger.error(f"Weaviate search failed: {exc}")
                break

            if attempt >= attempts:
                self.logger.error(f"Weaviate search failed after {attempts} attempts due to timeout")
                break

            self.logger.warning(
                f"Weaviate search timeout (attempt {attempt}/{attempts}): {exc}. "
                f"Retrying in {delay}s..."
            )
            time.sleep(delay)
            delay *= 2  # Exponential backoff

    raise RuntimeError(f"Weaviate search failed: {last_error}") from last_error