Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions docs/weaviate-grpc-timeout-fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Weaviate gRPC Timeout Fix for Azure Deployment

## Problem
The application experiences intermittent gRPC timeout errors when connecting to Weaviate Cloud from Azure:
```
Query call with protocol GRPC search failed with message Deadline Exceeded
```

This error occurs approximately 1 out of 5 times on the Azure deployment but not locally.

## Root Cause
- Weaviate Python client v4 uses gRPC by default for better performance
- Network latency between Azure and Weaviate Cloud can cause gRPC timeouts
- The default timeout settings may be too aggressive for cross-cloud connections

## Solution Implemented

### 1. Extended Timeout Configuration
- Increased connection timeout to 30 seconds
- Increased query timeout to 60 seconds
- Increased insert timeout to 60 seconds

### 2. Retry Logic with Exponential Backoff
- Added automatic retry for connection initialization (3 attempts)
- Added automatic retry for search operations (3 attempts)
- Implements exponential backoff (2s, 4s, 8s delays)

### 3. Custom Headers for Azure
- Added `X-Azure-Source: true` header to identify Azure requests
- Added `X-Timeout-Seconds: 60` header as a timeout hint

### 4. REST-only Fallback Option
As a last resort, you can force the use of Weaviate v3 client which only uses REST protocol:

```bash
# Set this environment variable in Azure App Service
WEAVIATE_USE_V3_CLIENT=true
```

## Configuration Steps for Azure

### Option 1: Use Enhanced v4 Client (Recommended)
No additional configuration needed. The improved timeout and retry logic should handle most cases.

### Option 2: Force REST-only Protocol (If issues persist)
1. Go to Azure Portal > Your App Service > Configuration
2. Add new Application Setting:
- Name: `WEAVIATE_USE_V3_CLIENT`
- Value: `true`
3. Save and restart the app

## Monitoring
The application will log:
- Connection attempts and successes
- Retry attempts with detailed error messages
- Which client version (v3 or v4) is being used

Look for these log messages:
- `"Weaviate v4 client connected successfully (attempt X)"`
- `"Weaviate connection timeout (attempt X/3): ... Retrying in Xs..."`
- `"Using Weaviate v3 client (REST-only) as configured"`

## Performance Considerations
- v4 client with gRPC is faster but may have connectivity issues
- v3 client with REST is more reliable but slightly slower
- The retry logic adds resilience but may increase response time for failed attempts

## Next Steps if Issues Persist
1. Contact Weaviate support about gRPC connectivity from Azure regions
2. Consider using a Weaviate instance in the same cloud provider (Azure)
3. Implement a circuit breaker pattern for better fault tolerance
179 changes: 126 additions & 53 deletions src/vectordb/weaviate_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@

import logging
from typing import List, Dict, Any, Optional
import time
import os

import weaviate
from weaviate.classes.config import Configure, Property, VectorDistances, DataType
from weaviate.classes.query import Filter
from weaviate.classes.init import AdditionalConfig, Timeout

