Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -157,29 +157,35 @@ async def callback(**kwargs): # pylint: disable=unused-argument
_LOGGER.debug("Partition split retry (async): Skipping 410 retry for internal PK range fetch")
return await execute_fetch()

max_retries = 3
# Increased from 3 to 5 to support back-to-back partition splits
# Heavy workloads may trigger consecutive splits on the same partition
max_retries = 5
attempt = 0
consecutive_splits = 0

while attempt <= max_retries:
try:
return await execute_fetch()
except exceptions.CosmosHttpResponseError as e:
if exceptions._partition_range_is_gone(e):
attempt += 1
consecutive_splits += 1

if attempt > max_retries:
_LOGGER.error(
"Partition split retry (async): Exhausted all %d retries. "
"Partition split retry (async): Exhausted all %d retries after %d consecutive splits. "
"state: _has_started=%s, _continuation=%s",
max_retries, self._has_started, self._continuation
max_retries, consecutive_splits, self._has_started, self._continuation
)
raise # Exhausted retries, propagate error

_LOGGER.warning(
"Partition split retry (async): 410 error (sub_status=%s). Attempt %d of %d. "
"Partition split retry (async): 410 error (sub_status=%s). Attempt %d of %d (consecutive splits: %d). "
"Refreshing routing map and resetting state.",
getattr(e, 'sub_status', 'N/A'),
attempt,
max_retries
max_retries,
consecutive_splits
)

# Refresh routing map to get new partition key ranges
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,31 @@ async def _run_hybrid_search(self): # pylint: disable=too-many-branches, too-ma
except exceptions.CosmosHttpResponseError as e:
if exceptions._partition_range_is_gone(e):
# repairing document producer context on partition split
global_statistics_doc_producers = await self._repair_document_producer(global_statistics_query,
target_all_ranges=True)
# Support back-to-back splits by retrying if repair itself encounters another split
repair_retries = 3
for repair_attempt in range(repair_retries):
try:
global_statistics_doc_producers = await self._repair_document_producer(global_statistics_query,
target_all_ranges=True)
break # Success, exit retry loop
except exceptions.CosmosHttpResponseError as repair_error:
if exceptions._partition_range_is_gone(repair_error):
if repair_attempt < repair_retries - 1:
_LOGGER.warning(
"Hybrid search aggregator (async): Partition split during global stats repair. "
"Retry attempt %d of %d",
repair_attempt + 1,
repair_retries
)
continue # Retry repair
else:
_LOGGER.error(
"Hybrid search aggregator (async): Exhausted all %d repair retries for global stats",
repair_retries
)
raise # Exhausted retries
else:
raise # Different error, propagate
else:
raise
except StopAsyncIteration:
Expand Down Expand Up @@ -145,11 +168,34 @@ async def _run_hybrid_search(self): # pylint: disable=too-many-branches, too-ma
component_query_results.append(target_query_ex_context)
except exceptions.CosmosHttpResponseError as e:
if exceptions._partition_range_is_gone(e):
component_query_results = []
# repairing document producer context on partition split
for rewritten_query in rewritten_query_infos:
component_query_results.extend(await self._repair_document_producer(
rewritten_query['rewrittenQuery']))
# Support back-to-back splits by retrying if repair itself encounters another split
repair_retries = 3
for repair_attempt in range(repair_retries):
try:
component_query_results = []
for rewritten_query in rewritten_query_infos:
component_query_results.extend(await self._repair_document_producer(
rewritten_query['rewrittenQuery']))
break # Success, exit retry loop
except exceptions.CosmosHttpResponseError as repair_error:
if exceptions._partition_range_is_gone(repair_error):
if repair_attempt < repair_retries - 1:
_LOGGER.warning(
"Hybrid search aggregator (async): Partition split during component query repair. "
"Retry attempt %d of %d",
repair_attempt + 1,
repair_retries
)
continue # Retry repair
else:
_LOGGER.error(
"Hybrid search aggregator (async): Exhausted all %d repair retries for component queries",
repair_retries
)
raise # Exhausted retries
else:
raise # Different error, propagate
else:
raise
except StopAsyncIteration:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,30 @@ async def _configure_partition_ranges(self):
except exceptions.CosmosHttpResponseError as e:
if exceptions._partition_range_is_gone(e):
# repairing document producer context on partition split
await self._repair_document_producer()
# Support back-to-back splits by retrying if repair itself encounters another split
repair_retries = 3
for repair_attempt in range(repair_retries):
try:
await self._repair_document_producer()
break # Success, exit retry loop
except exceptions.CosmosHttpResponseError as repair_error:
if exceptions._partition_range_is_gone(repair_error):
if repair_attempt < repair_retries - 1:
_LOGGER.warning(
"Multi-execution aggregator (async): Partition split during repair. "
"Retry attempt %d of %d",
repair_attempt + 1,
repair_retries
)
continue # Retry repair
else:
_LOGGER.error(
"Multi-execution aggregator (async): Exhausted all %d repair retries",
repair_retries
)
raise # Exhausted retries
else:
raise # Different error, propagate
else:
raise

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,29 +155,35 @@ def callback(**kwargs): # pylint: disable=unused-argument
_LOGGER.debug("Partition split retry: Skipping 410 retry for internal PK range fetch")
return execute_fetch()

