From 68af76a98b9bd2983b6ca54306a2e4f5c675bccb Mon Sep 17 00:00:00 2001 From: anitarua Date: Fri, 27 Jun 2025 14:32:54 -0700 Subject: [PATCH 1/2] chore: add timeout to topic subscribe --- .../internal/aio/_scs_pubsub_client.py | 3 ++- .../synchronous/_scs_pubsub_client.py | 3 ++- tests/momento/topic_client/test_topics.py | 21 +++++++++++++++++++ .../momento/topic_client/test_topics_async.py | 21 +++++++++++++++++++ 4 files changed, 46 insertions(+), 2 deletions(-) diff --git a/src/momento/internal/aio/_scs_pubsub_client.py b/src/momento/internal/aio/_scs_pubsub_client.py index f3a1afc2..3242687a 100644 --- a/src/momento/internal/aio/_scs_pubsub_client.py +++ b/src/momento/internal/aio/_scs_pubsub_client.py @@ -36,7 +36,7 @@ def __init__(self, configuration: TopicConfiguration, credential_provider: Crede self._endpoint = endpoint default_deadline: timedelta = configuration.get_transport_strategy().get_grpc_configuration().get_deadline() - self._default_deadline_seconds = default_deadline.total_seconds() + self._default_deadline_seconds = int(default_deadline.total_seconds()) # Default to a single channel and scale up if necessary. Each channel can support # 100 subscriptions. Issuing more subscribe requests than you have channels to handle @@ -104,6 +104,7 @@ async def subscribe( stub, decrement_stream_count = self._get_stream_stub() stream = stub.Subscribe( # type: ignore[misc] request, + timeout=self._default_deadline_seconds, ) # Ping the stream to provide a nice error message if the cache does not exist. diff --git a/src/momento/internal/synchronous/_scs_pubsub_client.py b/src/momento/internal/synchronous/_scs_pubsub_client.py index d103018e..38e510b4 100644 --- a/src/momento/internal/synchronous/_scs_pubsub_client.py +++ b/src/momento/internal/synchronous/_scs_pubsub_client.py @@ -36,7 +36,7 @@ def __init__(self, configuration: TopicConfiguration, credential_provider: Crede self._endpoint = endpoint default_deadline: timedelta = configuration.get_transport_strategy().get_grpc_configuration().get_deadline() - self._default_deadline_seconds = default_deadline.total_seconds() + self._default_deadline_seconds = int(default_deadline.total_seconds()) # Default to a single channel and scale up if necessary. Each channel can support # 100 subscriptions. Issuing more subscribe requests than you have channels to handle @@ -104,6 +104,7 @@ def subscribe( stub, decrement_stream_count = self._get_stream_stub() stream = stub.Subscribe( # type: ignore[misc] request, + timeout=self._default_deadline_seconds, ) # Ping the stream to provide a nice error message if the cache does not exist. diff --git a/tests/momento/topic_client/test_topics.py b/tests/momento/topic_client/test_topics.py index c318291f..fd8f1d09 100644 --- a/tests/momento/topic_client/test_topics.py +++ b/tests/momento/topic_client/test_topics.py @@ -1,10 +1,14 @@ +from datetime import timedelta from functools import partial from momento import CacheClient, TopicClient +from momento.config.topic_configurations import TopicConfigurations +from momento.errors.error_details import MomentoErrorCode from momento.responses import TopicPublish, TopicSubscribe, TopicSubscriptionItem from pytest import fixture from pytest_describe import behaves_like +from tests.conftest import TEST_AUTH_PROVIDER from tests.utils import uuid_bytes, uuid_str from .shared_behaviors import ( @@ -114,3 +118,20 @@ def succeeds_with_nonexistent_topic(client: CacheClient, topic_client: TopicClie resp = topic_client.subscribe(cache_name, topic) assert isinstance(resp, TopicSubscribe.Subscription) + + def deadline_exceeded_when_timeout_is_shorter_than_subscribe_response( + client: CacheClient, topic_client: TopicClient, cache_name: str + ) -> None: + topic = uuid_str() + + # Default config uses 5 second timeout, should succeed + resp = topic_client.subscribe(cache_name, topic) + assert isinstance(resp, TopicSubscribe.Subscription) + + # Using a topic client configured with 1ms timeout should cause deadline exceeded error + with TopicClient( + TopicConfigurations.Default.latest().with_client_timeout(timedelta(milliseconds=1)), TEST_AUTH_PROVIDER + ) as short_timeout_client: + resp = short_timeout_client.subscribe(cache_name, topic) + assert isinstance(resp, TopicSubscribe.Error) + assert resp.error_code == MomentoErrorCode.TIMEOUT_ERROR diff --git a/tests/momento/topic_client/test_topics_async.py b/tests/momento/topic_client/test_topics_async.py index f6f42cda..8b4c957a 100644 --- a/tests/momento/topic_client/test_topics_async.py +++ b/tests/momento/topic_client/test_topics_async.py @@ -1,10 +1,14 @@ +from datetime import timedelta from functools import partial from momento import CacheClientAsync, TopicClientAsync +from momento.config.topic_configurations import TopicConfigurations +from momento.errors.error_details import MomentoErrorCode from momento.responses import TopicPublish, TopicSubscribe, TopicSubscriptionItem from pytest import fixture from pytest_describe import behaves_like +from tests.conftest import TEST_AUTH_PROVIDER from tests.utils import uuid_bytes, uuid_str from .shared_behaviors_async import ( @@ -129,3 +133,20 @@ async def succeeds_with_nonexistent_topic( resp = await topic_client_async.subscribe(cache_name, topic) assert isinstance(resp, TopicSubscribe.SubscriptionAsync) + + async def deadline_exceeded_when_timeout_is_shorter_than_subscribe_response( + client: CacheClientAsync, topic_client_async: TopicClientAsync, cache_name: str + ) -> None: + topic = uuid_str() + + # Default config uses 5 second timeout, should succeed + resp = await topic_client_async.subscribe(cache_name, topic) + assert isinstance(resp, TopicSubscribe.SubscriptionAsync) + + # Using a topic client configured with 1ms timeout should cause deadline exceeded error + async with TopicClientAsync( + TopicConfigurations.Default.latest().with_client_timeout(timedelta(milliseconds=1)), TEST_AUTH_PROVIDER + ) as short_timeout_client: + resp = await short_timeout_client.subscribe(cache_name, topic) + assert isinstance(resp, TopicSubscribe.Error) + assert resp.error_code == MomentoErrorCode.TIMEOUT_ERROR From a4b0687d4af627fc8cdfb94466a42a510dff2b64 Mon Sep 17 00:00:00 2001 From: anitarua Date: Fri, 27 Jun 2025 15:26:46 -0700 Subject: [PATCH 2/2] remove int conversion --- src/momento/internal/aio/_scs_pubsub_client.py | 2 +- src/momento/internal/synchronous/_scs_pubsub_client.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/momento/internal/aio/_scs_pubsub_client.py b/src/momento/internal/aio/_scs_pubsub_client.py index 3242687a..f055fe24 100644 --- a/src/momento/internal/aio/_scs_pubsub_client.py +++ b/src/momento/internal/aio/_scs_pubsub_client.py @@ -36,7 +36,7 @@ def __init__(self, configuration: TopicConfiguration, credential_provider: Crede self._endpoint = endpoint default_deadline: timedelta = configuration.get_transport_strategy().get_grpc_configuration().get_deadline() - self._default_deadline_seconds = int(default_deadline.total_seconds()) + self._default_deadline_seconds = default_deadline.total_seconds() # Default to a single channel and scale up if necessary. Each channel can support # 100 subscriptions. Issuing more subscribe requests than you have channels to handle diff --git a/src/momento/internal/synchronous/_scs_pubsub_client.py b/src/momento/internal/synchronous/_scs_pubsub_client.py index 38e510b4..97c8b182 100644 --- a/src/momento/internal/synchronous/_scs_pubsub_client.py +++ b/src/momento/internal/synchronous/_scs_pubsub_client.py @@ -36,7 +36,7 @@ def __init__(self, configuration: TopicConfiguration, credential_provider: Crede self._endpoint = endpoint default_deadline: timedelta = configuration.get_transport_strategy().get_grpc_configuration().get_deadline() - self._default_deadline_seconds = int(default_deadline.total_seconds()) + self._default_deadline_seconds = default_deadline.total_seconds() # Default to a single channel and scale up if necessary. Each channel can support # 100 subscriptions. Issuing more subscribe requests than you have channels to handle