from .interface import VectorDatabase
from ..core.documents import VectorDocument, SearchResult
Expand Down Expand Up @@ -46,10 +49,58 @@ def __init__(self, collection_name: Optional[str] = None, embedding_dim: Optiona
raise ValueError("WEAVIATE_API_KEY and WEAVIATE_URL must be configured")

auth_credentials = weaviate.auth.AuthApiKey(api_key=weaviate_api_key)
self.client = weaviate.connect_to_weaviate_cloud(
cluster_url=weaviate_url, auth_credentials=auth_credentials, skip_init_checks=True
)
self.logger.info(f"Weaviate client connected: localhost:{self.config.vectordb_port or 8080}")

# Check if we should use v3 client (REST-only) as a workaround for gRPC issues
use_v3_client = os.getenv("WEAVIATE_USE_V3_CLIENT", "false").lower() == "true"

if use_v3_client:
self.logger.info("Using Weaviate v3 client (REST-only) as configured")
# Use v3 client which only uses REST protocol
self.client = weaviate.Client(
url=weaviate_url,
auth_client_secret=auth_credentials,
timeout_config=(30, 60), # (connect_timeout, read_timeout)
additional_headers={"X-Azure-Source": "true", "X-Timeout-Seconds": "60"},
)
self.logger.info("Weaviate v3 client connected successfully using REST protocol")
else:
# Configure additional settings with extended timeouts for Azure
additional_config = AdditionalConfig(
timeout=Timeout(
init=30, # 30 seconds for initialization
query=60, # 60 seconds for queries (increase from default)
insert=60, # 60 seconds for inserts
),
additional_headers={
"X-Azure-Source": "true", # Custom header to identify Azure source
"X-Timeout-Seconds": "60", # Request timeout hint
},
)

# Connect with retry logic
max_retries = 3
retry_delay = 2 # seconds

for attempt in range(max_retries):
try:
self.client = weaviate.connect_to_weaviate_cloud(
cluster_url=weaviate_url,
auth_credentials=auth_credentials,
additional_config=additional_config,
skip_init_checks=True,
)
self.logger.info(f"Weaviate v4 client connected successfully (attempt {attempt + 1})")
break
except Exception as e:
if attempt < max_retries - 1:
self.logger.warning(
f"Weaviate connection attempt {attempt + 1} failed: {e}. Retrying in {retry_delay}s..."
)
time.sleep(retry_delay)
retry_delay *= 2 # Exponential backoff
else:
self.logger.error(f"Failed to connect to Weaviate after {max_retries} attempts")
raise ConnectionError(f"Failed to connect to Weaviate: {e}")

self._ensure_collection()

Expand Down Expand Up @@ -160,57 +211,79 @@ def add_documents(self, documents: List[VectorDocument], embeddings: List[List[f
def search(
self, query_embedding: List[float], k: int = 3, filter_conditions: Optional[Dict[str, Any]] = None
) -> List[SearchResult]:
"""Search with Weaviate vector similarity"""
try:
collection = self.client.collections.get(self.collection_name)

# Build filter if conditions provided
where_filter = None
if filter_conditions:
filters = []
for key, value in filter_conditions.items():
filters.append(Filter.by_property(key).equal(value))

if len(filters) == 1:
where_filter = filters[0]
"""Search with Weaviate vector similarity with retry logic"""
max_retries = 3
retry_delay = 1 # seconds
last_error = None

for attempt in range(max_retries):
try:
collection = self.client.collections.get(self.collection_name)

# Build filter if conditions provided
where_filter = None
if filter_conditions:
filters = []
for key, value in filter_conditions.items():
filters.append(Filter.by_property(key).equal(value))

if len(filters) == 1:
where_filter = filters[0]
else:
where_filter = Filter.all_of(filters)

if where_filter:
response = collection.query.near_vector(
near_vector=query_embedding,
limit=k,
distance=0.8, # Equivalent to score_threshold=0.2 in cosine similarity
return_metadata=["distance"],
).where(where_filter)
else:
where_filter = Filter.all_of(filters)

if where_filter:
response = collection.query.near_vector(
near_vector=query_embedding,
limit=k,
distance=0.8, # Equivalent to score_threshold=0.2 in cosine similarity
return_metadata=["distance"],
).where(where_filter)
else:
response = collection.query.near_vector(
near_vector=query_embedding,
limit=k,
distance=0.8, # Equivalent to score_threshold=0.2 in cosine similarity
return_metadata=["distance"],
)

results = []
for obj in response.objects:
properties = obj.properties
content = properties.pop("content", "")
original_doc_id = properties.pop("original_doc_id", str(obj.uuid))

# Remaining properties become metadata
metadata = properties

document = VectorDocument(id=original_doc_id, content=content, metadata=metadata)

# Convert distance to similarity score (1 - distance for cosine)
score = 1.0 - obj.metadata.distance if obj.metadata.distance else 1.0
results.append(SearchResult(document=document, score=score))

self.logger.debug(f"Search: {len(results)} results found")
return results
response = collection.query.near_vector(
near_vector=query_embedding,
limit=k,
distance=0.8, # Equivalent to score_threshold=0.2 in cosine similarity
return_metadata=["distance"],
)

results = []
for obj in response.objects:
properties = obj.properties
content = properties.pop("content", "")
original_doc_id = properties.pop("original_doc_id", str(obj.uuid))

# Remaining properties become metadata
metadata = properties

document = VectorDocument(id=original_doc_id, content=content, metadata=metadata)

# Convert distance to similarity score (1 - distance for cosine)
score = 1.0 - obj.metadata.distance if obj.metadata.distance else 1.0
results.append(SearchResult(document=document, score=score))

self.logger.debug(f"Search: {len(results)} results found (attempt {attempt + 1})")
return results

except Exception as e:
last_error = e
if "Deadline Exceeded" in str(e) or "timeout" in str(e).lower():
if attempt < max_retries - 1:
self.logger.warning(
f"Weaviate search timeout (attempt {attempt + 1}/{max_retries}): {e}. "
f"Retrying in {retry_delay}s..."
)
time.sleep(retry_delay)
retry_delay *= 2 # Exponential backoff
continue
else:
self.logger.error(f"Weaviate search failed after {max_retries} attempts due to timeout")
else:
# For non-timeout errors, fail immediately
self.logger.error(f"Weaviate search failed: {e}")
break

except Exception as e:
raise RuntimeError(f"Weaviate search failed: {e}")
raise RuntimeError(f"Weaviate search failed: {last_error}")

def exists(self) -> bool:
"""Check collection existence"""
Expand Down
Loading