13 changes: 8 additions & 5 deletions ai/analyzer.py
@@ -18,7 +18,7 @@
     search_vectors as _search_vectors,
     get_chunk_text as _get_chunk_text,
 )
-from .openai import get_embedding_for_text, call_coding_api
+from .openai import call_coding_api, EmbeddingClient
 from llama_index.core import Document
 from utils.logger import get_logger
 from utils import compute_file_hash, chunk_text, norm, cosine
@@ -59,15 +59,18 @@
 
 logger = get_logger(__name__)
 
+# Initialize EmbeddingClient for structured logging and retry logic
+_embedding_client = EmbeddingClient()
+
 
-def _get_embedding_with_semaphore(semaphore: threading.Semaphore, text: str, model: Optional[str] = None):
+def _get_embedding_with_semaphore(semaphore: threading.Semaphore, text: str, file_path: str = "<unknown>", chunk_index: int = 0, model: Optional[str] = None):
     """
     Wrapper to acquire semaphore inside executor task to avoid deadlock.
     The semaphore is acquired in the worker thread, not the main thread.
     """
     semaphore.acquire()
     try:
-        return get_embedding_for_text(text, model)
+        return _embedding_client.embed_text(text, file_path=file_path, chunk_index=chunk_index)
     finally:
         semaphore.release()
 
@@ -192,7 +195,7 @@ def _process_file_sync(
         for idx, chunk_doc in batch:
             # Submit task to executor; semaphore will be acquired inside the worker
             embedding_start_time = time.time()
-            future = _EXECUTOR.submit(_get_embedding_with_semaphore, semaphore, chunk_doc.text, embedding_model)
+            future = _EXECUTOR.submit(_get_embedding_with_semaphore, semaphore, chunk_doc.text, rel_path, idx, embedding_model)
             embedding_futures.append((idx, chunk_doc, future, embedding_start_time))
 
         # Wait for batch to complete and store results
@@ -434,7 +437,7 @@ def search_semantic(query: str, database_path: str, top_k: int = 5):
     Uses sqlite-vector's vector_full_scan to retrieve best-matching chunks and returns
     a list of {file_id, path, chunk_index, score}.
     """
-    q_emb = get_embedding_for_text(query)
+    q_emb = _embedding_client.embed_text(query, file_path="<query>", chunk_index=0)
     if not q_emb:
         return []
 
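Note: the acquire-inside-the-worker pattern that _get_embedding_with_semaphore implements is easy to get wrong, so here is a minimal self-contained sketch of the same idea. MAX_CONCURRENT_CALLS and fake_embed are hypothetical stand-ins; the real code submits _embedding_client.embed_text through _EXECUTOR.

import threading
from concurrent.futures import ThreadPoolExecutor

MAX_CONCURRENT_CALLS = 4  # hypothetical cap on in-flight embedding calls
_semaphore = threading.Semaphore(MAX_CONCURRENT_CALLS)
_executor = ThreadPoolExecutor(max_workers=8)

def fake_embed(text: str) -> list:
    # Stand-in for _embedding_client.embed_text(...)
    return [float(len(text))]

def embed_in_worker(text: str) -> list:
    # Acquire in the worker thread. If the submitting thread held the
    # permit across submit() instead, it could block while queued workers
    # wait on the same semaphore, which is the deadlock the docstring warns about.
    with _semaphore:
        return fake_embed(text)

futures = [_executor.submit(embed_in_worker, t) for t in ("alpha", "beta")]
print([f.result() for f in futures])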
7 changes: 5 additions & 2 deletions ai/llama_integration.py
@@ -4,11 +4,14 @@
 from typing import List
 from llama_index.core import Document
 
-from .openai import get_embedding_for_text
+from .openai import EmbeddingClient
 from utils.logger import get_logger
 
 logger = get_logger(__name__)
 
+# Create a module-level embedding client instance
+_embedding_client = EmbeddingClient()
+
 
 def llama_index_retrieve_documents(query: str, database_path: str, top_k: int = 5,
                                    search_func=None, get_chunk_func=None) -> List[Document]:
@@ -28,7 +31,7 @@ def llama_index_retrieve_documents(query: str, database_path: str, top_k: int =
     if search_func is None or get_chunk_func is None:
         raise ValueError("search_func and get_chunk_func must be provided")
 
-    q_emb = get_embedding_for_text(query)
+    q_emb = _embedding_client.embed_text(query, file_path="<query>", chunk_index=0)
     if not q_emb:
         return []
 
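For orientation, a hypothetical call site for the dependency-injected retriever. The db.vectors module path is an assumption; the callback names mirror the search_vectors / get_chunk_text helpers that analyzer.py imports, and the exact callback signatures are defined by that vector-store module, not by this PR.

from ai.llama_integration import llama_index_retrieve_documents
from db.vectors import search_vectors, get_chunk_text  # assumed module path

docs = llama_index_retrieve_documents(
    "where is the file hash computed?",
    database_path="index.db",
    top_k=5,
    search_func=search_vectors,     # injected vector search
    get_chunk_func=get_chunk_text,  # injected chunk-text lookup
)
for doc in docs:
    print(doc.text[:80])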
208 changes: 192 additions & 16 deletions ai/openai.py
@@ -1,8 +1,13 @@
-from typing import Optional
+from typing import Optional, List, Dict, Any
 import os
 import time
+import uuid
+import json
+import logging
+import traceback
+import threading
 from openai import OpenAI
 import requests
 
 from utils.config import CFG
 
@@ -13,6 +18,9 @@
 DEFAULT_EMBEDDING_MODEL = CFG.get("embedding_model")
 DEFAULT_CODING_MODEL = CFG.get("coding_model")
 
+# Embedding client logger
+_embedding_logger = logging.getLogger("ai.analyzer.embedding")
+
 # Rate limiting configuration
 _RATE_LIMIT_CALLS = 100  # max calls per minute
 _RATE_LIMIT_WINDOW = 60.0  # seconds
@@ -100,24 +108,192 @@ def _retry_with_backoff(func, *args, **kwargs):
         time.sleep(delay)
 
 
-def get_embedding_for_text(text: str, model: Optional[str] = None):
+class EmbeddingError(Exception):
+    """Custom exception for embedding failures"""
+    pass
+
+
+class EmbeddingClient:
     """
-    Return embedding vector (list[float]) using the new OpenAI client.
-    Includes rate limiting, retry logic with exponential backoff, and circuit breaker.
-    model: optional model id; if not provided, uses DEFAULT_EMBEDDING_MODEL from CFG.
+    Embedding client with detailed logging, retry logic, and configurable timeouts.
+    Provides better debugging for embedding API failures.
     """
-    model_to_use = model or DEFAULT_EMBEDDING_MODEL
-    if not model_to_use:
-        raise RuntimeError("No embedding model configured. Set EMBEDDING_MODEL in .env or pass model argument.")
+    def __init__(self,
+                 api_url: Optional[str] = None,
+                 api_key: Optional[str] = None,
+                 model: Optional[str] = None,
+                 timeout: float = 30.0,
+                 max_retries: int = 2,
+                 backoff: float = 1.5):
+        self.api_url = api_url or CFG.get("api_url")
+        self.api_key = api_key or CFG.get("api_key")
+        self.model = model or DEFAULT_EMBEDDING_MODEL or "text-embedding-3-small"
+        self.timeout = timeout
+        self.max_retries = max_retries
+        self.backoff = backoff
+        self.session = requests.Session()
+        if self.api_key:
+            self.session.headers.update({"Authorization": f"Bearer {self.api_key}"})
+        self.session.headers.update({"Content-Type": "application/json"})
 
-    def _get_embedding():
-        resp = _client.embeddings.create(model=model_to_use, input=text)
-        return resp.data[0].embedding
-
-    try:
-        return _retry_with_backoff(_get_embedding)
-    except Exception as e:
-        raise RuntimeError(f"Failed to obtain embedding from OpenAI client: {e}") from e
+    def _log_request_start(self, request_id: str, file_path: str, chunk_index: int, chunk_len: int):
+        _embedding_logger.debug(
+            "Embedding request START",
+            extra={
+                "request_id": request_id,
+                "file": file_path,
+                "chunk_index": chunk_index,
+                "chunk_length": chunk_len,
+                "model": self.model,
+                "api_url": self.api_url,
+                "timeout": self.timeout,
+            },
+        )
+
+    def _log_request_end(self, request_id: str, elapsed: float, status: Optional[int], response_body_preview: str):
+        _embedding_logger.debug(
+            "Embedding request END",
+            extra={
+                "request_id": request_id,
+                "elapsed_s": elapsed,
+                "status": status,
+                "response_preview": response_body_preview,
+            },
+        )
+
+    def embed_text(self, text: str, file_path: str = "<unknown>", chunk_index: int = 0) -> List[float]:
+        """
+        Embed a single chunk of text. Returns the embedding vector.
+        Raises EmbeddingError on failure.
+        """
+        request_id = str(uuid.uuid4())
+        chunk_len = len(text)
+        self._log_request_start(request_id, file_path, chunk_index, chunk_len)
+
+        payload = {
+            "model": self.model,
+            "input": text,
+        }
+
+        attempt = 0
+        err_msg = ""
+        while True:
+            attempt += 1
+            start = time.perf_counter()
+            try:
+                resp = self.session.post(
+                    self.api_url,
+                    data=json.dumps(payload),
+                    timeout=self.timeout,
+                )
+                elapsed = time.perf_counter() - start
+
+                # Try to parse JSON safely
+                try:
+                    resp_json = resp.json()
+                except Exception:
+                    resp_json = None
+
+                preview = ""
+                if resp_json is not None:
+                    preview = json.dumps(resp_json)[:1000]
+                else:
+                    preview = (resp.text or "")[:1000]
+
+                self._log_request_end(request_id, elapsed, resp.status_code, preview)
+
+                if resp.status_code >= 200 and resp.status_code < 300:
+                    # expected format: {"data": [{"embedding": [...]}], ...}
+                    if not resp_json:
+                        raise EmbeddingError(f"Empty JSON response (status={resp.status_code})")
+                    try:
+                        # tolerant extraction
+                        data = resp_json.get("data") if isinstance(resp_json, dict) else None
+                        if data and isinstance(data, list) and len(data) > 0:
+                            emb = data[0].get("embedding")
+                            if emb and isinstance(emb, list):
+                                _embedding_logger.info(
+                                    "Embedding succeeded",
+                                    extra={"request_id": request_id, "file": file_path, "chunk_index": chunk_index},
+                                )
+                                return emb
+                        # Fallback: maybe top-level "embedding" key
+                        if isinstance(resp_json, dict) and "embedding" in resp_json:
+                            emb = resp_json["embedding"]
+                            if isinstance(emb, list):
+                                return emb
+                        raise EmbeddingError(f"Unexpected embedding response shape: {resp_json}")
+                    except KeyError as e:
+                        raise EmbeddingError(f"Missing keys in embedding response: {e}")
+                else:
+                    # Non-2xx
+                    _embedding_logger.warning(
+                        "Embedding API returned non-2xx",
+                        extra={
+                            "request_id": request_id,
+                            "status_code": resp.status_code,
+                            "file": file_path,
+                            "chunk_index": chunk_index,
+                            "attempt": attempt,
+                            "body_preview": preview,
+                        },
+                    )
+                    # fall through to retry logic
+                    err_msg = f"Status {resp.status_code}: {preview}"
+
+            except requests.Timeout as e:
+                elapsed = time.perf_counter() - start
+                err_msg = f"Timeout after {elapsed:.2f}s: {e}"
+                _embedding_logger.error("Embedding API Timeout", extra={"request_id": request_id, "error": str(e)})
+            except requests.RequestException as e:
+                elapsed = time.perf_counter() - start
+                err_msg = f"RequestException after {elapsed:.2f}s: {e}\n{traceback.format_exc()}"
+                _embedding_logger.error("Embedding request exception", extra={"request_id": request_id, "error": err_msg})
+            except Exception as e:
+                elapsed = time.perf_counter() - start
+                err_msg = f"Unexpected error after {elapsed:.2f}s: {e}\n{traceback.format_exc()}"
+                _embedding_logger.exception("Unexpected embedding exception", extra={"request_id": request_id})
+
+            # Retry logic
+            if attempt > self.max_retries:
+                _embedding_logger.error(
+                    "Max retries exceeded for embedding request",
+                    extra={"request_id": request_id, "file": file_path, "chunk_index": chunk_index, "attempts": attempt},
+                )
+                raise EmbeddingError(f"Failed to get embedding after {attempt} attempts. Last error: {err_msg}")
+
+            # Backoff and retry
+            sleep_for = self.backoff * (2 ** (attempt - 1))
+            _embedding_logger.info(
+                "Retrying embedding request",
+                extra={
+                    "request_id": request_id,
+                    "file": file_path,
+                    "chunk_index": chunk_index,
+                    "attempt": attempt,
+                    "sleep_s": sleep_for,
+                },
+            )
+            time.sleep(sleep_for)
+
+    def embed_multiple(self, chunks: List[str], file_path: str = "<unknown>") -> List[Dict[str, Any]]:
+        """
+        Embed a list of text chunks. Returns list of dicts: {"chunk_index": i, "embedding": [...]}.
+        This method logs progress and errors for each chunk.
+        """
+        results = []
+        for i, chunk in enumerate(chunks):
+            try:
+                emb = self.embed_text(chunk, file_path=file_path, chunk_index=i)
+                results.append({"chunk_index": i, "embedding": emb})
+            except EmbeddingError as e:
+                _embedding_logger.error(
+                    "Failed to embed chunk",
+                    extra={"file": file_path, "chunk_index": i, "error": str(e)},
+                )
+                # append a failure marker or skip depending on desired behavior
+                results.append({"chunk_index": i, "embedding": None, "error": str(e)})
+        return results
 
 
 def call_coding_api(prompt: str, model: Optional[str] = None, max_tokens: int = 1024):
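Taken together, the new client can be exercised end to end with a short sketch like the following (assuming CFG supplies api_url and api_key for an OpenAI-compatible embeddings endpoint; the timeout and file_path values are illustrative):

from ai.openai import EmbeddingClient, EmbeddingError

client = EmbeddingClient(timeout=10.0, max_retries=2, backoff=1.5)

try:
    vec = client.embed_text(
        "def add(a, b): return a + b",
        file_path="utils/math_helpers.py",  # hypothetical path, used only in logs
        chunk_index=0,
    )
    print(f"got a {len(vec)}-dimensional embedding")
except EmbeddingError as exc:
    # Raised once max_retries is exhausted; with backoff=1.5 the sleeps
    # between attempts follow backoff * 2**(attempt - 1), i.e. 1.5s then 3.0s.
    print(f"embedding failed: {exc}")

# embed_multiple reports per-chunk failures as {"embedding": None, "error": ...}
# instead of raising, so callers can skip or retry individual chunks.
results = client.embed_multiple(["chunk one", "chunk two"], file_path="README.md")
usable = [r for r in results if r["embedding"] is not None]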