From f85c5c33b2db7b8c6e4de307e0c88045a680be48 Mon Sep 17 00:00:00 2001 From: jayy-77 <1427jay@gmail.com> Date: Thu, 12 Feb 2026 21:06:05 +0530 Subject: [PATCH] enhance retry logic for partition splits in execution context --- .../aio/base_execution_context.py | 16 +++-- .../aio/hybrid_search_aggregator.py | 58 +++++++++++++++++-- .../aio/multi_execution_aggregator.py | 25 +++++++- .../base_execution_context.py | 16 +++-- .../hybrid_search_aggregator.py | 58 +++++++++++++++++-- .../multi_execution_aggregator.py | 25 +++++++- 6 files changed, 174 insertions(+), 24 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/base_execution_context.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/base_execution_context.py index e610da46d84f..540d8c6f19a3 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/base_execution_context.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/base_execution_context.py @@ -157,8 +157,11 @@ async def callback(**kwargs): # pylint: disable=unused-argument _LOGGER.debug("Partition split retry (async): Skipping 410 retry for internal PK range fetch") return await execute_fetch() - max_retries = 3 + # Increased from 3 to 5 to support back-to-back partition splits + # Heavy workloads may trigger consecutive splits on the same partition + max_retries = 5 attempt = 0 + consecutive_splits = 0 while attempt <= max_retries: try: @@ -166,20 +169,23 @@ async def callback(**kwargs): # pylint: disable=unused-argument except exceptions.CosmosHttpResponseError as e: if exceptions._partition_range_is_gone(e): attempt += 1 + consecutive_splits += 1 + if attempt > max_retries: _LOGGER.error( - "Partition split retry (async): Exhausted all %d retries. " + "Partition split retry (async): Exhausted all %d retries after %d consecutive splits. " "state: _has_started=%s, _continuation=%s", - max_retries, self._has_started, self._continuation + max_retries, consecutive_splits, self._has_started, self._continuation ) raise # Exhausted retries, propagate error _LOGGER.warning( - "Partition split retry (async): 410 error (sub_status=%s). Attempt %d of %d. " + "Partition split retry (async): 410 error (sub_status=%s). Attempt %d of %d (consecutive splits: %d). " "Refreshing routing map and resetting state.", getattr(e, 'sub_status', 'N/A'), attempt, - max_retries + max_retries, + consecutive_splits ) # Refresh routing map to get new partition key ranges diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/hybrid_search_aggregator.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/hybrid_search_aggregator.py index 0080fc3c02b8..7952a0c567ac 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/hybrid_search_aggregator.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/hybrid_search_aggregator.py @@ -99,8 +99,31 @@ async def _run_hybrid_search(self): # pylint: disable=too-many-branches, too-ma except exceptions.CosmosHttpResponseError as e: if exceptions._partition_range_is_gone(e): # repairing document producer context on partition split - global_statistics_doc_producers = await self._repair_document_producer(global_statistics_query, - target_all_ranges=True) + # Support back-to-back splits by retrying if repair itself encounters another split + repair_retries = 3 + for repair_attempt in range(repair_retries): + try: + global_statistics_doc_producers = await self._repair_document_producer(global_statistics_query, + target_all_ranges=True) + break # Success, exit retry loop + except exceptions.CosmosHttpResponseError as repair_error: + if exceptions._partition_range_is_gone(repair_error): + if repair_attempt < repair_retries - 1: + _LOGGER.warning( + "Hybrid search aggregator (async): Partition split during global stats repair. " + "Retry attempt %d of %d", + repair_attempt + 1, + repair_retries + ) + continue # Retry repair + else: + _LOGGER.error( + "Hybrid search aggregator (async): Exhausted all %d repair retries for global stats", + repair_retries + ) + raise # Exhausted retries + else: + raise # Different error, propagate else: raise except StopAsyncIteration: @@ -145,11 +168,34 @@ async def _run_hybrid_search(self): # pylint: disable=too-many-branches, too-ma component_query_results.append(target_query_ex_context) except exceptions.CosmosHttpResponseError as e: if exceptions._partition_range_is_gone(e): - component_query_results = [] # repairing document producer context on partition split - for rewritten_query in rewritten_query_infos: - component_query_results.extend(await self._repair_document_producer( - rewritten_query['rewrittenQuery'])) + # Support back-to-back splits by retrying if repair itself encounters another split + repair_retries = 3 + for repair_attempt in range(repair_retries): + try: + component_query_results = [] + for rewritten_query in rewritten_query_infos: + component_query_results.extend(await self._repair_document_producer( + rewritten_query['rewrittenQuery'])) + break # Success, exit retry loop + except exceptions.CosmosHttpResponseError as repair_error: + if exceptions._partition_range_is_gone(repair_error): + if repair_attempt < repair_retries - 1: + _LOGGER.warning( + "Hybrid search aggregator (async): Partition split during component query repair. " + "Retry attempt %d of %d", + repair_attempt + 1, + repair_retries + ) + continue # Retry repair + else: + _LOGGER.error( + "Hybrid search aggregator (async): Exhausted all %d repair retries for component queries", + repair_retries + ) + raise # Exhausted retries + else: + raise # Different error, propagate else: raise except StopAsyncIteration: diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/multi_execution_aggregator.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/multi_execution_aggregator.py index 6fcd0117510b..22911401d849 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/multi_execution_aggregator.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/multi_execution_aggregator.py @@ -190,7 +190,30 @@ async def _configure_partition_ranges(self): except exceptions.CosmosHttpResponseError as e: if exceptions._partition_range_is_gone(e): # repairing document producer context on partition split - await self._repair_document_producer() + # Support back-to-back splits by retrying if repair itself encounters another split + repair_retries = 3 + for repair_attempt in range(repair_retries): + try: + await self._repair_document_producer() + break # Success, exit retry loop + except exceptions.CosmosHttpResponseError as repair_error: + if exceptions._partition_range_is_gone(repair_error): + if repair_attempt < repair_retries - 1: + _LOGGER.warning( + "Multi-execution aggregator (async): Partition split during repair. " + "Retry attempt %d of %d", + repair_attempt + 1, + repair_retries + ) + continue # Retry repair + else: + _LOGGER.error( + "Multi-execution aggregator (async): Exhausted all %d repair retries", + repair_retries + ) + raise # Exhausted retries + else: + raise # Different error, propagate else: raise diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/base_execution_context.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/base_execution_context.py index 8cf5a7f54043..bdcbce61c07d 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/base_execution_context.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/base_execution_context.py @@ -155,8 +155,11 @@ def callback(**kwargs): # pylint: disable=unused-argument _LOGGER.debug("Partition split retry: Skipping 410 retry for internal PK range fetch") return execute_fetch() - max_retries = 3 + # Increased from 3 to 5 to support back-to-back partition splits + # Heavy workloads may trigger consecutive splits on the same partition + max_retries = 5 attempt = 0 + consecutive_splits = 0 while attempt <= max_retries: try: @@ -164,20 +167,23 @@ def callback(**kwargs): # pylint: disable=unused-argument except exceptions.CosmosHttpResponseError as e: if exceptions._partition_range_is_gone(e): attempt += 1 + consecutive_splits += 1 + if attempt > max_retries: _LOGGER.error( - "Partition split retry: Exhausted all %d retries. " + "Partition split retry: Exhausted all %d retries after %d consecutive splits. " "state: _has_started=%s, _continuation=%s", - max_retries, self._has_started, self._continuation + max_retries, consecutive_splits, self._has_started, self._continuation ) raise # Exhausted retries, propagate error _LOGGER.warning( - "Partition split retry: 410 error (sub_status=%s). Attempt %d of %d. " + "Partition split retry: 410 error (sub_status=%s). Attempt %d of %d (consecutive splits: %d). " "Refreshing routing map and resetting state.", getattr(e, 'sub_status', 'N/A'), attempt, - max_retries + max_retries, + consecutive_splits ) # Refresh routing map to get new partition key ranges diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/hybrid_search_aggregator.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/hybrid_search_aggregator.py index a738dd94ddbe..a9123b0c3dce 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/hybrid_search_aggregator.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/hybrid_search_aggregator.py @@ -233,8 +233,31 @@ def _run_hybrid_search(self): # pylint: disable=too-many-branches, too-many-sta except exceptions.CosmosHttpResponseError as e: if exceptions._partition_range_is_gone(e): # repairing document producer context on partition split - global_statistics_doc_producers = self._repair_document_producer(global_statistics_query, - target_all_ranges=True) + # Support back-to-back splits by retrying if repair itself encounters another split + repair_retries = 3 + for repair_attempt in range(repair_retries): + try: + global_statistics_doc_producers = self._repair_document_producer(global_statistics_query, + target_all_ranges=True) + break # Success, exit retry loop + except exceptions.CosmosHttpResponseError as repair_error: + if exceptions._partition_range_is_gone(repair_error): + if repair_attempt < repair_retries - 1: + _LOGGER.warning( + "Hybrid search aggregator: Partition split during global stats repair. " + "Retry attempt %d of %d", + repair_attempt + 1, + repair_retries + ) + continue # Retry repair + else: + _LOGGER.error( + "Hybrid search aggregator: Exhausted all %d repair retries for global stats", + repair_retries + ) + raise # Exhausted retries + else: + raise # Different error, propagate else: raise except StopIteration: @@ -278,11 +301,34 @@ def _run_hybrid_search(self): # pylint: disable=too-many-branches, too-many-sta component_query_results.append(target_query_ex_context) except exceptions.CosmosHttpResponseError as e: if exceptions._partition_range_is_gone(e): - component_query_results = [] # repairing document producer context on partition split - for rewritten_query in rewritten_query_infos: - component_query_results.extend(self._repair_document_producer( - rewritten_query['rewrittenQuery'])) + # Support back-to-back splits by retrying if repair itself encounters another split + repair_retries = 3 + for repair_attempt in range(repair_retries): + try: + component_query_results = [] + for rewritten_query in rewritten_query_infos: + component_query_results.extend(self._repair_document_producer( + rewritten_query['rewrittenQuery'])) + break # Success, exit retry loop + except exceptions.CosmosHttpResponseError as repair_error: + if exceptions._partition_range_is_gone(repair_error): + if repair_attempt < repair_retries - 1: + _LOGGER.warning( + "Hybrid search aggregator: Partition split during component query repair. " + "Retry attempt %d of %d", + repair_attempt + 1, + repair_retries + ) + continue # Retry repair + else: + _LOGGER.error( + "Hybrid search aggregator: Exhausted all %d repair retries for component queries", + repair_retries + ) + raise # Exhausted retries + else: + raise # Different error, propagate else: raise except StopIteration: diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/multi_execution_aggregator.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/multi_execution_aggregator.py index 6c37c71d4260..3f5709097b8b 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/multi_execution_aggregator.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/multi_execution_aggregator.py @@ -133,7 +133,30 @@ def _configure_partition_ranges(self): except exceptions.CosmosHttpResponseError as e: if exceptions._partition_range_is_gone(e): # repairing document producer context on partition split - self._repair_document_producer() + # Support back-to-back splits by retrying if repair itself encounters another split + repair_retries = 3 + for repair_attempt in range(repair_retries): + try: + self._repair_document_producer() + break # Success, exit retry loop + except exceptions.CosmosHttpResponseError as repair_error: + if exceptions._partition_range_is_gone(repair_error): + if repair_attempt < repair_retries - 1: + _LOGGER.warning( + "Multi-execution aggregator: Partition split during repair. " + "Retry attempt %d of %d", + repair_attempt + 1, + repair_retries + ) + continue # Retry repair + else: + _LOGGER.error( + "Multi-execution aggregator: Exhausted all %d repair retries", + repair_retries + ) + raise # Exhausted retries + else: + raise # Different error, propagate else: raise