max_retries = 3
# Increased from 3 to 5 to support back-to-back partition splits
# Heavy workloads may trigger consecutive splits on the same partition
max_retries = 5
attempt = 0
consecutive_splits = 0

while attempt <= max_retries:
try:
return execute_fetch()
except exceptions.CosmosHttpResponseError as e:
if exceptions._partition_range_is_gone(e):
attempt += 1
consecutive_splits += 1

if attempt > max_retries:
_LOGGER.error(
"Partition split retry: Exhausted all %d retries. "
"Partition split retry: Exhausted all %d retries after %d consecutive splits. "
"state: _has_started=%s, _continuation=%s",
max_retries, self._has_started, self._continuation
max_retries, consecutive_splits, self._has_started, self._continuation
)
raise # Exhausted retries, propagate error

_LOGGER.warning(
"Partition split retry: 410 error (sub_status=%s). Attempt %d of %d. "
"Partition split retry: 410 error (sub_status=%s). Attempt %d of %d (consecutive splits: %d). "
"Refreshing routing map and resetting state.",
getattr(e, 'sub_status', 'N/A'),
attempt,
max_retries
max_retries,
consecutive_splits
)

# Refresh routing map to get new partition key ranges
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,31 @@ def _run_hybrid_search(self): # pylint: disable=too-many-branches, too-many-sta
except exceptions.CosmosHttpResponseError as e:
if exceptions._partition_range_is_gone(e):
# repairing document producer context on partition split
global_statistics_doc_producers = self._repair_document_producer(global_statistics_query,
target_all_ranges=True)
# Support back-to-back splits by retrying if repair itself encounters another split
repair_retries = 3
for repair_attempt in range(repair_retries):
try:
global_statistics_doc_producers = self._repair_document_producer(global_statistics_query,
target_all_ranges=True)
break # Success, exit retry loop
except exceptions.CosmosHttpResponseError as repair_error:
if exceptions._partition_range_is_gone(repair_error):
if repair_attempt < repair_retries - 1:
_LOGGER.warning(
"Hybrid search aggregator: Partition split during global stats repair. "
"Retry attempt %d of %d",
repair_attempt + 1,
repair_retries
)
continue # Retry repair
else:
_LOGGER.error(
"Hybrid search aggregator: Exhausted all %d repair retries for global stats",
repair_retries
)
raise # Exhausted retries
else:
raise # Different error, propagate
else:
raise
except StopIteration:
Expand Down Expand Up @@ -278,11 +301,34 @@ def _run_hybrid_search(self): # pylint: disable=too-many-branches, too-many-sta
component_query_results.append(target_query_ex_context)
except exceptions.CosmosHttpResponseError as e:
if exceptions._partition_range_is_gone(e):
component_query_results = []
# repairing document producer context on partition split
for rewritten_query in rewritten_query_infos:
component_query_results.extend(self._repair_document_producer(
rewritten_query['rewrittenQuery']))
# Support back-to-back splits by retrying if repair itself encounters another split
repair_retries = 3
for repair_attempt in range(repair_retries):
try:
component_query_results = []
for rewritten_query in rewritten_query_infos:
component_query_results.extend(self._repair_document_producer(
rewritten_query['rewrittenQuery']))
break # Success, exit retry loop
except exceptions.CosmosHttpResponseError as repair_error:
if exceptions._partition_range_is_gone(repair_error):
if repair_attempt < repair_retries - 1:
_LOGGER.warning(
"Hybrid search aggregator: Partition split during component query repair. "
"Retry attempt %d of %d",
repair_attempt + 1,
repair_retries
)
continue # Retry repair
else:
_LOGGER.error(
"Hybrid search aggregator: Exhausted all %d repair retries for component queries",
repair_retries
)
raise # Exhausted retries
else:
raise # Different error, propagate
else:
raise
except StopIteration:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,30 @@ def _configure_partition_ranges(self):
except exceptions.CosmosHttpResponseError as e:
if exceptions._partition_range_is_gone(e):
# repairing document producer context on partition split
self._repair_document_producer()
# Support back-to-back splits by retrying if repair itself encounters another split
repair_retries = 3
for repair_attempt in range(repair_retries):
try:
self._repair_document_producer()
break # Success, exit retry loop
except exceptions.CosmosHttpResponseError as repair_error:
if exceptions._partition_range_is_gone(repair_error):
if repair_attempt < repair_retries - 1:
_LOGGER.warning(
"Multi-execution aggregator: Partition split during repair. "
"Retry attempt %d of %d",
repair_attempt + 1,
repair_retries
)
continue # Retry repair
else:
_LOGGER.error(
"Multi-execution aggregator: Exhausted all %d repair retries",
repair_retries
)
raise # Exhausted retries
else:
raise # Different error, propagate
else:
raise

Expand Down
Loading