diff --git a/src/momento/internal/aio/_scs_pubsub_client.py b/src/momento/internal/aio/_scs_pubsub_client.py index f3a1afc2..f055fe24 100644 --- a/src/momento/internal/aio/_scs_pubsub_client.py +++ b/src/momento/internal/aio/_scs_pubsub_client.py @@ -104,6 +104,7 @@ async def subscribe( stub, decrement_stream_count = self._get_stream_stub() stream = stub.Subscribe( # type: ignore[misc] request, + timeout=self._default_deadline_seconds, ) # Ping the stream to provide a nice error message if the cache does not exist. diff --git a/src/momento/internal/synchronous/_scs_pubsub_client.py b/src/momento/internal/synchronous/_scs_pubsub_client.py index d103018e..97c8b182 100644 --- a/src/momento/internal/synchronous/_scs_pubsub_client.py +++ b/src/momento/internal/synchronous/_scs_pubsub_client.py @@ -104,6 +104,7 @@ def subscribe( stub, decrement_stream_count = self._get_stream_stub() stream = stub.Subscribe( # type: ignore[misc] request, + timeout=self._default_deadline_seconds, ) # Ping the stream to provide a nice error message if the cache does not exist. diff --git a/tests/momento/topic_client/test_topics.py b/tests/momento/topic_client/test_topics.py index c318291f..fd8f1d09 100644 --- a/tests/momento/topic_client/test_topics.py +++ b/tests/momento/topic_client/test_topics.py @@ -1,10 +1,14 @@ +from datetime import timedelta from functools import partial from momento import CacheClient, TopicClient +from momento.config.topic_configurations import TopicConfigurations +from momento.errors.error_details import MomentoErrorCode from momento.responses import TopicPublish, TopicSubscribe, TopicSubscriptionItem from pytest import fixture from pytest_describe import behaves_like +from tests.conftest import TEST_AUTH_PROVIDER from tests.utils import uuid_bytes, uuid_str from .shared_behaviors import ( @@ -114,3 +118,20 @@ def succeeds_with_nonexistent_topic(client: CacheClient, topic_client: TopicClie resp = topic_client.subscribe(cache_name, topic) assert isinstance(resp, TopicSubscribe.Subscription) + + def deadline_exceeded_when_timeout_is_shorter_than_subscribe_response( + client: CacheClient, topic_client: TopicClient, cache_name: str + ) -> None: + topic = uuid_str() + + # Default config uses 5 second timeout, should succeed + resp = topic_client.subscribe(cache_name, topic) + assert isinstance(resp, TopicSubscribe.Subscription) + + # Using a topic client configured with 1ms timeout should cause deadline exceeded error + with TopicClient( + TopicConfigurations.Default.latest().with_client_timeout(timedelta(milliseconds=1)), TEST_AUTH_PROVIDER + ) as short_timeout_client: + resp = short_timeout_client.subscribe(cache_name, topic) + assert isinstance(resp, TopicSubscribe.Error) + assert resp.error_code == MomentoErrorCode.TIMEOUT_ERROR diff --git a/tests/momento/topic_client/test_topics_async.py b/tests/momento/topic_client/test_topics_async.py index f6f42cda..8b4c957a 100644 --- a/tests/momento/topic_client/test_topics_async.py +++ b/tests/momento/topic_client/test_topics_async.py @@ -1,10 +1,14 @@ +from datetime import timedelta from functools import partial from momento import CacheClientAsync, TopicClientAsync +from momento.config.topic_configurations import TopicConfigurations +from momento.errors.error_details import MomentoErrorCode from momento.responses import TopicPublish, TopicSubscribe, TopicSubscriptionItem from pytest import fixture from pytest_describe import behaves_like +from tests.conftest import TEST_AUTH_PROVIDER from tests.utils import uuid_bytes, uuid_str from .shared_behaviors_async import ( @@ -129,3 +133,20 @@ async def succeeds_with_nonexistent_topic( resp = await topic_client_async.subscribe(cache_name, topic) assert isinstance(resp, TopicSubscribe.SubscriptionAsync) + + async def deadline_exceeded_when_timeout_is_shorter_than_subscribe_response( + client: CacheClientAsync, topic_client_async: TopicClientAsync, cache_name: str + ) -> None: + topic = uuid_str() + + # Default config uses 5 second timeout, should succeed + resp = await topic_client_async.subscribe(cache_name, topic) + assert isinstance(resp, TopicSubscribe.SubscriptionAsync) + + # Using a topic client configured with 1ms timeout should cause deadline exceeded error + async with TopicClientAsync( + TopicConfigurations.Default.latest().with_client_timeout(timedelta(milliseconds=1)), TEST_AUTH_PROVIDER + ) as short_timeout_client: + resp = await short_timeout_client.subscribe(cache_name, topic) + assert isinstance(resp, TopicSubscribe.Error) + assert resp.error_code == MomentoErrorCode.TIMEOUT_ERROR