From 97e1449d1133de81596e9de882c356884733e50a Mon Sep 17 00:00:00 2001 From: yassin Date: Thu, 26 Feb 2026 21:59:52 +0100 Subject: [PATCH 1/6] Harden iFlow proxy requests to match CLI behavior Add signed-header fallback and base-url failover while fixing env OAuth credential loading/refresh so proxy calls are less likely to get blocked. --- .../providers/iflow_auth_base.py | 64 ++++- .../providers/iflow_provider.py | 256 +++++++++++++++--- 2 files changed, 271 insertions(+), 49 deletions(-) diff --git a/src/rotator_library/providers/iflow_auth_base.py b/src/rotator_library/providers/iflow_auth_base.py index 0ac03208..955a9654 100644 --- a/src/rotator_library/providers/iflow_auth_base.py +++ b/src/rotator_library/providers/iflow_auth_base.py @@ -42,6 +42,7 @@ # Cookie-based authentication endpoint IFLOW_API_KEY_ENDPOINT = "https://platform.iflow.cn/api/openapi/apikey" +IFLOW_DEFAULT_API_BASE = "https://apis.iflow.cn/v1" # Client credentials provided by iFlow IFLOW_CLIENT_ID = "10009311001" @@ -331,6 +332,32 @@ def __init__(self): self._refresh_interval_seconds: int = 30 # Delay between queue items self._refresh_max_retries: int = 3 # Attempts before kicked out + def get_api_base_candidates(self) -> List[str]: + """Return ordered iFlow API base candidates from environment variables.""" + candidates: List[str] = [] + + single_base = os.getenv("IFLOW_API_BASE", "").strip() + if single_base: + normalized = single_base.rstrip("/") + if normalized: + candidates.append(normalized) + + base_list = os.getenv("IFLOW_API_BASES", "").strip() + if base_list: + for item in base_list.split(","): + normalized = item.strip().rstrip("/") + if normalized and normalized not in candidates: + candidates.append(normalized) + + if IFLOW_DEFAULT_API_BASE not in candidates: + candidates.append(IFLOW_DEFAULT_API_BASE) + + return candidates + + def get_api_base(self) -> str: + """Return the primary iFlow API base URL.""" + return self.get_api_base_candidates()[0] + def _parse_env_credential_path(self, path: str) -> Optional[str]: """ Parse a virtual env:// path and return the credential index. @@ -965,12 +992,32 @@ async def _refresh_token(self, path: str, force: bool = False) -> Dict[str, Any] if not force and cached_creds and not self._is_token_expired(cached_creds): return cached_creds - # [ROTATING TOKEN FIX] Always read fresh from disk before refresh. - # iFlow may use rotating refresh tokens - each refresh could invalidate the previous token. - # If we use a stale cached token, refresh will fail. - # Reading fresh from disk ensures we have the latest token. - await self._read_creds_from_file(path) - creds_from_file = self._credentials_cache[path] + # For file-based credentials, read fresh from disk before refresh. + # For env-loaded credentials, refresh using cached/env values (no file IO). + creds_from_file: Optional[Dict[str, Any]] = None + if cached_creds and cached_creds.get("_proxy_metadata", {}).get( + "loaded_from_env" + ): + creds_from_file = cached_creds + elif self._parse_env_credential_path(path) is not None: + credential_index = self._parse_env_credential_path(path) + env_creds = self._load_from_env(credential_index) + if env_creds: + self._credentials_cache[path] = env_creds + creds_from_file = env_creds + else: + raise ValueError( + f"No environment credentials found for iFlow path: {path}" + ) + else: + # [ROTATING TOKEN FIX] Always read fresh from disk before refresh. + # iFlow may use rotating refresh tokens - each refresh could invalidate + # the previous token. Fresh disk read keeps us in sync. + await self._read_creds_from_file(path) + creds_from_file = self._credentials_cache[path] + + if creds_from_file is None: + raise ValueError(f"No credentials available for iFlow refresh: {path}") lib_logger.debug(f"Refreshing iFlow OAuth token for '{Path(path).name}'...") refresh_token = creds_from_file.get("refresh_token") @@ -1215,7 +1262,8 @@ async def get_api_details(self, credential_identifier: str) -> Tuple[str, str]: - API Key: credential_identifier is the API key string itself """ # Detect credential type - if os.path.isfile(credential_identifier): + credential_index = self._parse_env_credential_path(credential_identifier) + if credential_index is not None or os.path.isfile(credential_identifier): creds = await self._load_credentials(credential_identifier) # Check if this is a cookie-based credential @@ -1249,7 +1297,7 @@ async def get_api_details(self, credential_identifier: str) -> Tuple[str, str]: lib_logger.debug("Using direct API key for iFlow") api_key = credential_identifier - base_url = "https://apis.iflow.cn/v1" + base_url = self.get_api_base() return base_url, api_key async def proactively_refresh(self, credential_identifier: str): diff --git a/src/rotator_library/providers/iflow_provider.py b/src/rotator_library/providers/iflow_provider.py index ae292719..db4ef758 100644 --- a/src/rotator_library/providers/iflow_provider.py +++ b/src/rotator_library/providers/iflow_provider.py @@ -29,6 +29,7 @@ # Model list can be expanded as iFlow supports more models HARDCODED_MODELS = [ + "glm-5", "glm-4.6", "glm-4.7", "minimax-m2", @@ -72,8 +73,10 @@ IFLOW_USER_AGENT = "iFlow-Cli" IFLOW_HEADER_SESSION_ID = "session-id" +IFLOW_HEADER_CONVERSATION_ID = "conversation-id" IFLOW_HEADER_TIMESTAMP = "x-iflow-timestamp" IFLOW_HEADER_SIGNATURE = "x-iflow-signature" +IFLOW_HEADER_API_KEY = "x-api-key" # ============================================================================= # THINKING MODE CONFIGURATION @@ -187,34 +190,87 @@ def extract_model_id(item) -> str: if os.path.isfile(credential): await self.initialize_token(credential) - api_base, api_key = await self.get_api_details(credential) - models_url = f"{api_base.rstrip('/')}/models" + _, api_key = await self.get_api_details(credential) + api_bases = self.get_api_base_candidates() + request_ids = self._extract_iflow_ids({}) + + last_error: Optional[Exception] = None + fetched = False + + for idx, api_base in enumerate(api_bases): + models_url = f"{api_base.rstrip('/')}/models" + try: + headers = self._build_iflow_headers( + api_key=api_key, + stream=False, + request_ids=request_ids, + include_signature=True, + ) + response = await client.get( + models_url, + headers=headers, + timeout=TimeoutConfig.non_streaming(), + ) + + if response.status_code == 406: + unsigned_headers = self._build_iflow_headers( + api_key=api_key, + stream=False, + request_ids=request_ids, + include_signature=False, + ) + response = await client.get( + models_url, + headers=unsigned_headers, + timeout=TimeoutConfig.non_streaming(), + ) - response = await client.get( - models_url, headers={"Authorization": f"Bearer {api_key}"} - ) - response.raise_for_status() - - dynamic_data = response.json() - # Handle both {data: [...]} and direct [...] formats - model_list = ( - dynamic_data.get("data", dynamic_data) - if isinstance(dynamic_data, dict) - else dynamic_data - ) + response.raise_for_status() + + dynamic_data = response.json() + # Handle both {data: [...]} and direct [...] formats + model_list = ( + dynamic_data.get("data", dynamic_data) + if isinstance(dynamic_data, dict) + else dynamic_data + ) + + dynamic_count = 0 + for model in model_list: + model_id = extract_model_id(model) + if model_id and model_id not in env_var_ids: + models.append(f"iflow/{model_id}") + env_var_ids.add(model_id) + dynamic_count += 1 + + if dynamic_count > 0: + lib_logger.debug( + f"Discovered {dynamic_count} additional models for iflow from API" + ) - dynamic_count = 0 - for model in model_list: - model_id = extract_model_id(model) - if model_id and model_id not in env_var_ids: - models.append(f"iflow/{model_id}") - env_var_ids.add(model_id) - dynamic_count += 1 + fetched = True + break - if dynamic_count > 0: - lib_logger.debug( - f"Discovered {dynamic_count} additional models for iflow from API" - ) + except (httpx.RequestError, httpx.TimeoutException) as e: + last_error = e + if idx + 1 < len(api_bases): + lib_logger.warning( + f"iFlow models fetch network error on {api_base}, trying fallback base" + ) + continue + raise + except httpx.HTTPStatusError as e: + last_error = e + status_code = e.response.status_code + if self._should_fallback_base(status_code) and idx + 1 < len(api_bases): + lib_logger.warning( + f"iFlow models fetch got HTTP {status_code} on {api_base}, trying fallback base" + ) + continue + raise + + if not fetched and last_error: + raise last_error except Exception as e: # Silently ignore dynamic discovery errors @@ -530,27 +586,61 @@ def _create_iflow_signature( api_key.encode("utf-8"), payload.encode("utf-8"), hashlib.sha256 ).hexdigest() - def _build_iflow_headers(self, api_key: str, stream: bool) -> Dict[str, str]: - """Build iFlow request headers, including signed auth headers.""" - session_id = f"session-{uuid.uuid4()}" + def _extract_iflow_ids( + self, request_args: Optional[Dict[str, Any]] = None + ) -> Dict[str, str]: + """Extract session/conversation IDs from request args.""" + request_args = request_args or {} + + def _pick(*keys: str) -> str: + for key in keys: + value = request_args.get(key) + if value: + return str(value) + return "" + + generated_session_id = f"session-{uuid.uuid4()}" + session_id = _pick("session_id", "sessionId") or generated_session_id + conversation_id = _pick("conversation_id", "conversationId") or session_id + return {"session_id": session_id, "conversation_id": conversation_id} + + def _build_iflow_headers( + self, + api_key: str, + stream: bool, + request_ids: Optional[Dict[str, str]] = None, + include_signature: bool = True, + ) -> Dict[str, str]: + """Build iFlow request headers, with optional anti-block signature headers.""" + request_ids = request_ids or self._extract_iflow_ids() + session_id = request_ids["session_id"] + conversation_id = request_ids["conversation_id"] timestamp_ms = int(time.time() * 1000) - signature = self._create_iflow_signature( - IFLOW_USER_AGENT, session_id, timestamp_ms, api_key - ) headers = { "Authorization": f"Bearer {api_key}", + IFLOW_HEADER_API_KEY: api_key, "Content-Type": "application/json", "User-Agent": IFLOW_USER_AGENT, IFLOW_HEADER_SESSION_ID: session_id, - IFLOW_HEADER_TIMESTAMP: str(timestamp_ms), + IFLOW_HEADER_CONVERSATION_ID: conversation_id, "Accept": "text/event-stream" if stream else "application/json", } - if signature: + if include_signature: + signature = self._create_iflow_signature( + IFLOW_USER_AGENT, session_id, timestamp_ms, api_key + ) + headers[IFLOW_HEADER_TIMESTAMP] = str(timestamp_ms) headers[IFLOW_HEADER_SIGNATURE] = signature return headers + def _should_fallback_base(self, status_code: int) -> bool: + """Return True for block-like status codes worth base fallback.""" + if status_code in {403, 406, 408, 423, 451, 502, 503, 504}: + return True + return 520 <= status_code <= 530 + def _extract_finish_reason_from_chunk(self, chunk: Dict[str, Any]) -> Optional[str]: """ Extract finish_reason from a raw iFlow chunk by searching all possible locations. @@ -945,11 +1035,18 @@ async def acompletion( # Create provider logger from transaction context file_logger = ProviderLogger(transaction_context) + request_ids = self._extract_iflow_ids(kwargs) + api_bases = self.get_api_base_candidates() - async def make_request(): + class _NextBaseError(Exception): + def __init__(self, message: str, cause: Optional[Exception] = None): + super().__init__(message) + self.cause = cause + + async def make_request(api_base: str, include_signature: bool = True): """Prepares and makes the actual API call.""" # CRITICAL: get_api_details returns api_key, NOT access_token - api_base, api_key = await self.get_api_details(credential_path) + _, api_key = await self.get_api_details(credential_path) # Strip provider prefix from model name (e.g., "iflow/Qwen3-Coder-Plus" -> "Qwen3-Coder-Plus") model_name = model.split("/")[-1] @@ -964,6 +1061,8 @@ async def make_request(): headers = self._build_iflow_headers( api_key=api_key, stream=bool(payload.get("stream")), + request_ids=request_ids, + include_signature=include_signature, ) url = f"{api_base.rstrip('/')}/chat/completions" @@ -980,7 +1079,13 @@ async def make_request(): timeout=TimeoutConfig.streaming(), ) - async def stream_handler(response_stream, attempt=1): + async def stream_handler( + response_stream, + api_base: str, + attempt: int = 1, + include_signature: bool = True, + allow_unsigned_retry: bool = True, + ): """Handles the streaming response and converts chunks.""" # Track state across chunks for finish_reason normalization stream_state: Dict[str, Any] = {} @@ -1001,8 +1106,38 @@ async def stream_handler(response_stream, attempt=1): "iFlow returned 401. Forcing token refresh and retrying once." ) await self._refresh_token(credential_path, force=True) - retry_stream = await make_request() - async for chunk in stream_handler(retry_stream, attempt=2): + retry_stream = await make_request( + api_base, include_signature=include_signature + ) + async for chunk in stream_handler( + retry_stream, + api_base=api_base, + attempt=2, + include_signature=include_signature, + allow_unsigned_retry=allow_unsigned_retry, + ): + yield chunk + return + + # Handle 406: retry once without signature headers + elif ( + response.status_code == 406 + and include_signature + and allow_unsigned_retry + ): + lib_logger.warning( + f"iFlow returned 406 on {api_base}, retrying once without signature headers" + ) + retry_stream = await make_request( + api_base, include_signature=False + ) + async for chunk in stream_handler( + retry_stream, + api_base=api_base, + attempt=attempt, + include_signature=False, + allow_unsigned_retry=False, + ): yield chunk return @@ -1018,6 +1153,16 @@ async def stream_handler(response_stream, attempt=1): response=response, ) + elif self._should_fallback_base(response.status_code): + raise _NextBaseError( + f"iFlow HTTP {response.status_code} on {api_base}", + cause=httpx.HTTPStatusError( + f"HTTP {response.status_code}: {error_text}", + request=response.request, + response=response, + ), + ) + # Handle other errors else: if not error_text: @@ -1064,6 +1209,12 @@ async def stream_handler(response_stream, attempt=1): except httpx.HTTPStatusError: raise # Re-raise HTTP errors we already handled + except (httpx.RequestError, httpx.TimeoutException) as e: + raise _NextBaseError( + f"Network error on iFlow base {api_base}: {e}", cause=e + ) + except _NextBaseError: + raise except Exception as e: file_logger.log_error(f"Error during iFlow stream processing: {e}") lib_logger.error( @@ -1074,10 +1225,33 @@ async def stream_handler(response_stream, attempt=1): async def logging_stream_wrapper(): """Wraps the stream to log the final reassembled response and cache reasoning.""" openai_chunks = [] + last_fallback_error: Optional[Exception] = None try: - async for chunk in stream_handler(await make_request()): - openai_chunks.append(chunk) - yield chunk + for idx, api_base in enumerate(api_bases): + try: + async for chunk in stream_handler( + await make_request(api_base, include_signature=True), + api_base=api_base, + include_signature=True, + allow_unsigned_retry=True, + ): + openai_chunks.append(chunk) + yield chunk + return + except _NextBaseError as e: + if openai_chunks: + raise e.cause or e + + last_fallback_error = e.cause or e + if idx + 1 < len(api_bases): + lib_logger.warning( + f"iFlow base {api_base} failed ({e}); trying fallback base" + ) + continue + raise last_fallback_error + + if last_fallback_error: + raise last_fallback_error finally: if openai_chunks: final_response = self._stream_to_completion_response(openai_chunks) From 407032eeb85c13ff7dbb4eb1e6caffc8b0ba0a58 Mon Sep 17 00:00:00 2001 From: yassin Date: Fri, 27 Feb 2026 13:18:14 +0100 Subject: [PATCH 2/6] Complete iFlow CLI parity and restore thinking behavior Consolidate follow-up fixes after initial hardening: add captured-header parity and metadata propagation, preserve usage details, introduce sticky session/conversation IDs, and fix reasoning regressions by removing forced disabled-thinking defaults while enabling thinking by default. --- src/proxy_app/main.py | 46 ++ src/proxy_app/provider_urls.py | 3 +- .../providers/iflow_auth_base.py | 36 +- .../providers/iflow_provider.py | 447 ++++++++++++++++-- 4 files changed, 497 insertions(+), 35 deletions(-) diff --git a/src/proxy_app/main.py b/src/proxy_app/main.py index 3e4bbbbc..4205fad9 100644 --- a/src/proxy_app/main.py +++ b/src/proxy_app/main.py @@ -886,6 +886,47 @@ async def streaming_response_wrapper( ) +def _inject_iflow_metadata_from_incoming_headers( + request_data: dict[str, Any], + request_headers: dict[str, str], +) -> None: + """Propagate iFlow-specific routing headers into request metadata.""" + model_name = str(request_data.get("model", "")) + provider_name = model_name.split("/", 1)[0].lower() if "/" in model_name else "" + if provider_name != "iflow": + return + + metadata = request_data.get("metadata") + if not isinstance(metadata, dict): + metadata = {} + + def _pick(*header_names: str) -> str: + for key in header_names: + value = request_headers.get(key) + if value: + return value + return "" + + mappings = [ + ("session_id", ("session-id", "x-litellm-session-id")), + ("conversation_id", ("conversation-id", "x-litellm-conversation-id")), + ("traceparent", ("traceparent",)), + ("iflow_x_biz_info", ("x-biz-info",)), + ("iflow_eagleeye_userdata", ("eagleeye-userdata",)), + ("iflow_priority", ("priority",)), + ] + + for metadata_key, header_keys in mappings: + if metadata.get(metadata_key): + continue + picked = _pick(*header_keys) + if picked: + metadata[metadata_key] = picked + + if metadata: + request_data["metadata"] = metadata + + @app.post("/v1/chat/completions") async def chat_completions( request: Request, @@ -933,6 +974,11 @@ async def chat_completions( if raw_logger: raw_logger.log_request(headers=request.headers, body=request_data) + _inject_iflow_metadata_from_incoming_headers( + request_data=request_data, + request_headers=dict(request.headers), + ) + # Extract and log specific reasoning parameters for monitoring. model = request_data.get("model") generation_cfg = ( diff --git a/src/proxy_app/provider_urls.py b/src/proxy_app/provider_urls.py index bc160292..1a09a7df 100644 --- a/src/proxy_app/provider_urls.py +++ b/src/proxy_app/provider_urls.py @@ -30,6 +30,7 @@ "cohere": "https://api.cohere.ai/v1", "bedrock": "https://bedrock-runtime.us-east-1.amazonaws.com", "openrouter": "https://openrouter.ai/api/v1", + "iflow": "https://apis.iflow.cn/v1", } def get_provider_endpoint(provider: str, model_name: str, incoming_path: str) -> Optional[str]: @@ -73,4 +74,4 @@ def get_provider_endpoint(provider: str, model_name: str, incoming_path: str) -> return f"{base_url}/{action}" # Fallback for other cases - return f"{base_url}/v1/{action}" \ No newline at end of file + return f"{base_url}/v1/{action}" diff --git a/src/rotator_library/providers/iflow_auth_base.py b/src/rotator_library/providers/iflow_auth_base.py index 955a9654..1c0a1a9a 100644 --- a/src/rotator_library/providers/iflow_auth_base.py +++ b/src/rotator_library/providers/iflow_auth_base.py @@ -43,6 +43,7 @@ # Cookie-based authentication endpoint IFLOW_API_KEY_ENDPOINT = "https://platform.iflow.cn/api/openapi/apikey" IFLOW_DEFAULT_API_BASE = "https://apis.iflow.cn/v1" +IFLOW_CLI_USER_AGENT = "iFlow-Cli" # Client credentials provided by iFlow IFLOW_CLIENT_ID = "10009311001" @@ -638,11 +639,21 @@ async def _fetch_user_info(self, access_token: str) -> Dict[str, Any]: if not access_token or not access_token.strip(): raise ValueError("Access token is empty") - url = f"{IFLOW_USER_INFO_ENDPOINT}?accessToken={access_token}" - headers = {"Accept": "application/json"} + headers = { + "Accept": "*/*", + "accessToken": access_token, + "User-Agent": "node", + "Accept-Language": "*", + "Sec-Fetch-Mode": "cors", + "Accept-Encoding": "br, gzip, deflate", + } async with httpx.AsyncClient(timeout=30.0) as client: - response = await client.get(url, headers=headers) + response = await client.get( + IFLOW_USER_INFO_ENDPOINT, + headers=headers, + params={"accessToken": access_token}, + ) response.raise_for_status() result = response.json() @@ -1291,7 +1302,24 @@ async def get_api_details(self, credential_identifier: str) -> Tuple[str, str]: api_key = creds.get("api_key") if not api_key: - raise ValueError("Missing api_key in iFlow OAuth credentials") + access_token = creds.get("access_token", "") + if access_token: + try: + user_info = await self._fetch_user_info(access_token) + api_key = user_info.get("api_key", "") + if api_key: + creds["api_key"] = api_key + if credential_index is None: + await self._save_credentials( + credential_identifier, + creds, + ) + except Exception as e: + lib_logger.warning( + f"Failed to recover iFlow api_key from OAuth user info: {e}" + ) + if not api_key: + raise ValueError("Missing api_key in iFlow OAuth credentials") else: # Direct API key: use as-is lib_logger.debug("Using direct API key for iFlow") diff --git a/src/rotator_library/providers/iflow_provider.py b/src/rotator_library/providers/iflow_provider.py index db4ef758..6d813412 100644 --- a/src/rotator_library/providers/iflow_provider.py +++ b/src/rotator_library/providers/iflow_provider.py @@ -9,9 +9,11 @@ import json import time import os +import re +import threading import httpx import logging -from typing import Union, AsyncGenerator, List, Dict, Any, Optional +from typing import Union, AsyncGenerator, List, Dict, Any, Optional, Tuple from .provider_interface import ProviderInterface from .iflow_auth_base import IFlowAuthBase from .provider_cache import ProviderCache @@ -60,6 +62,8 @@ "temperature", "top_p", "max_tokens", + "max_new_tokens", + "max_completion_tokens", "stream", "tools", "tool_choice", @@ -69,6 +73,10 @@ "stop", "seed", "response_format", + "thinking", + "enable_thinking", + "chat_template_kwargs", + "reasoning_split", } IFLOW_USER_AGENT = "iFlow-Cli" @@ -77,6 +85,19 @@ IFLOW_HEADER_TIMESTAMP = "x-iflow-timestamp" IFLOW_HEADER_SIGNATURE = "x-iflow-signature" IFLOW_HEADER_API_KEY = "x-api-key" +IFLOW_HEADER_TRACEPARENT = "traceparent" +IFLOW_HEADER_X_BIZ_INFO = "x-biz-info" +IFLOW_HEADER_EAGLEEYE_USERDATA = "EagleEye-UserData" +IFLOW_HEADER_PRIORITY = "priority" + +TRACEPARENT_PATTERN = re.compile(r"^00-[0-9a-f]{32}-[0-9a-f]{16}-[0-9a-f]{2}$") +DEFAULT_IFLOW_STICKY_MODE = "auto" +DEFAULT_IFLOW_STICKY_TTL_SECONDS = 86400 +DEFAULT_IFLOW_STICKY_MAX_ENTRIES = 10000 + + +def _is_truthy_env(value: str) -> bool: + return value.strip().lower() in {"1", "true", "yes", "on"} # ============================================================================= # THINKING MODE CONFIGURATION @@ -130,6 +151,31 @@ def __init__(self): env_prefix="IFLOW_REASONING_CACHE", ) + self._sticky_mode = os.getenv( + "IFLOW_STICKY_SESSION_MODE", DEFAULT_IFLOW_STICKY_MODE + ).strip().lower() + self._sticky_ttl_seconds = max( + 60, + int( + os.getenv( + "IFLOW_STICKY_SESSION_TTL_SECONDS", + str(DEFAULT_IFLOW_STICKY_TTL_SECONDS), + ) + ), + ) + self._sticky_max_entries = max( + 100, + int( + os.getenv( + "IFLOW_STICKY_SESSION_MAX_ENTRIES", + str(DEFAULT_IFLOW_STICKY_MAX_ENTRIES), + ) + ), + ) + self._sticky_lock = threading.Lock() + self._sticky_session_cache: Dict[str, Tuple[str, float]] = {} + self._sticky_conversation_cache: Dict[str, Tuple[str, float]] = {} + def has_custom_logic(self) -> bool: return True @@ -342,6 +388,45 @@ def _should_enable_thinking(self, kwargs: Dict[str, Any]) -> Optional[bool]: False: Disable thinking explicitly None: No thinking params (passthrough - don't modify payload) """ + # Check explicit iFlow thinking fields first + direct_thinking = kwargs.get("thinking") + if direct_thinking is not None: + if isinstance(direct_thinking, dict): + thinking_type = str(direct_thinking.get("type", "")).lower().strip() + if thinking_type == "disabled": + return False + if thinking_type == "enabled": + budget = direct_thinking.get("budget_tokens") + if budget is None: + return True + try: + return int(budget) != 0 + except Exception: + return True + return bool(direct_thinking) + + enable_thinking = kwargs.get("enable_thinking") + if enable_thinking is not None: + if isinstance(enable_thinking, str): + lowered = enable_thinking.lower().strip() + return lowered not in ("0", "false", "off", "none", "disabled") + return bool(enable_thinking) + + chat_template_kwargs = kwargs.get("chat_template_kwargs") + if isinstance(chat_template_kwargs, dict): + ctk_enable = chat_template_kwargs.get("enable_thinking") + if ctk_enable is not None: + if isinstance(ctk_enable, str): + lowered = ctk_enable.lower().strip() + return lowered not in ( + "0", + "false", + "off", + "none", + "disabled", + ) + return bool(ctk_enable) + # Check reasoning_effort (OpenAI-style) reasoning_effort = kwargs.get("reasoning_effort") if reasoning_effort is not None: @@ -571,6 +656,63 @@ def _build_request_payload( # Apply thinking mode configuration based on reasoning_effort payload = self._apply_thinking_config(payload, model_name, full_kwargs) + explicit_thinking = self._should_enable_thinking(full_kwargs) + + # CLI-like defaults when absent + payload.setdefault("temperature", 1) + payload.setdefault("top_p", 0.95) + + # Align token field naming with native iFlow CLI payload shape + if "max_new_tokens" not in payload: + if "max_completion_tokens" in payload: + payload["max_new_tokens"] = payload["max_completion_tokens"] + elif "max_tokens" in payload: + payload["max_new_tokens"] = payload["max_tokens"] + else: + payload["max_new_tokens"] = 32000 + + # Enable thinking by default unless caller explicitly disables it. + if "enable_thinking" not in payload: + if explicit_thinking is not None: + payload["enable_thinking"] = bool(explicit_thinking) + else: + payload["enable_thinking"] = _is_truthy_env( + os.getenv("IFLOW_ENABLE_THINKING_BY_DEFAULT", "true") + ) + + if "thinking" not in payload: + payload["thinking"] = { + "type": "enabled" if bool(payload["enable_thinking"]) else "disabled" + } + + model_lower = model_name.lower() + has_enable_thinking = "enable_thinking" in payload + enable_thinking_value = bool(payload.get("enable_thinking")) + if ( + model_lower.startswith("glm-") + or model_lower in ENABLE_THINKING_MODELS + or model_lower in GLM_MODELS + or isinstance(payload.get("chat_template_kwargs"), dict) + ): + chat_template = payload.get("chat_template_kwargs") + if not isinstance(chat_template, dict): + chat_template = {} + if has_enable_thinking: + chat_template.setdefault("enable_thinking", enable_thinking_value) + if model_lower in GLM_MODELS: + if has_enable_thinking and chat_template.get("enable_thinking"): + chat_template["clear_thinking"] = False + else: + chat_template.pop("clear_thinking", None) + if chat_template: + payload["chat_template_kwargs"] = chat_template + + if ( + model_lower in REASONING_SPLIT_MODELS + and "reasoning_split" not in payload + and has_enable_thinking + ): + payload["reasoning_split"] = enable_thinking_value return payload @@ -586,23 +728,261 @@ def _create_iflow_signature( api_key.encode("utf-8"), payload.encode("utf-8"), hashlib.sha256 ).hexdigest() + def _sticky_enabled(self) -> bool: + return self._sticky_mode not in {"off", "false", "0", "none", "disabled"} + + def _sticky_mode_value(self) -> str: + mode = self._sticky_mode + if mode in {"conversation", "conv"}: + return "conversation" + if mode in {"client", "user"}: + return "client" + return "auto" + + def _prune_sticky_cache(self, cache: Dict[str, Tuple[str, float]], now: float) -> None: + expired = [ + key for key, (_, ts) in cache.items() if now - ts > self._sticky_ttl_seconds + ] + for key in expired: + cache.pop(key, None) + + overflow = len(cache) - self._sticky_max_entries + if overflow > 0: + oldest = sorted(cache.items(), key=lambda item: item[1][1])[:overflow] + for key, _ in oldest: + cache.pop(key, None) + + def _get_sticky_value( + self, + cache: Dict[str, Tuple[str, float]], + keys: List[str], + now: float, + ) -> Optional[str]: + self._prune_sticky_cache(cache, now) + for key in keys: + cached = cache.get(key) + if cached is None: + continue + value, _ = cached + cache[key] = (value, now) + return value + return None + + def _set_sticky_value( + self, + cache: Dict[str, Tuple[str, float]], + keys: List[str], + value: str, + now: float, + ) -> None: + self._prune_sticky_cache(cache, now) + for key in keys: + cache[key] = (value, now) + + def _build_sticky_keys( + self, + sources: List[Dict[str, Any]], + messages: Any, + conversation_id: str, + ) -> List[str]: + mode = self._sticky_mode_value() + + def _pick_case_insensitive(*keys: str) -> str: + for source in sources: + for key in keys: + for source_key, value in source.items(): + if ( + isinstance(source_key, str) + and source_key.lower() == key.lower() + and value not in (None, "") + ): + return str(value) + return "" + + keys: List[str] = [] + if conversation_id: + conv_hash = hashlib.sha256(conversation_id.encode("utf-8")).hexdigest()[:32] + keys.append(f"conv:{conv_hash}") + + client_identifier = _pick_case_insensitive( + "client_id", + "clientId", + "session_key", + "thread_id", + "threadId", + "user", + "user_id", + "userId", + "x-user-id", + "x-client-id", + ) + if client_identifier: + client_hash = hashlib.sha256(client_identifier.encode("utf-8")).hexdigest()[:32] + keys.append(f"client:{client_hash}") + + if _is_truthy_env(os.getenv("IFLOW_STICKY_HISTORY_FALLBACK", "false")) and isinstance( + messages, list + ): + conv_sig = self._get_conversation_signature(messages) + if conv_sig and conv_sig != "default": + keys.append(f"history:{conv_sig}") + + if mode == "conversation": + keys = [k for k in keys if k.startswith("conv:") or k.startswith("history:")] + elif mode == "client": + keys = [k for k in keys if k.startswith("client:") or k.startswith("history:")] + + return list(dict.fromkeys(keys)) + def _extract_iflow_ids( self, request_args: Optional[Dict[str, Any]] = None ) -> Dict[str, str]: - """Extract session/conversation IDs from request args.""" + """Extract iFlow routing + tracing metadata from request args.""" request_args = request_args or {} + metadata = request_args.get("metadata") + if not isinstance(metadata, dict): + metadata = {} + + extra_headers = request_args.get("extra_headers") + if not isinstance(extra_headers, dict): + extra_headers = {} + + sources: List[Dict[str, Any]] = [request_args, metadata, extra_headers] + def _pick(*keys: str) -> str: - for key in keys: - value = request_args.get(key) - if value: - return str(value) + for source in sources: + for key in keys: + for source_key, value in source.items(): + if ( + isinstance(source_key, str) + and source_key.lower() == key.lower() + and value not in (None, "") + ): + return str(value) return "" - generated_session_id = f"session-{uuid.uuid4()}" - session_id = _pick("session_id", "sessionId") or generated_session_id - conversation_id = _pick("conversation_id", "conversationId") or session_id - return {"session_id": session_id, "conversation_id": conversation_id} + def _generate_traceparent() -> str: + trace_id = uuid.uuid4().hex + span_id = uuid.uuid4().hex[:16] + return f"00-{trace_id}-{span_id}-01" + + def _normalize_traceparent(raw_value: str) -> str: + lowered = raw_value.strip().lower() + if lowered and TRACEPARENT_PATTERN.match(lowered): + return lowered + return _generate_traceparent() + + explicit_session_id = _pick( + "session_id", + "sessionId", + "litellm_session_id", + "session-id", + "x-litellm-session-id", + ) + explicit_conversation_id = _pick( + "conversation_id", + "conversationId", + "litellm_conversation_id", + "conversation-id", + "x-litellm-conversation-id", + ) + + messages = request_args.get("messages") + + conversation_id = explicit_conversation_id + conversation_keys = self._build_sticky_keys( + sources=sources, + messages=messages, + conversation_id=conversation_id, + ) + if not conversation_id: + if self._sticky_enabled() and conversation_keys: + now = time.time() + with self._sticky_lock: + cached_conversation = self._get_sticky_value( + cache=self._sticky_conversation_cache, + keys=conversation_keys, + now=now, + ) + if cached_conversation: + conversation_id = cached_conversation + else: + conversation_id = str(uuid.uuid4()) + self._set_sticky_value( + cache=self._sticky_conversation_cache, + keys=conversation_keys, + value=conversation_id, + now=now, + ) + else: + conversation_id = str(uuid.uuid4()) + + session_keys = self._build_sticky_keys( + sources=sources, + messages=messages, + conversation_id=conversation_id, + ) + session_id = explicit_session_id + if not session_id: + if self._sticky_enabled() and session_keys: + now = time.time() + with self._sticky_lock: + cached_session = self._get_sticky_value( + cache=self._sticky_session_cache, + keys=session_keys, + now=now, + ) + if cached_session: + session_id = cached_session + else: + session_id = f"session-{uuid.uuid4()}" + self._set_sticky_value( + cache=self._sticky_session_cache, + keys=session_keys, + value=session_id, + now=now, + ) + else: + session_id = f"session-{uuid.uuid4()}" + elif self._sticky_enabled() and session_keys: + now = time.time() + with self._sticky_lock: + self._set_sticky_value( + cache=self._sticky_session_cache, + keys=session_keys, + value=str(session_id), + now=now, + ) + + if self._sticky_enabled() and conversation_keys and conversation_id: + now = time.time() + with self._sticky_lock: + self._set_sticky_value( + cache=self._sticky_conversation_cache, + keys=conversation_keys, + value=str(conversation_id), + now=now, + ) + + traceparent = _normalize_traceparent( + _pick("traceparent", IFLOW_HEADER_TRACEPARENT) + ) + + return { + "session_id": session_id, + "conversation_id": conversation_id, + "traceparent": traceparent, + "x_biz_info": _pick( + "iflow_x_biz_info", "x_biz_info", IFLOW_HEADER_X_BIZ_INFO + ), + "eagleeye_userdata": _pick( + "iflow_eagleeye_userdata", + "eagleeye_userdata", + IFLOW_HEADER_EAGLEEYE_USERDATA, + ), + "priority": _pick("iflow_priority", IFLOW_HEADER_PRIORITY), + } def _build_iflow_headers( self, @@ -615,17 +995,34 @@ def _build_iflow_headers( request_ids = request_ids or self._extract_iflow_ids() session_id = request_ids["session_id"] conversation_id = request_ids["conversation_id"] + traceparent = request_ids.get("traceparent", "") timestamp_ms = int(time.time() * 1000) headers = { "Authorization": f"Bearer {api_key}", - IFLOW_HEADER_API_KEY: api_key, "Content-Type": "application/json", "User-Agent": IFLOW_USER_AGENT, IFLOW_HEADER_SESSION_ID: session_id, IFLOW_HEADER_CONVERSATION_ID: conversation_id, - "Accept": "text/event-stream" if stream else "application/json", + IFLOW_HEADER_TRACEPARENT: traceparent, + "Accept": "*/*", + "Accept-Language": "*", + "Sec-Fetch-Mode": "cors", + "Accept-Encoding": "br, gzip, deflate", } + + if _is_truthy_env(os.getenv("IFLOW_SEND_X_API_KEY", "false")): + headers[IFLOW_HEADER_API_KEY] = api_key + + if request_ids.get("x_biz_info"): + headers[IFLOW_HEADER_X_BIZ_INFO] = request_ids["x_biz_info"] + if request_ids.get("eagleeye_userdata"): + headers[IFLOW_HEADER_EAGLEEYE_USERDATA] = request_ids[ + "eagleeye_userdata" + ] + if request_ids.get("priority"): + headers[IFLOW_HEADER_PRIORITY] = request_ids["priority"] + if include_signature: signature = self._create_iflow_signature( IFLOW_USER_AGENT, session_id, timestamp_ms, api_key @@ -794,15 +1191,10 @@ def normalize_choices( # Normalize choices for final chunk - MUST set finish_reason normalized_choices = normalize_choices(choices, force_final=True) # Build usage dict, handling empty usage gracefully - usage_dict = { - "prompt_tokens": usage_data.get("prompt_tokens", 0) - if usage_data - else 0, - "completion_tokens": usage_data.get("completion_tokens", 0) - if usage_data - else 0, - "total_tokens": usage_data.get("total_tokens", 0) if usage_data else 0, - } + usage_dict = dict(usage_data) if isinstance(usage_data, dict) else {} + usage_dict.setdefault("prompt_tokens", 0) + usage_dict.setdefault("completion_tokens", 0) + usage_dict.setdefault("total_tokens", 0) # CRITICAL FIX: If usage is empty/all-zeros (e.g., MiniMax sends "usage": {}), # set placeholder non-zero values to ensure downstream processing @@ -832,15 +1224,10 @@ def normalize_choices( # Handle usage-only chunks (no choices) if has_usage and not choices: - usage_dict = { - "prompt_tokens": usage_data.get("prompt_tokens", 0) - if usage_data - else 0, - "completion_tokens": usage_data.get("completion_tokens", 0) - if usage_data - else 0, - "total_tokens": usage_data.get("total_tokens", 0) if usage_data else 0, - } + usage_dict = dict(usage_data) if isinstance(usage_data, dict) else {} + usage_dict.setdefault("prompt_tokens", 0) + usage_dict.setdefault("completion_tokens", 0) + usage_dict.setdefault("total_tokens", 0) yield { "choices": [], "model": model_id, From b04c22e0fea269cb849acf35333a2fc743019f84 Mon Sep 17 00:00:00 2001 From: yassin Date: Fri, 27 Feb 2026 16:34:50 +0100 Subject: [PATCH 3/6] Reduce iFlow proxy stream logging overhead --- src/proxy_app/main.py | 15 ++-- src/rotator_library/transaction_logger.py | 100 ++++++++++++++++++++-- 2 files changed, 103 insertions(+), 12 deletions(-) diff --git a/src/proxy_app/main.py b/src/proxy_app/main.py index 4205fad9..d39a3be2 100644 --- a/src/proxy_app/main.py +++ b/src/proxy_app/main.py @@ -724,7 +724,8 @@ async def streaming_response_wrapper( Wraps a streaming response to log the full response after completion and ensures any errors during the stream are sent to the client. """ - response_chunks = [] + should_log_stream = logger is not None + response_chunks = [] if should_log_stream else None full_response = {} try: @@ -733,14 +734,16 @@ async def streaming_response_wrapper( logging.warning("Client disconnected, stopping stream.") break yield chunk_str + if not should_log_stream: + continue if chunk_str.strip() and chunk_str.startswith("data:"): content = chunk_str[len("data:") :].strip() if content != "[DONE]": try: chunk_data = json.loads(content) - response_chunks.append(chunk_data) - if logger: - logger.log_stream_chunk(chunk_data) + if response_chunks is not None: + response_chunks.append(chunk_data) + logger.log_stream_chunk(chunk_data) except json.JSONDecodeError: pass except Exception as e: @@ -762,7 +765,7 @@ async def streaming_response_wrapper( ) return # Stop further processing finally: - if response_chunks: + if should_log_stream and response_chunks: # --- Aggregation Logic --- final_message = {"role": "assistant"} aggregated_tool_calls = {} @@ -878,7 +881,7 @@ async def streaming_response_wrapper( "usage": usage_data, } - if logger: + if should_log_stream: logger.log_final_response( status_code=200, headers=None, # Headers are not available at this stage diff --git a/src/rotator_library/transaction_logger.py b/src/rotator_library/transaction_logger.py index e1de4d67..24aa9f33 100644 --- a/src/rotator_library/transaction_logger.py +++ b/src/rotator_library/transaction_logger.py @@ -31,7 +31,7 @@ from dataclasses import dataclass from datetime import datetime from pathlib import Path -from typing import Any, Dict, Optional, Union +from typing import Any, Dict, Optional, TextIO, Union from .utils.paths import get_logs_dir @@ -104,6 +104,7 @@ class TransactionLogger: "api_format", "_dir_available", "_context", + "_append_handles", ) def __init__( @@ -141,6 +142,7 @@ def __init__( self.log_dir: Optional[Path] = None self._dir_available = False self._context: Optional[TransactionContext] = None + self._append_handles: Dict[str, TextIO] = {} if not enabled: return @@ -256,6 +258,7 @@ def log_response( # Also write metadata self._log_metadata(response_data, status_code, duration_ms) + self.close_append_files() def _log_metadata( self, response_data: Dict[str, Any], status_code: int, duration_ms: float @@ -327,6 +330,7 @@ def _write_json(self, filename: str, data: Dict[str, Any]) -> None: """Write JSON data to a file in the log directory.""" if not self.log_dir: return + self._close_append_handle(filename) try: with open(self.log_dir / filename, "w", encoding="utf-8") as f: json.dump(data, f, indent=2, ensure_ascii=False) @@ -338,11 +342,51 @@ def _append_text(self, filename: str, text: str) -> None: if not self.log_dir: return try: - with open(self.log_dir / filename, "a", encoding="utf-8") as f: - f.write(text) + handle = self._get_append_handle(filename) + if handle: + handle.write(text) except Exception as e: lib_logger.error(f"TransactionLogger: Failed to append to {filename}: {e}") + def _get_append_handle(self, filename: str) -> Optional[TextIO]: + """Get (or lazily open) an append handle for a transaction log file.""" + if not self.log_dir: + return None + handle = self._append_handles.get(filename) + if handle and not handle.closed: + return handle + handle = open(self.log_dir / filename, "a", encoding="utf-8") + self._append_handles[filename] = handle + return handle + + def _close_append_handle(self, filename: str) -> None: + """Close and forget a single cached append handle by filename.""" + handle = self._append_handles.pop(filename, None) + if not handle: + return + try: + handle.flush() + handle.close() + except Exception: + pass + + def flush_append_files(self) -> None: + """Flush cached append handles to reduce data loss risk.""" + for handle in list(self._append_handles.values()): + try: + if not handle.closed: + handle.flush() + except Exception: + pass + + def close_append_files(self) -> None: + """Flush and close all cached append handles.""" + for filename in list(self._append_handles.keys()): + self._close_append_handle(filename) + + def __del__(self) -> None: + self.close_append_files() + @staticmethod def assemble_streaming_response( chunks: list, request_data: Optional[Dict[str, Any]] = None @@ -485,7 +529,7 @@ class ProviderLogger: or add custom methods for provider-specific logging needs. """ - __slots__ = ("enabled", "log_dir") + __slots__ = ("enabled", "log_dir", "_append_handles") def __init__(self, context: Optional[TransactionContext]): """ @@ -496,6 +540,7 @@ def __init__(self, context: Optional[TransactionContext]): """ self.enabled = False self.log_dir: Optional[Path] = None + self._append_handles: Dict[str, TextIO] = {} if context is None or not context.enabled: return @@ -535,6 +580,7 @@ def log_final_response(self, response_data: Dict[str, Any]) -> None: response_data: The complete response data """ self._write_json("final_response.json", response_data) + self.close_append_files() def log_error(self, error_message: str) -> None: """ @@ -545,6 +591,7 @@ def log_error(self, error_message: str) -> None: """ timestamp = datetime.utcnow().isoformat() self._append_text("error.log", f"[{timestamp}] {error_message}\n") + self.close_append_files() def log_extra(self, filename: str, data: Union[Dict[str, Any], str]) -> None: """ @@ -565,6 +612,7 @@ def _write_json(self, filename: str, data: Dict[str, Any]) -> None: """Write JSON data to a file in the log directory.""" if not self.enabled or not self.log_dir: return + self._close_append_handle(filename) try: with open(self.log_dir / filename, "w", encoding="utf-8") as f: json.dump(data, f, indent=2, ensure_ascii=False) @@ -576,11 +624,51 @@ def _append_text(self, filename: str, text: str) -> None: if not self.enabled or not self.log_dir: return try: - with open(self.log_dir / filename, "a", encoding="utf-8") as f: - f.write(text) + handle = self._get_append_handle(filename) + if handle: + handle.write(text) except Exception as e: lib_logger.error(f"ProviderLogger: Failed to append to {filename}: {e}") + def _get_append_handle(self, filename: str) -> Optional[TextIO]: + """Get (or lazily open) an append handle for a provider log file.""" + if not self.log_dir: + return None + handle = self._append_handles.get(filename) + if handle and not handle.closed: + return handle + handle = open(self.log_dir / filename, "a", encoding="utf-8") + self._append_handles[filename] = handle + return handle + + def _close_append_handle(self, filename: str) -> None: + """Close and forget a single cached append handle by filename.""" + handle = self._append_handles.pop(filename, None) + if not handle: + return + try: + handle.flush() + handle.close() + except Exception: + pass + + def flush_append_files(self) -> None: + """Flush cached append handles to reduce data loss risk.""" + for handle in list(self._append_handles.values()): + try: + if not handle.closed: + handle.flush() + except Exception: + pass + + def close_append_files(self) -> None: + """Flush and close all cached append handles.""" + for filename in list(self._append_handles.keys()): + self._close_append_handle(filename) + + def __del__(self) -> None: + self.close_append_files() + class AntigravityProviderLogger(ProviderLogger): """ From 14dc12225ff4977161d7bc55aaf232bac5247510 Mon Sep 17 00:00:00 2001 From: MasuRii Date: Thu, 19 Feb 2026 12:38:04 +0800 Subject: [PATCH 4/6] fix(error-handler): add RemoteProtocolError to api_connection classification Include httpx.RemoteProtocolError in the error type tuple for api_connection classification. This error occurs when a peer closes the connection without sending a complete message and should be treated as a transient connection issue rather than an unhandled exception. --- src/rotator_library/error_handler.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/rotator_library/error_handler.py b/src/rotator_library/error_handler.py index 9fd252c7..f77d5c25 100644 --- a/src/rotator_library/error_handler.py +++ b/src/rotator_library/error_handler.py @@ -935,7 +935,13 @@ def classify_error(e: Exception, provider: Optional[str] = None) -> ClassifiedEr ) if isinstance( - e, (httpx.TimeoutException, httpx.ConnectError, httpx.NetworkError) + e, + ( + httpx.TimeoutException, + httpx.ConnectError, + httpx.NetworkError, + httpx.RemoteProtocolError, # peer closed connection without complete message + ), ): # [NEW] return ClassifiedError( error_type="api_connection", original_exception=e, status_code=status_code From bf4daceeb01b2b348d5d388d702cc38d7285f80f Mon Sep 17 00:00:00 2001 From: MasuRii Date: Thu, 19 Feb 2026 12:38:25 +0800 Subject: [PATCH 5/6] fix(iflow): add connection error handling with retry logic in stream_handler Wrap stream_handler in a retry loop with exponential backoff for transient connection errors (RemoteProtocolError, ConnectError, ReadTimeout, NetworkError). Retries up to 3 times with 1s/2s/4s backoff before re-raising for higher-level error handling. Changes: - Add CONNECTION_ERROR_TYPES tuple and CONTEXT_WINDOW_ERROR_PATTERNS as module-level constants for reuse and clarity - Add MAX_CONNECTION_RETRIES and RETRY_BACKOFF_BASE configuration - Restructure stream_handler with while-loop retry around the stream context manager, re-creating the stream on each retry attempt - Add context window error detection from HTTP error response bodies to surface token limit issues explicitly - Import asyncio for async sleep during backoff --- .../providers/iflow_provider.py | 392 +++++++++++++----- 1 file changed, 282 insertions(+), 110 deletions(-) diff --git a/src/rotator_library/providers/iflow_provider.py b/src/rotator_library/providers/iflow_provider.py index 6d813412..f2945f01 100644 --- a/src/rotator_library/providers/iflow_provider.py +++ b/src/rotator_library/providers/iflow_provider.py @@ -11,6 +11,7 @@ import os import re import threading +import asyncio import httpx import logging from typing import Union, AsyncGenerator, List, Dict, Any, Optional, Tuple @@ -99,6 +100,29 @@ def _is_truthy_env(value: str) -> bool: return value.strip().lower() in {"1", "true", "yes", "on"} + +# Connection error types that should trigger retries. +CONNECTION_ERROR_TYPES = ( + httpx.RemoteProtocolError, + httpx.ConnectError, + httpx.ReadTimeout, + httpx.TimeoutException, + httpx.NetworkError, +) + +# Context window error patterns to detect in provider error bodies. +CONTEXT_WINDOW_ERROR_PATTERNS = ( + "context_length", + "token limit", + "context window", + "too many tokens", + "too long", + "max_tokens", +) + +MAX_CONNECTION_RETRIES = 3 +RETRY_BACKOFF_BASE = 1.0 + # ============================================================================= # THINKING MODE CONFIGURATION # ============================================================================= @@ -1413,6 +1437,115 @@ def _stream_to_completion_response( return litellm.ModelResponse(**final_response_data) + def _get_usage_token_count(self, usage: Any, token_key: str) -> int: + """Extract usage token count from dict/object usage payloads.""" + if usage is None: + return 0 + + if isinstance(usage, dict): + value = usage.get(token_key, 0) + else: + value = getattr(usage, token_key, 0) + + try: + return int(value or 0) + except (TypeError, ValueError): + return 0 + + def _message_to_dict(self, message: Any) -> Dict[str, Any]: + """Normalize response message object to dict.""" + if message is None: + return {} + if isinstance(message, dict): + return message + if hasattr(message, "model_dump"): + return message.model_dump(exclude_none=False) + if hasattr(message, "dict"): + return message.dict() + if hasattr(message, "__dict__"): + return {k: v for k, v in message.__dict__.items() if not k.startswith("_")} + return {} + + def _raise_silent_context_failure( + self, + *, + model: str, + reason: str, + file_logger: ProviderLogger, + ) -> None: + """Raise non-retryable context-window style error for silent 200 failures.""" + error_msg = f"iFlow silent context failure detected for {model}: {reason}" + file_logger.log_error(error_msg) + lib_logger.warning(error_msg) + + request = httpx.Request("POST", "https://iflow.invalid/chat/completions") + response = httpx.Response( + status_code=400, + request=request, + text=f"context window exceeded: {error_msg}", + ) + raise httpx.HTTPStatusError( + f"Context window exceeded: {error_msg}", + request=request, + response=response, + ) + + def _validate_final_response( + self, + *, + final_response: litellm.ModelResponse, + model: str, + file_logger: ProviderLogger, + ) -> None: + """Detect empty/invalid 200 responses that indicate silent context failures.""" + choices = getattr(final_response, "choices", None) or [] + if not choices: + self._raise_silent_context_failure( + model=model, + reason="HTTP 200 response had no choices", + file_logger=file_logger, + ) + + usage = getattr(final_response, "usage", None) + prompt_tokens = self._get_usage_token_count(usage, "prompt_tokens") + completion_tokens = self._get_usage_token_count(usage, "completion_tokens") + if prompt_tokens > 0 and completion_tokens == 0: + self._raise_silent_context_failure( + model=model, + reason=( + "completion_tokens=0 with non-zero prompt_tokens " + f"(prompt_tokens={prompt_tokens})" + ), + file_logger=file_logger, + ) + + first_choice = choices[0] + message_obj = ( + first_choice.get("message") + if isinstance(first_choice, dict) + else getattr(first_choice, "message", None) + ) + message = self._message_to_dict(message_obj) + + content = message.get("content") + reasoning_content = message.get("reasoning_content") + tool_calls = message.get("tool_calls") + function_call = message.get("function_call") + + has_content = isinstance(content, str) and bool(content.strip()) + has_reasoning = isinstance(reasoning_content, str) and bool( + reasoning_content.strip() + ) + has_tool_calls = isinstance(tool_calls, list) and len(tool_calls) > 0 + has_function_call = function_call not in (None, {}, []) + + if not (has_content or has_reasoning or has_tool_calls or has_function_call): + self._raise_silent_context_failure( + model=model, + reason="HTTP 200 response completed with empty assistant message", + file_logger=file_logger, + ) + async def acompletion( self, client: httpx.AsyncClient, **kwargs ) -> Union[litellm.ModelResponse, AsyncGenerator[litellm.ModelResponse, None]]: @@ -1473,85 +1606,100 @@ async def stream_handler( include_signature: bool = True, allow_unsigned_retry: bool = True, ): - """Handles the streaming response and converts chunks.""" - # Track state across chunks for finish_reason normalization + """Handles streaming response with retries and context-failure detection.""" stream_state: Dict[str, Any] = {} - try: - async with response_stream as response: - # Check for HTTP errors before processing stream - if response.status_code >= 400: - error_text = await response.aread() - error_text = ( - error_text.decode("utf-8") - if isinstance(error_text, bytes) - else error_text - ) + saw_data_chunk = False - # Handle 401: Force token refresh and retry once - if response.status_code == 401 and attempt == 1: - lib_logger.warning( - "iFlow returned 401. Forcing token refresh and retrying once." - ) - await self._refresh_token(credential_path, force=True) - retry_stream = await make_request( - api_base, include_signature=include_signature - ) - async for chunk in stream_handler( - retry_stream, - api_base=api_base, - attempt=2, - include_signature=include_signature, - allow_unsigned_retry=allow_unsigned_retry, - ): - yield chunk - return - - # Handle 406: retry once without signature headers - elif ( - response.status_code == 406 - and include_signature - and allow_unsigned_retry - ): - lib_logger.warning( - f"iFlow returned 406 on {api_base}, retrying once without signature headers" - ) - retry_stream = await make_request( - api_base, include_signature=False + connection_retry_count = 0 + current_stream = response_stream + + while connection_retry_count <= MAX_CONNECTION_RETRIES: + try: + async with current_stream as response: + if response.status_code >= 400: + error_text = await response.aread() + error_text = ( + error_text.decode("utf-8") + if isinstance(error_text, bytes) + else error_text ) - async for chunk in stream_handler( - retry_stream, - api_base=api_base, - attempt=attempt, - include_signature=False, - allow_unsigned_retry=False, + error_text_lower = error_text.lower() + + if response.status_code == 401 and attempt == 1: + lib_logger.warning( + "iFlow returned 401. Forcing token refresh and retrying once." + ) + await self._refresh_token(credential_path, force=True) + retry_stream = await make_request( + api_base, include_signature=include_signature + ) + async for chunk in stream_handler( + retry_stream, + api_base=api_base, + attempt=2, + include_signature=include_signature, + allow_unsigned_retry=allow_unsigned_retry, + ): + yield chunk + return + + if ( + response.status_code == 406 + and include_signature + and allow_unsigned_retry ): - yield chunk - return + lib_logger.warning( + f"iFlow returned 406 on {api_base}, retrying once without signature headers" + ) + retry_stream = await make_request( + api_base, include_signature=False + ) + async for chunk in stream_handler( + retry_stream, + api_base=api_base, + attempt=attempt, + include_signature=False, + allow_unsigned_retry=False, + ): + yield chunk + return - # Handle 429: Rate limit - elif ( - response.status_code == 429 - or "slow_down" in error_text.lower() - ): - raise RateLimitError( - f"iFlow rate limit exceeded: {error_text}", - llm_provider="iflow", - model=model, - response=response, - ) + if ( + response.status_code == 429 + or "slow_down" in error_text_lower + ): + raise RateLimitError( + f"iFlow rate limit exceeded: {error_text}", + llm_provider="iflow", + model=model, + response=response, + ) - elif self._should_fallback_base(response.status_code): - raise _NextBaseError( - f"iFlow HTTP {response.status_code} on {api_base}", - cause=httpx.HTTPStatusError( - f"HTTP {response.status_code}: {error_text}", + if any( + pattern in error_text_lower + for pattern in CONTEXT_WINDOW_ERROR_PATTERNS + ): + error_msg = f"iFlow context window exceeded: {error_text}" + file_logger.log_error(error_msg) + lib_logger.warning( + f"iFlow context window error detected: {error_text}" + ) + raise httpx.HTTPStatusError( + f"Context window exceeded: {error_text}", request=response.request, response=response, - ), - ) + ) + + if self._should_fallback_base(response.status_code): + raise _NextBaseError( + f"iFlow HTTP {response.status_code} on {api_base}", + cause=httpx.HTTPStatusError( + f"HTTP {response.status_code}: {error_text}", + request=response.request, + response=response, + ), + ) - # Handle other errors - else: if not error_text: content_type = response.headers.get("content-type", "") error_text = ( @@ -1567,47 +1715,66 @@ async def stream_handler( response=response, ) - # Process successful streaming response - async for line in response.aiter_lines(): - file_logger.log_response_chunk(line) - - # CRITICAL FIX: Handle both "data:" (no space) and "data: " (with space) - if line.startswith("data:"): - # Extract data after "data:" prefix, handling both formats - if line.startswith("data: "): - data_str = line[6:] # Skip "data: " - else: - data_str = line[5:] # Skip "data:" - - if data_str.strip() == "[DONE]": - # lib_logger.debug("iFlow: Received [DONE] marker") - break - try: - chunk = json.loads(data_str) - - for openai_chunk in self._convert_chunk_to_openai( - chunk, model, stream_state - ): - yield litellm.ModelResponse(**openai_chunk) - except json.JSONDecodeError: - lib_logger.warning( - f"Could not decode JSON from iFlow: {line}" - ) + async for line in response.aiter_lines(): + file_logger.log_response_chunk(line) + + if line.startswith("data:"): + data_str = line[6:] if line.startswith("data: ") else line[5:] + + if data_str.strip() == "[DONE]": + break + + saw_data_chunk = True + try: + chunk = json.loads(data_str) + for openai_chunk in self._convert_chunk_to_openai( + chunk, model, stream_state + ): + yield litellm.ModelResponse(**openai_chunk) + except json.JSONDecodeError: + lib_logger.warning( + f"Could not decode JSON from iFlow: {line}" + ) - except httpx.HTTPStatusError: - raise # Re-raise HTTP errors we already handled - except (httpx.RequestError, httpx.TimeoutException) as e: - raise _NextBaseError( - f"Network error on iFlow base {api_base}: {e}", cause=e - ) - except _NextBaseError: - raise - except Exception as e: - file_logger.log_error(f"Error during iFlow stream processing: {e}") - lib_logger.error( - f"Error during iFlow stream processing: {e}", exc_info=True - ) - raise + if not saw_data_chunk: + self._raise_silent_context_failure( + model=model, + reason="stream completed without any data chunks", + file_logger=file_logger, + ) + return + + except httpx.HTTPStatusError: + raise + except _NextBaseError: + raise + except CONNECTION_ERROR_TYPES as e: + connection_retry_count += 1 + error_type_name = type(e).__name__ + + if connection_retry_count > MAX_CONNECTION_RETRIES: + raise _NextBaseError( + f"Network error on iFlow base {api_base}: {error_type_name}: {e}", + cause=e, + ) + + backoff = RETRY_BACKOFF_BASE * (2 ** (connection_retry_count - 1)) + lib_logger.warning( + f"iFlow connection error ({error_type_name}) on attempt " + f"{connection_retry_count}/{MAX_CONNECTION_RETRIES}: {e}. " + f"Retrying in {backoff}s..." + ) + await asyncio.sleep(backoff) + current_stream = await make_request( + api_base, + include_signature=include_signature, + ) + except Exception as e: + file_logger.log_error(f"Error during iFlow stream processing: {e}") + lib_logger.error( + f"Error during iFlow stream processing: {e}", exc_info=True + ) + raise async def logging_stream_wrapper(): """Wraps the stream to log the final reassembled response and cache reasoning.""" @@ -1642,6 +1809,11 @@ async def logging_stream_wrapper(): finally: if openai_chunks: final_response = self._stream_to_completion_response(openai_chunks) + self._validate_final_response( + final_response=final_response, + model=model, + file_logger=file_logger, + ) file_logger.log_final_response(final_response.dict()) # Store reasoning_content from the response for future multi-turn conversations From bc58e347df0430da12afbdb7477268c21e635f17 Mon Sep 17 00:00:00 2001 From: MasuRii Date: Fri, 20 Feb 2026 00:21:58 +0800 Subject: [PATCH 6/6] fix(iflow): add silent context failure detection for empty 200 responses - Detect empty choices array in HTTP 200 responses - Detect zero completion_tokens with non-zero prompt_tokens - Detect empty assistant messages (no content/reasoning/tool_calls) - Detect streams that complete without any data chunks - Raise non-retryable context_window_exceeded error for these cases - Prevents quota waste from repeated failed requests --- .../providers/iflow_provider.py | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/src/rotator_library/providers/iflow_provider.py b/src/rotator_library/providers/iflow_provider.py index f2945f01..3b276861 100644 --- a/src/rotator_library/providers/iflow_provider.py +++ b/src/rotator_library/providers/iflow_provider.py @@ -22,7 +22,10 @@ from ..timeout_config import TimeoutConfig from ..transaction_logger import ProviderLogger import litellm -from litellm.exceptions import RateLimitError, AuthenticationError +from litellm.exceptions import ( + RateLimitError, + AuthenticationError, +) from pathlib import Path import uuid from datetime import datetime @@ -1473,11 +1476,10 @@ def _raise_silent_context_failure( reason: str, file_logger: ProviderLogger, ) -> None: - """Raise non-retryable context-window style error for silent 200 failures.""" + """Raise a non-retryable context-window style error for silent 200 failures.""" error_msg = f"iFlow silent context failure detected for {model}: {reason}" file_logger.log_error(error_msg) lib_logger.warning(error_msg) - request = httpx.Request("POST", "https://iflow.invalid/chat/completions") response = httpx.Response( status_code=400, @@ -1780,6 +1782,7 @@ async def logging_stream_wrapper(): """Wraps the stream to log the final reassembled response and cache reasoning.""" openai_chunks = [] last_fallback_error: Optional[Exception] = None + stream_completed = False try: for idx, api_base in enumerate(api_bases): try: @@ -1791,6 +1794,7 @@ async def logging_stream_wrapper(): ): openai_chunks.append(chunk) yield chunk + stream_completed = True return except _NextBaseError as e: if openai_chunks: @@ -1807,7 +1811,14 @@ async def logging_stream_wrapper(): if last_fallback_error: raise last_fallback_error finally: - if openai_chunks: + if stream_completed: + if not openai_chunks: + self._raise_silent_context_failure( + model=model, + reason="HTTP 200 stream ended without any data chunks", + file_logger=file_logger, + ) + final_response = self._stream_to_completion_response(openai_chunks) self._validate_final_response( final_response=final_response,