Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/momento/internal/aio/_scs_pubsub_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ async def subscribe(
stub, decrement_stream_count = self._get_stream_stub()
stream = stub.Subscribe( # type: ignore[misc]
request,
timeout=self._default_deadline_seconds,
)

# Ping the stream to provide a nice error message if the cache does not exist.
Expand Down
1 change: 1 addition & 0 deletions src/momento/internal/synchronous/_scs_pubsub_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ def subscribe(
stub, decrement_stream_count = self._get_stream_stub()
stream = stub.Subscribe( # type: ignore[misc]
request,
timeout=self._default_deadline_seconds,
)

# Ping the stream to provide a nice error message if the cache does not exist.
Expand Down
21 changes: 21 additions & 0 deletions tests/momento/topic_client/test_topics.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
from datetime import timedelta
from functools import partial

from momento import CacheClient, TopicClient
from momento.config.topic_configurations import TopicConfigurations
from momento.errors.error_details import MomentoErrorCode
from momento.responses import TopicPublish, TopicSubscribe, TopicSubscriptionItem
from pytest import fixture
from pytest_describe import behaves_like

from tests.conftest import TEST_AUTH_PROVIDER
from tests.utils import uuid_bytes, uuid_str

from .shared_behaviors import (
Expand Down Expand Up @@ -114,3 +118,20 @@ def succeeds_with_nonexistent_topic(client: CacheClient, topic_client: TopicClie

resp = topic_client.subscribe(cache_name, topic)
assert isinstance(resp, TopicSubscribe.Subscription)

def deadline_exceeded_when_timeout_is_shorter_than_subscribe_response(
client: CacheClient, topic_client: TopicClient, cache_name: str
) -> None:
topic = uuid_str()

# Default config uses 5 second timeout, should succeed
resp = topic_client.subscribe(cache_name, topic)
assert isinstance(resp, TopicSubscribe.Subscription)

# Using a topic client configured with 1ms timeout should cause deadline exceeded error
with TopicClient(
TopicConfigurations.Default.latest().with_client_timeout(timedelta(milliseconds=1)), TEST_AUTH_PROVIDER
) as short_timeout_client:
resp = short_timeout_client.subscribe(cache_name, topic)
assert isinstance(resp, TopicSubscribe.Error)
assert resp.error_code == MomentoErrorCode.TIMEOUT_ERROR
21 changes: 21 additions & 0 deletions tests/momento/topic_client/test_topics_async.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
from datetime import timedelta
from functools import partial

from momento import CacheClientAsync, TopicClientAsync
from momento.config.topic_configurations import TopicConfigurations
from momento.errors.error_details import MomentoErrorCode
from momento.responses import TopicPublish, TopicSubscribe, TopicSubscriptionItem
from pytest import fixture
from pytest_describe import behaves_like

from tests.conftest import TEST_AUTH_PROVIDER
from tests.utils import uuid_bytes, uuid_str

from .shared_behaviors_async import (
Expand Down Expand Up @@ -129,3 +133,20 @@ async def succeeds_with_nonexistent_topic(

resp = await topic_client_async.subscribe(cache_name, topic)
assert isinstance(resp, TopicSubscribe.SubscriptionAsync)

async def deadline_exceeded_when_timeout_is_shorter_than_subscribe_response(
client: CacheClientAsync, topic_client_async: TopicClientAsync, cache_name: str
) -> None:
topic = uuid_str()

# Default config uses 5 second timeout, should succeed
resp = await topic_client_async.subscribe(cache_name, topic)
assert isinstance(resp, TopicSubscribe.SubscriptionAsync)

# Using a topic client configured with 1ms timeout should cause deadline exceeded error
async with TopicClientAsync(
TopicConfigurations.Default.latest().with_client_timeout(timedelta(milliseconds=1)), TEST_AUTH_PROVIDER
) as short_timeout_client:
resp = await short_timeout_client.subscribe(cache_name, topic)
assert isinstance(resp, TopicSubscribe.Error)
assert resp.error_code == MomentoErrorCode.TIMEOUT_ERROR
Loading