From 3df21fda237e220fa759c7828cab061660793931 Mon Sep 17 00:00:00 2001
From: Andrew Mathew
Date: Thu, 12 Feb 2026 12:27:21 -0500
Subject: [PATCH 1/4] added samples for feed range apis

---
 sdk/cosmos/azure-cosmos/docs/FeedRanges.md | 224 ++++++++++
 sdk/cosmos/azure-cosmos/samples/README.md | 7 +
 .../samples/feed_range_management.py | 375 +++++++++++++++++
 .../samples/feed_range_management_async.py | 385 ++++++++++++++++++
 4 files changed, 991 insertions(+)
 create mode 100644 sdk/cosmos/azure-cosmos/docs/FeedRanges.md
 create mode 100644 sdk/cosmos/azure-cosmos/samples/feed_range_management.py
 create mode 100644 sdk/cosmos/azure-cosmos/samples/feed_range_management_async.py

diff --git a/sdk/cosmos/azure-cosmos/docs/FeedRanges.md b/sdk/cosmos/azure-cosmos/docs/FeedRanges.md
new file mode 100644
index 000000000000..c0afe25ee004
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos/docs/FeedRanges.md
@@ -0,0 +1,224 @@
+# Feed Ranges in the Python SDK for Azure Cosmos DB
+
+Feed ranges represent a scope within a container, defined by a range of partition key hash values. They enable sub-container-level operations such as parallel query processing, scoped change feed consumption, and workload distribution across multiple workers.
+
+For general information about partitioning, see:
+- [Partitioning overview](https://learn.microsoft.com/azure/cosmos-db/partitioning-overview)
+- [Partition Keys in the Python SDK](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/cosmos/azure-cosmos/docs/PartitionKeys.md)
+
+---
+
+## What Are Feed Ranges?
+
+A feed range is an opaque representation of a subset of data within a container. Each feed range corresponds to a contiguous range of partition key hash values. The set of all feed ranges returned by `read_feed_ranges()` covers the entire container without overlap, and every item belongs to exactly one feed range.
+
+> **Important:** Feed ranges are returned as `dict[str, Any]` values and must be treated as opaque.
+> Do **not** manually construct, parse, or depend on the internal structure of a feed range dictionary.
+> Use only the container methods described below to create, compare, and consume feed ranges.
+
+---
+
+## When to Use Feed Ranges
+
+| Scenario | Description |
+|----------|-------------|
+| **Parallel query processing** | Read all feed ranges, then query each range independently across multiple workers. Each worker processes a non-overlapping subset of the data. |
+| **Scoped change feed** | Consume the change feed for a specific feed range rather than the entire container, enabling fan-out architectures for change processing. |
+| **Workload distribution** | Assign feed ranges to different threads, processes, or machines to distribute work evenly across a container's data. |
+| **Session token management** | Track session tokens per feed range for fine-grained session consistency when managing your own session tokens across multiple clients. |
+| **Partition key to feed range mapping** | Convert partition key values to feed ranges to determine which worker or scope a specific partition key belongs to. |
+
+---
+
+## API Reference
+
+### `read_feed_ranges()`
+
+Returns all feed ranges for the container. The number of feed ranges corresponds to the number of physical partitions.
+
+```python
+# Sync
+feed_ranges = list(container.read_feed_ranges())
+
+# Async
+feed_ranges = [fr async for fr in container.read_feed_ranges()]
+```
+
+**Parameters:**
+- `force_refresh` *(bool, optional)* – When `True`, refreshes the cached partition key ranges before returning. Use this after a known partition split. Default: `False`.
+
+**Returns:** `Iterable[dict[str, Any]]` (sync) / `AsyncIterable[dict[str, Any]]` (async)
+
+---
+
+### `feed_range_from_partition_key()`
+
+Converts a partition key value to its corresponding feed range. Useful for determining which feed range a specific partition key belongs to.
+
+```python
+# Sync
+feed_range = container.feed_range_from_partition_key("Seattle")
+
+# Async
+feed_range = await container.feed_range_from_partition_key("Seattle")
+```
+
+**Parameters:**
+- `partition_key` *(PartitionKeyType)* – The partition key value. If set to `None`, returns the feed range for partition keys with JSON null. See [Partition Keys](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/cosmos/azure-cosmos/docs/PartitionKeys.md) for supported value types.
+
+**Returns:** `dict[str, Any]`
+
+---
+
+### `is_feed_range_subset()`
+
+Checks whether one feed range is fully contained within another.
+
+```python
+# Sync
+is_subset = container.is_feed_range_subset(
+    parent_feed_range=container_feed_range,
+    child_feed_range=pk_feed_range
+)
+
+# Async
+is_subset = await container.is_feed_range_subset(
+    parent_feed_range=container_feed_range,
+    child_feed_range=pk_feed_range
+)
+```
+
+**Parameters:**
+- `parent_feed_range` *(dict[str, Any])* – The feed range to test as the superset.
+- `child_feed_range` *(dict[str, Any])* – The feed range to test as the subset.
+
+**Returns:** `bool` – `True` if the child feed range is fully contained within the parent.
+
+---
+
+### `query_items()` with `feed_range`
+
+Scopes a SQL query to a specific feed range. This is useful for parallel query execution where each worker queries a different feed range.
+
+```python
+# Sync
+items = list(container.query_items(
+    query="SELECT * FROM c WHERE c.status = 'active'",
+    feed_range=feed_range
+))
+
+# Async
+items = [item async for item in container.query_items(
+    query="SELECT * FROM c WHERE c.status = 'active'",
+    feed_range=feed_range
+)]
+```
+
+> **Note:** `feed_range` and `partition_key` are **mutually exclusive** parameters.
+> Providing both will raise a `ValueError`.
+
+---
+
+### `query_items_change_feed()` with `feed_range`
+
+Scopes change feed consumption to a specific feed range. This enables fan-out architectures where each worker processes changes for a non-overlapping subset of the container.
+
+```python
+# Sync
+response = container.query_items_change_feed(
+    feed_range=feed_range,
+    start_time="Beginning"
+)
+for item in response:
+    process(item)
+
+# Async
+response = container.query_items_change_feed(
+    feed_range=feed_range,
+    start_time="Beginning"
+)
+async for item in response:
+    process(item)
+```
+
+> **Note:** `feed_range`, `partition_key`, and `partition_key_range_id` are **mutually exclusive** parameters.
+
+---
+
+### `get_latest_session_token()`
+
+Gets the most up-to-date session token for a specific feed range from a list of session token and feed range pairs. This is a **provisional** API intended for advanced session token management scenarios.
+
+```python
+# Sync
+session_token = container.get_latest_session_token(
+    feed_ranges_to_session_tokens=[(feed_range, token), ...],
+    target_feed_range=target_feed_range
+)
+```
+
+See the [session_token_management.py](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/cosmos/azure-cosmos/samples/session_token_management.py) sample for a complete example.
+
+---
+
+## Feed Ranges vs. Partition Keys
+
+Feed ranges and partition keys both define a scope within a container, but they operate at different levels:
+
+| | Partition Key | Feed Range |
+|---|---|---|
+| **Granularity** | A single logical partition (one partition key value) | Zero, one, or many logical partitions (a hash range) |
+| **Source** | Defined by your data model; you supply the value | Returned by `read_feed_ranges()` or derived from a partition key |
+| **Relationship** | A partition key maps to exactly one feed range | A feed range can contain multiple partition keys |
+| **Use case** | Point reads, single-partition queries | Parallel processing, scoped change feed, workload distribution |
+
+You can convert between them:
+```python
+# Partition key → Feed range
+feed_range = container.feed_range_from_partition_key("my_partition_key")
+
+# Check which container feed range contains a partition key's feed range
+for container_fr in container.read_feed_ranges():
+    if container.is_feed_range_subset(container_fr, feed_range):
+        print("Partition key belongs to this feed range")
+        break
+```
+
+---
+
+## Important Considerations
+
+### Feed Ranges Are Opaque
+Feed ranges are returned as `dict[str, Any]`. Their internal structure may change between SDK versions. Always use the provided API methods rather than constructing or parsing feed range dictionaries.
+
+### Serialization for Storage
+If you need to persist feed ranges (e.g., to assign ranges to workers across restarts), you can safely serialize them with `json.dumps()` and deserialize with `json.loads()`:
+
+```python
+import json
+
+# Serialize for storage
+serialized = json.dumps(feed_range)
+
+# Deserialize for later use
+feed_range = json.loads(serialized)
+```
+
+### Partition Splits and Stale Feed Ranges
+As your container's data grows, physical partitions may split, changing the set of feed ranges. The SDK handles stale feed ranges gracefully:
+
+- **Queries with `feed_range`**: The SDK automatically resolves stale feed ranges and routes the query correctly.
+- **Change feed with `feed_range`**: The SDK detects "feed range gone" conditions and transparently handles the split.
+- **To get updated feed ranges**: Call `container.read_feed_ranges(force_refresh=True)`.
+
+### Mutual Exclusivity
+When using feed ranges with `query_items()` or `query_items_change_feed()`, the `feed_range` parameter is mutually exclusive with `partition_key` (and `partition_key_range_id` for change feed). Providing both will raise a `ValueError`.
+
+---
+
+## Samples
+
+For complete, runnable examples, see:
+- [feed_range_management.py](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/cosmos/azure-cosmos/samples/feed_range_management.py) – Sync sample covering all feed range operations
+- [feed_range_management_async.py](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/cosmos/azure-cosmos/samples/feed_range_management_async.py) – Async sample with `asyncio.gather()` for parallel processing
+- [session_token_management.py](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/cosmos/azure-cosmos/samples/session_token_management.py) – Session token management using feed ranges
diff --git a/sdk/cosmos/azure-cosmos/samples/README.md b/sdk/cosmos/azure-cosmos/samples/README.md
index 12168bb85cb1..fa6cca837c98 100644
--- a/sdk/cosmos/azure-cosmos/samples/README.md
+++ b/sdk/cosmos/azure-cosmos/samples/README.md
@@ -39,6 +39,13 @@ The following are code samples that show common scenario operations with the Azu
 * [change_feed_management.py](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/cosmos/azure-cosmos/samples/change_feed_management.py) - Example demontrating how to consume the Change Feed and iterate on the results.
+* [feed_range_management.py](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/cosmos/azure-cosmos/samples/feed_range_management.py) - Example demonstrating feed range operations including:
+    * Reading feed ranges from a container
+    * Getting a feed range for a specific partition key
+    * Checking feed range subset relationships
+    * Querying items scoped to a feed range
+    * Consuming change feed scoped to a feed range
+    * Parallel change feed processing using feed ranges
 * [access_cosmos_with_resource_token.py](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/cosmos/azure-cosmos/samples/access_cosmos_with_resource_token.py) - Example demontrating how to get and use resource token that allows restricted access to data.
diff --git a/sdk/cosmos/azure-cosmos/samples/feed_range_management.py b/sdk/cosmos/azure-cosmos/samples/feed_range_management.py
new file mode 100644
index 000000000000..78a568645778
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos/samples/feed_range_management.py
@@ -0,0 +1,375 @@
+# -------------------------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License. See LICENSE.txt in the project root for
+# license information.
+# -------------------------------------------------------------------------
+import json
+import uuid
+
+import azure.cosmos.cosmos_client as cosmos_client
+import azure.cosmos.exceptions as exceptions
+from azure.cosmos.partition_key import PartitionKey
+
+import config
+
+# ----------------------------------------------------------------------------------------------------------
+# Prerequisites -
+#
+# 1. An Azure Cosmos account -
+# https://azure.microsoft.com/documentation/articles/documentdb-create-account/
+#
+# 2. Microsoft Azure Cosmos PyPi package -
+# https://pypi.python.org/pypi/azure-cosmos/
+# ----------------------------------------------------------------------------------------------------------
+# Sample - demonstrates how to work with feed ranges in Azure Cosmos DB.
+#
+# Feed ranges represent a scope within a container, defined by a range of partition key hash values.
+# They enable sub-container-level operations such as parallel query processing, scoped change feed
+# consumption, and workload partitioning across multiple workers.
+#
+# Feed ranges are returned as opaque dict[str, Any] values and should not be manually constructed
+# or parsed. Use the provided container methods to create and compare them.
+#
+# 1. Reading feed ranges from a container
+# 2. Getting a feed range for a specific partition key
+# 3. Checking if one feed range is a subset of another
+# 4. Querying items scoped to a feed range
+# 5. Querying items using a feed range derived from a partition key
+# 6. Consuming change feed scoped to a feed range
+# 7. Parallel change feed processing using feed ranges
+# ----------------------------------------------------------------------------------------------------------
+# Note -
+#
+# Running this sample will create (and delete) a Database and Container on your account.
+# Each time a Container is created the account will be billed for 1 hour of usage based on
+# the provisioned throughput (RU/s) of that account.
+# ----------------------------------------------------------------------------------------------------------
+
+HOST = config.settings['host']
+MASTER_KEY = config.settings['master_key']
+DATABASE_ID = config.settings['database_id']
+CONTAINER_ID = config.settings['container_id']
+
+# Partition key values used throughout the sample
+PARTITION_KEY_VALUES = ['Seattle', 'Portland', 'Denver', 'Austin', 'Chicago']
+
+
+def create_sample_items(container):
+    """Create sample items across multiple partition keys."""
+    print('\nCreating sample items across partition keys: {}'.format(PARTITION_KEY_VALUES))
+    items = []
+    for city in PARTITION_KEY_VALUES:
+        for i in range(3):
+            item = {
+                'id': 'item-{}-{}'.format(city, str(uuid.uuid4())[:8]),
+                'city': city,
+                'name': 'Sample Item {} from {}'.format(i + 1, city),
+                'value': i * 10
+            }
+            container.create_item(body=item)
+            items.append(item)
+    print('Created {} items'.format(len(items)))
+    return items
+
+
+def read_feed_ranges(container):
+    """Demonstrates reading all feed ranges from a container.
+
+    Feed ranges represent the partitioning of your container's data. The number of feed ranges
+    corresponds to the number of physical partitions backing your container. As your data grows
+    and partitions split, the number of feed ranges may increase.
+    """
+    print('\n--- 1. Reading feed ranges from the container ---\n')
+
+    # read_feed_ranges() returns an iterable of feed range dicts.
+    # Each feed range represents a scope within the container.
+    feed_ranges = list(container.read_feed_ranges())
+
+    print('Container has {} feed range(s):'.format(len(feed_ranges)))
+    for i, feed_range in enumerate(feed_ranges):
+        # Feed ranges are opaque dict values. You can serialize them with json.dumps() for
+        # storage or logging, but should not parse or construct them manually.
+        print(' Feed range {}: {}'.format(i + 1, json.dumps(feed_range)))
+
+    # You can force a refresh of the cached partition key ranges if needed
+    # (e.g., after a partition split):
+    refreshed_feed_ranges = list(container.read_feed_ranges(force_refresh=True))
+    print('\nAfter force refresh: {} feed range(s)'.format(len(refreshed_feed_ranges)))
+
+    return feed_ranges
+
+
+def feed_range_from_partition_key(container):
+    """Demonstrates getting a feed range for a specific partition key value.
+
+    This is useful when you need a feed range representation of a single partition key,
+    for example to use with is_feed_range_subset() or to scope a change feed query.
+    """
+    print('\n--- 2. Getting feed ranges from partition key values ---\n')
+
+    feed_ranges_by_pk = {}
+    for city in PARTITION_KEY_VALUES:
+        # Convert a partition key value to its corresponding feed range
+        feed_range = container.feed_range_from_partition_key(city)
+        feed_ranges_by_pk[city] = feed_range
+        print('Feed range for partition key "{}": {}'.format(city, json.dumps(feed_range)))
+
+    # You can also get a feed range for a None partition key (JSON null)
+    null_feed_range = container.feed_range_from_partition_key(None)
+    print('\nFeed range for None partition key: {}'.format(json.dumps(null_feed_range)))
+
+    return feed_ranges_by_pk
+
+
+def check_feed_range_subset(container, feed_ranges_by_pk):
+    """Demonstrates checking if one feed range is a subset of another.
+
+    This is useful for determining which container-level feed range contains a specific
+    partition key's feed range, enabling scenarios like routing operations to the correct
+    worker in a fan-out architecture.
+    """
+    print('\n--- 3. Checking feed range subset relationships ---\n')
+
+    # Read all container-level feed ranges (these cover the full container)
+    container_feed_ranges = list(container.read_feed_ranges())
+    print('Container has {} feed range(s)'.format(len(container_feed_ranges)))
+
+    # For each partition key, find which container feed range contains it
+    for city, pk_feed_range in feed_ranges_by_pk.items():
+        for i, container_fr in enumerate(container_feed_ranges):
+            # is_feed_range_subset checks if 'child' is fully contained within 'parent'
+            is_subset = container.is_feed_range_subset(
+                parent_feed_range=container_fr,
+                child_feed_range=pk_feed_range
+            )
+            if is_subset:
+                print('Partition key "{}" belongs to container feed range {}'.format(city, i + 1))
+                break
+
+    # Verify that each container feed range is a subset of itself
+    print('\nEach container feed range is a subset of itself:')
+    for i, fr in enumerate(container_feed_ranges):
+        assert container.is_feed_range_subset(fr, fr), "A feed range should be a subset of itself"
+        print(' Feed range {} is a subset of itself: True'.format(i + 1))
+
+
+def query_items_with_feed_range(container):
+    """Demonstrates querying items scoped to individual feed ranges.
+
+    By reading all feed ranges and querying each one separately, you can parallelize
+    query execution across multiple workers. Each worker processes a distinct subset
+    of the container's data with no overlap.
+
+    Note: feed_range and partition_key are mutually exclusive parameters in query_items().
+    """
+    print('\n--- 4. Querying items scoped to feed ranges ---\n')
+
+    feed_ranges = list(container.read_feed_ranges())
+    all_items = []
+
+    for i, feed_range in enumerate(feed_ranges):
+        print('Querying items in feed range {}...'.format(i + 1))
+
+        # Use the feed_range keyword to scope the query to a specific feed range.
+        # This replaces the need for enable_cross_partition_query when you want to
+        # process data in parallel across feed ranges.
+        items = list(container.query_items(
+            query="SELECT c.id, c.city, c.name FROM c",
+            feed_range=feed_range
+        ))
+
+        print(' Found {} items'.format(len(items)))
+        for item in items:
+            print(' - {} (city: {})'.format(item['id'], item['city']))
+
+        all_items.extend(items)
+
+    print('\nTotal items across all feed ranges: {}'.format(len(all_items)))
+    print('(This should equal the total number of items in the container)')
+
+
+def query_items_with_feed_range_from_pk(container):
+    """Demonstrates querying items using a feed range derived from a partition key.
+
+    You can convert a partition key to a feed range and use it with query_items().
+    This is functionally equivalent to using the partition_key parameter, but gives
+    you a feed range that can also be used with is_feed_range_subset() or stored
+    for later use.
+    """
+    print('\n--- 5. Querying items with a feed range from a partition key ---\n')
+
+    target_city = 'Seattle'
+
+    # Get the feed range for a specific partition key
+    feed_range = container.feed_range_from_partition_key(target_city)
+    print('Feed range for "{}": {}'.format(target_city, json.dumps(feed_range)))
+
+    # Query using feed_range - returns items from that partition key's scope
+    items_via_feed_range = list(container.query_items(
+        query="SELECT c.id, c.city FROM c",
+        feed_range=feed_range
+    ))
+
+    # Compare with query using partition_key directly
+    items_via_partition_key = list(container.query_items(
+        query="SELECT c.id, c.city FROM c",
+        partition_key=target_city
+    ))
+
+    print('Items found via feed_range: {}'.format(len(items_via_feed_range)))
+    print('Items found via partition_key: {}'.format(len(items_via_partition_key)))
+    print('Results match: {}'.format(
+        sorted([i['id'] for i in items_via_feed_range]) ==
+        sorted([i['id'] for i in items_via_partition_key])
+    ))
+
+    # Note: Using both feed_range and partition_key together will raise a ValueError
+    print('\nNote: feed_range and partition_key are mutually exclusive.')
+    try:
+        list(container.query_items(
+            query="SELECT * FROM c",
+            feed_range=feed_range,
+            partition_key=target_city
+        ))
+    except ValueError as e:
+        print('Expected error when using both: {}'.format(e))
+
+
+def change_feed_with_feed_range(container):
+    """Demonstrates consuming the change feed scoped to a specific feed range.
+
+    By using feed_range with query_items_change_feed(), you can process changes for a
+    subset of your container's data. This is useful when you want to process changes
+    for a specific partition or range of partitions without consuming the entire change feed.
+
+    Note: feed_range, partition_key, and partition_key_range_id are mutually exclusive
+    parameters in query_items_change_feed().
+    """
+    print('\n--- 6. Change feed scoped to a feed range ---\n')
+
+    # Get a feed range for a specific partition key
+    target_city = 'Portland'
+    feed_range = container.feed_range_from_partition_key(target_city)
+    print('Consuming change feed for partition key "{}"...'.format(target_city))
+
+    # Read change feed from the beginning, scoped to this feed range
+    response = container.query_items_change_feed(
+        feed_range=feed_range,
+        start_time="Beginning"
+    )
+
+    change_count = 0
+    for item in response:
+        change_count += 1
+        if change_count <= 5:  # Print first 5 for brevity
+            print(' Changed item: {} (city: {})'.format(item.get('id', 'N/A'), item.get('city', 'N/A')))
+
+    if change_count > 5:
+        print(' ... and {} more items'.format(change_count - 5))
+    print('Total changes in feed range: {}'.format(change_count))
+
+
+def parallel_change_feed_processing(container):
+    """Demonstrates the pattern for parallel change feed processing using feed ranges.
+
+    This is one of the most powerful use cases for feed ranges: distributing change feed
+    processing across multiple workers. Each worker is assigned one or more feed ranges
+    and processes changes independently, with no overlap between workers.
+
+    In this sample, we simulate the parallel pattern synchronously. In a real application,
+    each feed range would be processed by a separate thread, process, or machine.
+    """
+    print('\n--- 7. Parallel change feed processing with feed ranges ---\n')
+
+    # Step 1: Read all feed ranges for the container
+    feed_ranges = list(container.read_feed_ranges())
+    print('Container has {} feed range(s) to distribute across workers'.format(len(feed_ranges)))
+
+    # Step 2: Each "worker" processes changes for its assigned feed range
+    total_changes = 0
+    for worker_id, feed_range in enumerate(feed_ranges):
+        print('\n[Worker {}] Processing change feed for feed range: {}'.format(
+            worker_id, json.dumps(feed_range)[:80] + '...'
+        ))
+
+        # Each worker reads the change feed for its assigned feed range
+        response = container.query_items_change_feed(
+            feed_range=feed_range,
+            start_time="Beginning"
+        )
+
+        worker_changes = 0
+        partition_keys_seen = set()
+        for item in response:
+            worker_changes += 1
+            partition_keys_seen.add(item.get('city', 'unknown'))
+
+        print('[Worker {}] Processed {} changes covering partition keys: {}'.format(
+            worker_id, worker_changes, partition_keys_seen
+        ))
+        total_changes += worker_changes
+
+    print('\nTotal changes across all workers: {}'.format(total_changes))
+    print('Each item was processed by exactly one worker (no duplicates, no gaps)')
+
+
+def run_sample():
+    client = cosmos_client.CosmosClient(HOST, {'masterKey': MASTER_KEY})
+
+    try:
+        # Setup database and container
+        try:
+            db = client.create_database(id=DATABASE_ID)
+        except exceptions.CosmosResourceExistsError:
+            db = client.get_database_client(DATABASE_ID)
+
+        try:
+            container = db.create_container(
+                id=CONTAINER_ID,
+                partition_key=PartitionKey(path='/city'),
+                offer_throughput=400
+            )
+            print('Container with id \'{0}\' created'.format(CONTAINER_ID))
+        except exceptions.CosmosResourceExistsError:
+            container = db.get_container_client(CONTAINER_ID)
+            print('Container with id \'{0}\' already exists'.format(CONTAINER_ID))
+
+        # Create sample data
+        create_sample_items(container)
+
+        # 1. Read feed ranges from the container
+        feed_ranges = read_feed_ranges(container)
+
+        # 2. Get feed ranges from partition key values
+        feed_ranges_by_pk = feed_range_from_partition_key(container)
+
+        # 3. Check feed range subset relationships
+        check_feed_range_subset(container, feed_ranges_by_pk)
+
+        # 4. Query items scoped to feed ranges
+        query_items_with_feed_range(container)
+
+        # 5. Query items using a feed range derived from a partition key
+        query_items_with_feed_range_from_pk(container)
+
+        # 6. Read change feed scoped to a feed range
+        change_feed_with_feed_range(container)
+
+        # 7. Parallel change feed processing using feed ranges
+        parallel_change_feed_processing(container)
+
+        # Cleanup
+        try:
+            client.delete_database(db)
+        except exceptions.CosmosResourceNotFoundError:
+            pass
+
+    except exceptions.CosmosHttpResponseError as e:
+        print('\nrun_sample has caught an error. {0}'.format(e.message))
+
+    finally:
+        print("\nrun_sample done")
+
+
+if __name__ == '__main__':
+    run_sample()
diff --git a/sdk/cosmos/azure-cosmos/samples/feed_range_management_async.py b/sdk/cosmos/azure-cosmos/samples/feed_range_management_async.py
new file mode 100644
index 000000000000..58d85d08aecb
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos/samples/feed_range_management_async.py
@@ -0,0 +1,385 @@
+# -------------------------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License. See LICENSE.txt in the project root for
+# license information.
+# -------------------------------------------------------------------------
+import asyncio
+import json
+import uuid
+
+from azure.cosmos.aio import CosmosClient
+import azure.cosmos.exceptions as exceptions
+from azure.cosmos.partition_key import PartitionKey
+
+import config
+
+# ----------------------------------------------------------------------------------------------------------
+# Prerequisites -
+#
+# 1. An Azure Cosmos account -
+# https://azure.microsoft.com/documentation/articles/documentdb-create-account/
+#
+# 2. Microsoft Azure Cosmos PyPi package -
+# https://pypi.python.org/pypi/azure-cosmos/
+# ----------------------------------------------------------------------------------------------------------
+# Sample - demonstrates how to work with feed ranges in Azure Cosmos DB using the async client.
+#
+# Feed ranges represent a scope within a container, defined by a range of partition key hash values.
+# They enable sub-container-level operations such as parallel query processing, scoped change feed
+# consumption, and workload partitioning across multiple workers.
+#
+# Feed ranges are returned as opaque dict[str, Any] values and should not be manually constructed
+# or parsed. Use the provided container methods to create and compare them.
+#
+# 1. Reading feed ranges from a container
+# 2. Getting a feed range for a specific partition key
+# 3. Checking if one feed range is a subset of another
+# 4. Querying items scoped to a feed range
+# 5. Querying items using a feed range derived from a partition key
+# 6. Consuming change feed scoped to a feed range
+# 7. Parallel change feed processing using feed ranges with asyncio.gather
+# ----------------------------------------------------------------------------------------------------------
+# Note -
+#
+# Running this sample will create (and delete) a Database and Container on your account.
+# Each time a Container is created the account will be billed for 1 hour of usage based on
+# the provisioned throughput (RU/s) of that account.
+# ----------------------------------------------------------------------------------------------------------
+
+HOST = config.settings['host']
+MASTER_KEY = config.settings['master_key']
+DATABASE_ID = config.settings['database_id']
+CONTAINER_ID = config.settings['container_id']
+
+# Partition key values used throughout the sample
+PARTITION_KEY_VALUES = ['Seattle', 'Portland', 'Denver', 'Austin', 'Chicago']
+
+
+async def create_sample_items(container):
+    """Create sample items across multiple partition keys."""
+    print('\nCreating sample items across partition keys: {}'.format(PARTITION_KEY_VALUES))
+    items = []
+    for city in PARTITION_KEY_VALUES:
+        for i in range(3):
+            item = {
+                'id': 'item-{}-{}'.format(city, str(uuid.uuid4())[:8]),
+                'city': city,
+                'name': 'Sample Item {} from {}'.format(i + 1, city),
+                'value': i * 10
+            }
+            await container.create_item(body=item)
+            items.append(item)
+    print('Created {} items'.format(len(items)))
+    return items
+
+
+async def read_feed_ranges(container):
+    """Demonstrates reading all feed ranges from a container.
+
+    Feed ranges represent the partitioning of your container's data. The number of feed ranges
+    corresponds to the number of physical partitions backing your container. As your data grows
+    and partitions split, the number of feed ranges may increase.
+    """
+    print('\n--- 1. Reading feed ranges from the container ---\n')
+
+    # read_feed_ranges() returns an async iterable of feed range dicts.
+    # Each feed range represents a scope within the container.
+    feed_ranges = [fr async for fr in container.read_feed_ranges()]
+
+    print('Container has {} feed range(s):'.format(len(feed_ranges)))
+    for i, feed_range in enumerate(feed_ranges):
+        # Feed ranges are opaque dict values. You can serialize them with json.dumps() for
+        # storage or logging, but should not parse or construct them manually.
+        print(' Feed range {}: {}'.format(i + 1, json.dumps(feed_range)))
+
+    # You can force a refresh of the cached partition key ranges if needed
+    # (e.g., after a partition split):
+    refreshed_feed_ranges = [fr async for fr in container.read_feed_ranges(force_refresh=True)]
+    print('\nAfter force refresh: {} feed range(s)'.format(len(refreshed_feed_ranges)))
+
+    return feed_ranges
+
+
+async def feed_range_from_partition_key(container):
+    """Demonstrates getting a feed range for a specific partition key value.
+
+    This is useful when you need a feed range representation of a single partition key,
+    for example to use with is_feed_range_subset() or to scope a change feed query.
+
+    Note: In the async client, feed_range_from_partition_key() is a coroutine and must be awaited.
+    """
+    print('\n--- 2. Getting feed ranges from partition key values ---\n')
+
+    feed_ranges_by_pk = {}
+    for city in PARTITION_KEY_VALUES:
+        # Convert a partition key value to its corresponding feed range (await required for async)
+        feed_range = await container.feed_range_from_partition_key(city)
+        feed_ranges_by_pk[city] = feed_range
+        print('Feed range for partition key "{}": {}'.format(city, json.dumps(feed_range)))
+
+    # You can also get a feed range for a None partition key (JSON null)
+    null_feed_range = await container.feed_range_from_partition_key(None)
+    print('\nFeed range for None partition key: {}'.format(json.dumps(null_feed_range)))
+
+    return feed_ranges_by_pk
+
+
+async def check_feed_range_subset(container, feed_ranges_by_pk):
+    """Demonstrates checking if one feed range is a subset of another.
+
+    This is useful for determining which container-level feed range contains a specific
+    partition key's feed range, enabling scenarios like routing operations to the correct
+    worker in a fan-out architecture.
+
+    Note: In the async client, is_feed_range_subset() is a coroutine and must be awaited.
+    """
+    print('\n--- 3. Checking feed range subset relationships ---\n')
+
+    # Read all container-level feed ranges (these cover the full container)
+    container_feed_ranges = [fr async for fr in container.read_feed_ranges()]
+    print('Container has {} feed range(s)'.format(len(container_feed_ranges)))
+
+    # For each partition key, find which container feed range contains it
+    for city, pk_feed_range in feed_ranges_by_pk.items():
+        for i, container_fr in enumerate(container_feed_ranges):
+            # is_feed_range_subset checks if 'child' is fully contained within 'parent'
+            is_subset = await container.is_feed_range_subset(
+                parent_feed_range=container_fr,
+                child_feed_range=pk_feed_range
+            )
+            if is_subset:
+                print('Partition key "{}" belongs to container feed range {}'.format(city, i + 1))
+                break
+
+    # Verify that each container feed range is a subset of itself
+    print('\nEach container feed range is a subset of itself:')
+    for i, fr in enumerate(container_feed_ranges):
+        assert await container.is_feed_range_subset(fr, fr), "A feed range should be a subset of itself"
+        print(' Feed range {} is a subset of itself: True'.format(i + 1))
+
+
+async def query_items_with_feed_range(container):
+    """Demonstrates querying items scoped to individual feed ranges.
+
+    By reading all feed ranges and querying each one separately, you can parallelize
+    query execution across multiple workers. Each worker processes a distinct subset
+    of the container's data with no overlap.
+
+    Note: feed_range and partition_key are mutually exclusive parameters in query_items().
+    """
+    print('\n--- 4. Querying items scoped to feed ranges ---\n')
+
+    feed_ranges = [fr async for fr in container.read_feed_ranges()]
+    all_items = []
+
+    for i, feed_range in enumerate(feed_ranges):
+        print('Querying items in feed range {}...'.format(i + 1))
+
+        # Use the feed_range keyword to scope the query to a specific feed range.
+ # This replaces the need for enable_cross_partition_query when you want to + # process data in parallel across feed ranges. + items = [item async for item in container.query_items( + query="SELECT c.id, c.city, c.name FROM c", + feed_range=feed_range + )] + + print(' Found {} items'.format(len(items))) + for item in items: + print(' - {} (city: {})'.format(item['id'], item['city'])) + + all_items.extend(items) + + print('\nTotal items across all feed ranges: {}'.format(len(all_items))) + print('(This should equal the total number of items in the container)') + + +async def query_items_with_feed_range_from_pk(container): + """Demonstrates querying items using a feed range derived from a partition key. + + You can convert a partition key to a feed range and use it with query_items(). + This is functionally equivalent to using the partition_key parameter, but gives + you a feed range that can also be used with is_feed_range_subset() or stored + for later use. + """ + print('\n--- 5. Querying items with a feed range from a partition key ---\n') + + target_city = 'Seattle' + + # Get the feed range for a specific partition key + feed_range = await container.feed_range_from_partition_key(target_city) + print('Feed range for "{}": {}'.format(target_city, json.dumps(feed_range))) + + # Query using feed_range - returns items from that partition key's scope + items_via_feed_range = [item async for item in container.query_items( + query="SELECT c.id, c.city FROM c", + feed_range=feed_range + )] + + # Compare with query using partition_key directly + items_via_partition_key = [item async for item in container.query_items( + query="SELECT c.id, c.city FROM c", + partition_key=target_city + )] + + print('Items found via feed_range: {}'.format(len(items_via_feed_range))) + print('Items found via partition_key: {}'.format(len(items_via_partition_key))) + print('Results match: {}'.format( + sorted([i['id'] for i in items_via_feed_range]) == + sorted([i['id'] for i in 
items_via_partition_key]) + )) + + # Note: Using both feed_range and partition_key together will raise a ValueError + print('\nNote: feed_range and partition_key are mutually exclusive.') + try: + async for _ in container.query_items( + query="SELECT * FROM c", + feed_range=feed_range, + partition_key=target_city + ): + pass + except ValueError as e: + print('Expected error when using both: {}'.format(e)) + + +async def change_feed_with_feed_range(container): + """Demonstrates consuming the change feed scoped to a specific feed range. + + By using feed_range with query_items_change_feed(), you can process changes for a + subset of your container's data. This is useful when you want to process changes + for a specific partition or range of partitions without consuming the entire change feed. + + Note: feed_range, partition_key, and partition_key_range_id are mutually exclusive + parameters in query_items_change_feed(). + """ + print('\n--- 6. Change feed scoped to a feed range ---\n') + + # Get a feed range for a specific partition key + target_city = 'Portland' + feed_range = await container.feed_range_from_partition_key(target_city) + print('Consuming change feed for partition key "{}"...'.format(target_city)) + + # Read change feed from the beginning, scoped to this feed range + response = container.query_items_change_feed( + feed_range=feed_range, + start_time="Beginning" + ) + + change_count = 0 + async for item in response: + change_count += 1 + if change_count <= 5: # Print first 5 for brevity + print(' Changed item: {} (city: {})'.format(item.get('id', 'N/A'), item.get('city', 'N/A'))) + + if change_count > 5: + print(' ... 
and {} more items'.format(change_count - 5)) + print('Total changes in feed range: {}'.format(change_count)) + + +async def _process_worker(worker_id, container, feed_range): + """Process change feed for a single feed range (simulates one worker).""" + response = container.query_items_change_feed( + feed_range=feed_range, + start_time="Beginning" + ) + + worker_changes = 0 + partition_keys_seen = set() + async for item in response: + worker_changes += 1 + partition_keys_seen.add(item.get('city', 'unknown')) + + print('[Worker {}] Processed {} changes covering partition keys: {}'.format( + worker_id, worker_changes, partition_keys_seen + )) + return worker_changes + + +async def parallel_change_feed_processing(container): + """Demonstrates parallel change feed processing using feed ranges with asyncio.gather. + + This is one of the most powerful use cases for feed ranges: distributing change feed + processing across multiple workers. Each worker is assigned one or more feed ranges + and processes changes independently, with no overlap between workers. + + The async client enables true concurrent processing using asyncio.gather(), + allowing multiple feed ranges to be processed simultaneously. + """ + print('\n--- 7. 
Parallel change feed processing with feed ranges ---\n') + + # Step 1: Read all feed ranges for the container + feed_ranges = [fr async for fr in container.read_feed_ranges()] + print('Container has {} feed range(s) to distribute across workers'.format(len(feed_ranges))) + + # Step 2: Launch concurrent workers with asyncio.gather + # Each worker processes changes for its assigned feed range concurrently + print('\nLaunching {} concurrent workers...\n'.format(len(feed_ranges))) + tasks = [ + _process_worker(worker_id, container, feed_range) + for worker_id, feed_range in enumerate(feed_ranges) + ] + results = await asyncio.gather(*tasks) + + total_changes = sum(results) + print('\nTotal changes across all workers: {}'.format(total_changes)) + print('Each item was processed by exactly one worker (no duplicates, no gaps)') + + +async def run_sample(): + async with CosmosClient(HOST, {'masterKey': MASTER_KEY}) as client: + try: + # Setup database and container + try: + db = await client.create_database(id=DATABASE_ID) + except exceptions.CosmosResourceExistsError: + db = client.get_database_client(DATABASE_ID) + + try: + container = await db.create_container( + id=CONTAINER_ID, + partition_key=PartitionKey(path='/city'), + offer_throughput=400 + ) + print('Container with id \'{0}\' created'.format(CONTAINER_ID)) + except exceptions.CosmosResourceExistsError: + container = db.get_container_client(CONTAINER_ID) + print('Container with id \'{0}\' already exists'.format(CONTAINER_ID)) + + # Create sample data + await create_sample_items(container) + + # 1. Read feed ranges from the container + feed_ranges = await read_feed_ranges(container) + + # 2. Get feed ranges from partition key values + feed_ranges_by_pk = await feed_range_from_partition_key(container) + + # 3. Check feed range subset relationships + await check_feed_range_subset(container, feed_ranges_by_pk) + + # 4. Query items scoped to feed ranges + await query_items_with_feed_range(container) + + # 5. 
Query items using a feed range derived from a partition key + await query_items_with_feed_range_from_pk(container) + + # 6. Read change feed scoped to a feed range + await change_feed_with_feed_range(container) + + # 7. Parallel change feed processing using feed ranges + await parallel_change_feed_processing(container) + + # Cleanup + try: + await client.delete_database(db) + except exceptions.CosmosResourceNotFoundError: + pass + + except exceptions.CosmosHttpResponseError as e: + print('\nrun_sample has caught an error. {0}'.format(e.message)) + + finally: + print("\nrun_sample done") + + +if __name__ == '__main__': + asyncio.run(run_sample()) From cb7a44076eae2029724f7c515637ab2675b3674f Mon Sep 17 00:00:00 2001 From: Andrew Mathew Date: Thu, 12 Feb 2026 14:48:05 -0500 Subject: [PATCH 2/4] updated sample delete --- .../samples/feed_range_management.py | 47 +++++++++--------- .../samples/feed_range_management_async.py | 48 +++++++++---------- 2 files changed, 47 insertions(+), 48 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/samples/feed_range_management.py b/sdk/cosmos/azure-cosmos/samples/feed_range_management.py index 78a568645778..ee435bc45b78 100644 --- a/sdk/cosmos/azure-cosmos/samples/feed_range_management.py +++ b/sdk/cosmos/azure-cosmos/samples/feed_range_management.py @@ -12,6 +12,9 @@ import config +# Use unique suffixes so the sample never collides with pre-existing resources +_SUFFIX = str(uuid.uuid4())[:8] + # ---------------------------------------------------------------------------------------------------------- # Prerequisites - # @@ -47,8 +50,8 @@ HOST = config.settings['host'] MASTER_KEY = config.settings['master_key'] -DATABASE_ID = config.settings['database_id'] -CONTAINER_ID = config.settings['container_id'] +DATABASE_ID = config.settings['database_id'] + '-feed-range-sample-' + _SUFFIX +CONTAINER_ID = config.settings['container_id'] + '-feed-range-sample-' + _SUFFIX # Partition key values used throughout the sample PARTITION_KEY_VALUES 
= ['Seattle', 'Portland', 'Denver', 'Austin', 'Chicago'] @@ -315,24 +318,19 @@ def parallel_change_feed_processing(container): def run_sample(): client = cosmos_client.CosmosClient(HOST, {'masterKey': MASTER_KEY}) + db = None try: - # Setup database and container - try: - db = client.create_database(id=DATABASE_ID) - except exceptions.CosmosResourceExistsError: - db = client.get_database_client(DATABASE_ID) - - try: - container = db.create_container( - id=CONTAINER_ID, - partition_key=PartitionKey(path='/city'), - offer_throughput=400 - ) - print('Container with id \'{0}\' created'.format(CONTAINER_ID)) - except exceptions.CosmosResourceExistsError: - container = db.get_container_client(CONTAINER_ID) - print('Container with id \'{0}\' already exists'.format(CONTAINER_ID)) + # Setup database and container (unique IDs so we never touch pre-existing resources) + db = client.create_database(id=DATABASE_ID) + print('Database with id \'{0}\' created'.format(DATABASE_ID)) + + container = db.create_container( + id=CONTAINER_ID, + partition_key=PartitionKey(path='/city'), + offer_throughput=400 + ) + print('Container with id \'{0}\' created'.format(CONTAINER_ID)) # Create sample data create_sample_items(container) @@ -358,16 +356,17 @@ def run_sample(): # 7. Parallel change feed processing using feed ranges parallel_change_feed_processing(container) - # Cleanup - try: - client.delete_database(db) - except exceptions.CosmosResourceNotFoundError: - pass - except exceptions.CosmosHttpResponseError as e: print('\nrun_sample has caught an error. 
{0}'.format(e.message)) finally: + # Clean up the sample database if it was created by this run + if db is not None: + try: + client.delete_database(db) + print('\nSample database \'{0}\' deleted'.format(DATABASE_ID)) + except exceptions.CosmosResourceNotFoundError: + pass print("\nrun_sample done") diff --git a/sdk/cosmos/azure-cosmos/samples/feed_range_management_async.py b/sdk/cosmos/azure-cosmos/samples/feed_range_management_async.py index 58d85d08aecb..4248a6588a69 100644 --- a/sdk/cosmos/azure-cosmos/samples/feed_range_management_async.py +++ b/sdk/cosmos/azure-cosmos/samples/feed_range_management_async.py @@ -13,6 +13,9 @@ import config +# Use unique suffixes so the sample never collides with pre-existing resources +_SUFFIX = str(uuid.uuid4())[:8] + # ---------------------------------------------------------------------------------------------------------- # Prerequisites - # @@ -48,8 +51,8 @@ HOST = config.settings['host'] MASTER_KEY = config.settings['master_key'] -DATABASE_ID = config.settings['database_id'] -CONTAINER_ID = config.settings['container_id'] +DATABASE_ID = config.settings['database_id'] + '-feed-range-sample-' + _SUFFIX +CONTAINER_ID = config.settings['container_id'] + '-feed-range-sample-' + _SUFFIX # Partition key values used throughout the sample PARTITION_KEY_VALUES = ['Seattle', 'Portland', 'Denver', 'Austin', 'Chicago'] @@ -326,23 +329,19 @@ async def parallel_change_feed_processing(container): async def run_sample(): async with CosmosClient(HOST, {'masterKey': MASTER_KEY}) as client: + db = None + try: - # Setup database and container - try: - db = await client.create_database(id=DATABASE_ID) - except exceptions.CosmosResourceExistsError: - db = client.get_database_client(DATABASE_ID) - - try: - container = await db.create_container( - id=CONTAINER_ID, - partition_key=PartitionKey(path='/city'), - offer_throughput=400 - ) - print('Container with id \'{0}\' created'.format(CONTAINER_ID)) - except exceptions.CosmosResourceExistsError: - 
container = db.get_container_client(CONTAINER_ID) - print('Container with id \'{0}\' already exists'.format(CONTAINER_ID)) + # Setup database and container (unique IDs so we never touch pre-existing resources) + db = await client.create_database(id=DATABASE_ID) + print('Database with id \'{0}\' created'.format(DATABASE_ID)) + + container = await db.create_container( + id=CONTAINER_ID, + partition_key=PartitionKey(path='/city'), + offer_throughput=400 + ) + print('Container with id \'{0}\' created'.format(CONTAINER_ID)) # Create sample data await create_sample_items(container) @@ -368,16 +367,17 @@ async def run_sample(): # 7. Parallel change feed processing using feed ranges await parallel_change_feed_processing(container) - # Cleanup - try: - await client.delete_database(db) - except exceptions.CosmosResourceNotFoundError: - pass - except exceptions.CosmosHttpResponseError as e: print('\nrun_sample has caught an error. {0}'.format(e.message)) finally: + # Clean up the sample database if it was created by this run + if db is not None: + try: + await client.delete_database(db) + print('\nSample database \'{0}\' deleted'.format(DATABASE_ID)) + except exceptions.CosmosResourceNotFoundError: + pass print("\nrun_sample done") From 3716b19b15398103040abf71179d0d184af19632 Mon Sep 17 00:00:00 2001 From: Andrew Mathew Date: Thu, 12 Feb 2026 14:56:40 -0500 Subject: [PATCH 3/4] updated feedrange doc to have async get_latest_session_token() --- sdk/cosmos/azure-cosmos/docs/FeedRanges.md | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos/docs/FeedRanges.md b/sdk/cosmos/azure-cosmos/docs/FeedRanges.md index c0afe25ee004..06f459f06bff 100644 --- a/sdk/cosmos/azure-cosmos/docs/FeedRanges.md +++ b/sdk/cosmos/azure-cosmos/docs/FeedRanges.md @@ -155,9 +155,17 @@ session_token = container.get_latest_session_token( feed_ranges_to_session_tokens=[(feed_range, token), ...], target_feed_range=target_feed_range ) + +# Async (must be 
awaited) +session_token = await container.get_latest_session_token( + feed_ranges_to_session_tokens=[(feed_range, token), ...], + target_feed_range=target_feed_range +) ``` -See the [session_token_management.py](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/cosmos/azure-cosmos/samples/session_token_management.py) sample for a complete example. +> **Note:** In the async client, `get_latest_session_token()` is a coroutine and **must be awaited**. + +See the [session_token_management.py](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/cosmos/azure-cosmos/samples/session_token_management.py) (sync) and [session_token_management_async.py](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/cosmos/azure-cosmos/samples/session_token_management_async.py) (async) samples for complete examples. --- From e92df24e28e97e177e29cb8a271d6fa1bcb96b6b Mon Sep 17 00:00:00 2001 From: Andrew Mathew <80082032+andrewmathew1@users.noreply.github.com> Date: Thu, 12 Feb 2026 14:57:16 -0500 Subject: [PATCH 4/4] Update sdk/cosmos/azure-cosmos/samples/README.md Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- sdk/cosmos/azure-cosmos/samples/README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/cosmos/azure-cosmos/samples/README.md b/sdk/cosmos/azure-cosmos/samples/README.md index fa6cca837c98..6e66a4437eba 100644 --- a/sdk/cosmos/azure-cosmos/samples/README.md +++ b/sdk/cosmos/azure-cosmos/samples/README.md @@ -44,6 +44,7 @@ The following are code samples that show common scenario operations with the Azu * Getting a feed range for a specific partition key * Checking feed range subset relationships * Querying items scoped to a feed range + * Comparing queries scoped to feed ranges with `partition_key`-based queries * Consuming change feed scoped to a feed range * Parallel change feed processing using feed ranges
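Note on the parallel change feed sample: its docstring says each worker is assigned "one or more" feed ranges, while the sample itself starts one task per feed range. The following is a minimal sketch, not part of the patch, of how a fixed number of workers could share the ranges. The helper names `assign_feed_ranges`, `process_assigned_ranges`, and `run_workers` are hypothetical, and `process_one` is assumed to be any coroutine function taking `(worker_id, feed_range)` and returning a change count. Feed ranges are only passed through, never inspected, so they stay opaque.

```python
import asyncio


def assign_feed_ranges(feed_ranges, worker_count):
    """Round-robin opaque feed ranges across a fixed number of workers (hypothetical helper)."""
    if worker_count < 1:
        raise ValueError('worker_count must be at least 1')
    assignments = [[] for _ in range(worker_count)]
    for index, feed_range in enumerate(feed_ranges):
        # The feed range is stored as-is; its contents are never read.
        assignments[index % worker_count].append(feed_range)
    return assignments


async def process_assigned_ranges(worker_id, assigned, process_one):
    """Process one worker's feed ranges sequentially and return its total change count."""
    total = 0
    for feed_range in assigned:
        total += await process_one(worker_id, feed_range)
    return total


async def run_workers(feed_ranges, worker_count, process_one):
    """Run all workers concurrently; returns one change count per worker."""
    assignments = assign_feed_ranges(feed_ranges, worker_count)
    return await asyncio.gather(*[
        process_assigned_ranges(worker_id, assigned, process_one)
        for worker_id, assigned in enumerate(assignments)
    ])
```

With the async sample, `process_one` could plausibly be a small wrapper such as `lambda worker_id, fr: _process_worker(worker_id, container, fr)`, with `feed_ranges` taken from `container.read_feed_ranges()`. Round-robin is only one possible policy; it balances the number of ranges per worker, not the volume of changes in each range.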