From f6d4e58437335db7ab77e38ac180ce51aed487e8 Mon Sep 17 00:00:00 2001 From: anitarua Date: Tue, 17 Jun 2025 13:22:09 -0700 Subject: [PATCH 1/6] chore: add internal topics grpc connection pool interfaces and classes --- .../sdk/DynamicStreamGrpcConnectionPool.java | 115 ++++++++++++++++++ .../sdk/StaticStreamGrpcConnectionPool.java | 69 +++++++++++ .../sdk/StaticUnaryGrpcConnectionPool.java | 51 ++++++++ .../sdk/StreamTopicGrpcConnectionPool.java | 49 ++++++++ .../sdk/TopicGrpcConnectionPoolUtils.java | 45 +++++++ .../sdk/UnaryTopicGrpcConnectionPool.java | 9 ++ 6 files changed, 338 insertions(+) create mode 100644 momento-sdk/src/main/java/momento/sdk/DynamicStreamGrpcConnectionPool.java create mode 100644 momento-sdk/src/main/java/momento/sdk/StaticStreamGrpcConnectionPool.java create mode 100644 momento-sdk/src/main/java/momento/sdk/StaticUnaryGrpcConnectionPool.java create mode 100644 momento-sdk/src/main/java/momento/sdk/StreamTopicGrpcConnectionPool.java create mode 100644 momento-sdk/src/main/java/momento/sdk/TopicGrpcConnectionPoolUtils.java create mode 100644 momento-sdk/src/main/java/momento/sdk/UnaryTopicGrpcConnectionPool.java diff --git a/momento-sdk/src/main/java/momento/sdk/DynamicStreamGrpcConnectionPool.java b/momento-sdk/src/main/java/momento/sdk/DynamicStreamGrpcConnectionPool.java new file mode 100644 index 00000000..e8e72f5a --- /dev/null +++ b/momento-sdk/src/main/java/momento/sdk/DynamicStreamGrpcConnectionPool.java @@ -0,0 +1,115 @@ +package momento.sdk; + +import grpc.cache_client.pubsub.PubsubGrpc; +import io.grpc.ManagedChannel; +import java.util.UUID; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import momento.sdk.auth.CredentialProvider; +import momento.sdk.config.TopicConfiguration; +import momento.sdk.exceptions.ClientSdkException; +import momento.sdk.exceptions.MomentoErrorCode; + +public class DynamicStreamGrpcConnectionPool implements StreamTopicGrpcConnectionPool { + private final CredentialProvider credentialProvider; + private final TopicConfiguration configuration; + private final UUID connectionIdKey; + + private final AtomicInteger index = new AtomicInteger(0); + private final AtomicInteger currentNumStreamGrpcChannels = new AtomicInteger(1); + private final int maxStreamGrpcChannels; + + private final int currentMaxConcurrentStreams; + private final AtomicInteger currentNumActiveStreams = new AtomicInteger(0); + + private final CopyOnWriteArrayList streamChannels; + private final CopyOnWriteArrayList streamStubs; + + public DynamicStreamGrpcConnectionPool( + CredentialProvider credentialProvider, + TopicConfiguration configuration, + UUID connectionIdKey) { + this.currentMaxConcurrentStreams = 100; + this.maxStreamGrpcChannels = + configuration.getTransportStrategy().getGrpcConfiguration().getNumStreamGrpcChannels(); + + this.credentialProvider = credentialProvider; + this.configuration = configuration; + this.connectionIdKey = connectionIdKey; + + this.streamChannels = + IntStream.range(0, this.currentNumStreamGrpcChannels.get()) + .mapToObj( + i -> + TopicGrpcConnectionPoolUtils.setupConnection( + credentialProvider, configuration, connectionIdKey)) + .collect(Collectors.toCollection(CopyOnWriteArrayList::new)); + this.streamStubs = + streamChannels.stream() + .map(PubsubGrpc::newStub) + .map(StreamStubWithCount::new) + .collect(Collectors.toCollection(CopyOnWriteArrayList::new)); + } + + // Multiple threads could get to the point of seeing currentNumActiveStreams == currentMaxConcurrentStreams, + // but we need to ensure only one thread will add a new channel at a time so that we don't exceed the max number of channels. + private void addNewChannel() { + final int updatedCount = this.currentNumStreamGrpcChannels.incrementAndGet(); + + if (updatedCount > this.maxStreamGrpcChannels) { + this.currentNumStreamGrpcChannels.decrementAndGet(); + return; + } + + this.streamChannels.add( + TopicGrpcConnectionPoolUtils.setupConnection( + credentialProvider, configuration, connectionIdKey)); + this.streamStubs.add(new StreamStubWithCount(PubsubGrpc.newStub(TopicGrpcConnectionPoolUtils.setupConnection( + credentialProvider, configuration, connectionIdKey)))); + } + + @Override + public StreamStubWithCount getNextStreamStub() { + // Check if we've reached the current max number of active streams. + if (this.currentNumActiveStreams.get() == this.currentMaxConcurrentStreams) { + // If we have not yet reached the maximum number of channels, add a new channel. + if (this.currentNumStreamGrpcChannels.get() < this.maxStreamGrpcChannels) { + this.addNewChannel(); + } else { + // Otherwise return an error because all channels and streams are occupied. + throw new ClientSdkException( + MomentoErrorCode.CLIENT_RESOURCE_EXHAUSTED, + "Maximum number of active subscriptions reached"); + } + } + + // Try to get a client with capacity for another subscription + // by round-robining through the stubs. + // Allow up to maximumActiveSubscriptions attempts to account for large bursts of requests. + final int maximumActiveSubscriptions = this.currentNumStreamGrpcChannels.get() * 100; + for (int i = 0; i < maximumActiveSubscriptions; i++) { + final StreamStubWithCount stubWithCount = + streamStubs.get(index.getAndIncrement() % this.currentNumStreamGrpcChannels.get()); + try { + stubWithCount.acquireStubOrThrow(); + this.currentNumActiveStreams.incrementAndGet(); + return stubWithCount; + } catch (ClientSdkException e) { + // If the stub is at capacity, continue to the next one. + continue; + } + } + + // Otherwise return an error if no stubs have capacity. + throw new ClientSdkException( + MomentoErrorCode.CLIENT_RESOURCE_EXHAUSTED, + "Maximum number of active subscriptions reached"); + } + + @Override + public void close() { + streamChannels.forEach(ManagedChannel::shutdown); + } +} diff --git a/momento-sdk/src/main/java/momento/sdk/StaticStreamGrpcConnectionPool.java b/momento-sdk/src/main/java/momento/sdk/StaticStreamGrpcConnectionPool.java new file mode 100644 index 00000000..e203b326 --- /dev/null +++ b/momento-sdk/src/main/java/momento/sdk/StaticStreamGrpcConnectionPool.java @@ -0,0 +1,69 @@ +package momento.sdk; + +import grpc.cache_client.pubsub.PubsubGrpc; +import io.grpc.ManagedChannel; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import momento.sdk.auth.CredentialProvider; +import momento.sdk.config.TopicConfiguration; +import momento.sdk.exceptions.ClientSdkException; +import momento.sdk.exceptions.MomentoErrorCode; + +class StaticStreamGrpcConnectionPool implements StreamTopicGrpcConnectionPool { + private final AtomicInteger index = new AtomicInteger(0); + private final int numStreamGrpcChannels; + private final List streamChannels; + private final List streamStubs; + + public StaticStreamGrpcConnectionPool( + CredentialProvider credentialProvider, + TopicConfiguration configuration, + UUID connectionIdKey) { + this.numStreamGrpcChannels = + configuration.getTransportStrategy().getGrpcConfiguration().getNumStreamGrpcChannels(); + this.streamChannels = + IntStream.range(0, this.numStreamGrpcChannels) + .mapToObj( + i -> + TopicGrpcConnectionPoolUtils.setupConnection( + credentialProvider, configuration, connectionIdKey)) + .collect(Collectors.toList()); + this.streamStubs = + streamChannels.stream() + .map(PubsubGrpc::newStub) + .map(StreamStubWithCount::new) + .collect(Collectors.toList()); + } + + @Override + public StreamStubWithCount getNextStreamStub() { + // Try to get a client with capacity for another subscription + // by round-robining through the stubs. + // Allow up to maximumActiveSubscriptions attempts to account for large bursts of requests. + final int maximumActiveSubscriptions = this.numStreamGrpcChannels * 100; + for (int i = 0; i < maximumActiveSubscriptions; i++) { + final StreamStubWithCount stubWithCount = + streamStubs.get(index.getAndIncrement() % this.numStreamGrpcChannels); + try { + stubWithCount.acquireStubOrThrow(); + return stubWithCount; + } catch (ClientSdkException e) { + // If the stub is at capacity, continue to the next one. + continue; + } + } + + // Otherwise return an error if no stubs have capacity. + throw new ClientSdkException( + MomentoErrorCode.CLIENT_RESOURCE_EXHAUSTED, + "Maximum number of active subscriptions reached"); + } + + @Override + public void close() { + streamChannels.forEach(ManagedChannel::shutdown); + } +} diff --git a/momento-sdk/src/main/java/momento/sdk/StaticUnaryGrpcConnectionPool.java b/momento-sdk/src/main/java/momento/sdk/StaticUnaryGrpcConnectionPool.java new file mode 100644 index 00000000..1b01fe5f --- /dev/null +++ b/momento-sdk/src/main/java/momento/sdk/StaticUnaryGrpcConnectionPool.java @@ -0,0 +1,51 @@ +package momento.sdk; + +import grpc.cache_client.pubsub.PubsubGrpc; +import grpc.cache_client.pubsub.PubsubGrpc.PubsubStub; +import io.grpc.ManagedChannel; +import java.time.Duration; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import momento.sdk.auth.CredentialProvider; +import momento.sdk.config.TopicConfiguration; + +class StaticUnaryGrpcConnectionPool implements UnaryTopicGrpcConnectionPool { + private final Duration deadline; + private final AtomicInteger index = new AtomicInteger(0); + private final int numUnaryGrpcChannels; + private final List unaryChannels; + private final List unaryStubs; + + public StaticUnaryGrpcConnectionPool( + CredentialProvider credentialProvider, + TopicConfiguration configuration, + UUID connectionIdKey) { + this.deadline = configuration.getTransportStrategy().getGrpcConfiguration().getDeadline(); + this.numUnaryGrpcChannels = + configuration.getTransportStrategy().getGrpcConfiguration().getNumUnaryGrpcChannels(); + this.unaryChannels = + IntStream.range(0, this.numUnaryGrpcChannels) + .mapToObj( + i -> + TopicGrpcConnectionPoolUtils.setupConnection( + credentialProvider, configuration, connectionIdKey)) + .collect(Collectors.toList()); + this.unaryStubs = unaryChannels.stream().map(PubsubGrpc::newStub).collect(Collectors.toList()); + } + + @Override + public PubsubStub getNextUnaryStub() { + return unaryStubs + .get(this.index.getAndIncrement() % this.numUnaryGrpcChannels) + .withDeadlineAfter(deadline.toMillis(), TimeUnit.MILLISECONDS); + } + + @Override + public void close() { + unaryChannels.forEach(ManagedChannel::shutdown); + } +} diff --git a/momento-sdk/src/main/java/momento/sdk/StreamTopicGrpcConnectionPool.java b/momento-sdk/src/main/java/momento/sdk/StreamTopicGrpcConnectionPool.java new file mode 100644 index 00000000..0d98e343 --- /dev/null +++ b/momento-sdk/src/main/java/momento/sdk/StreamTopicGrpcConnectionPool.java @@ -0,0 +1,49 @@ +package momento.sdk; + +import grpc.cache_client.pubsub.PubsubGrpc; +import java.util.concurrent.atomic.AtomicInteger; +import momento.sdk.exceptions.ClientSdkException; +import momento.sdk.exceptions.MomentoErrorCode; + +interface StreamTopicGrpcConnectionPool { + StreamStubWithCount getNextStreamStub(); + + void close(); +} + +// Helper class for bookkeeping the number of active concurrent subscriptions. +final class StreamStubWithCount { + private final PubsubGrpc.PubsubStub stub; + private final AtomicInteger count = new AtomicInteger(0); + + StreamStubWithCount(PubsubGrpc.PubsubStub stub) { + this.stub = stub; + } + + PubsubGrpc.PubsubStub getStub() { + return stub; + } + + int getCount() { + return count.get(); + } + + int incrementCount() { + return count.incrementAndGet(); + } + + int decrementCount() { + return count.decrementAndGet(); + } + + void acquireStubOrThrow() throws ClientSdkException { + if (count.incrementAndGet() <= 100) { + return; + } else { + count.decrementAndGet(); + throw new ClientSdkException( + MomentoErrorCode.CLIENT_RESOURCE_EXHAUSTED, + "Maximum number of active subscriptions reached"); + } + } +} diff --git a/momento-sdk/src/main/java/momento/sdk/TopicGrpcConnectionPoolUtils.java b/momento-sdk/src/main/java/momento/sdk/TopicGrpcConnectionPoolUtils.java new file mode 100644 index 00000000..acab5a2e --- /dev/null +++ b/momento-sdk/src/main/java/momento/sdk/TopicGrpcConnectionPoolUtils.java @@ -0,0 +1,45 @@ +package momento.sdk; + +import io.grpc.ClientInterceptor; +import io.grpc.ManagedChannel; +import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import momento.sdk.auth.CredentialProvider; +import momento.sdk.config.TopicConfiguration; +import momento.sdk.config.middleware.Middleware; +import momento.sdk.config.middleware.MiddlewareRequestHandlerContext; +import momento.sdk.internal.GrpcChannelOptions; + +// Utility class for setting up a connection to the Momento Topic service. +final class TopicGrpcConnectionPoolUtils { + + // Set up a connection to the Momento Topic service. + protected static ManagedChannel setupConnection( + CredentialProvider credentialProvider, + TopicConfiguration configuration, + UUID connectionIdKey) { + final NettyChannelBuilder channelBuilder = + NettyChannelBuilder.forAddress( + credentialProvider.getCacheEndpoint(), credentialProvider.getPort()); + + // set additional channel options (message size, keepalive, auth, etc) + GrpcChannelOptions.applyGrpcConfigurationToChannelBuilder( + configuration.getTransportStrategy().getGrpcConfiguration(), + channelBuilder, + credentialProvider.isEndpointSecure()); + + final List clientInterceptors = new ArrayList<>(); + + final List middlewares = configuration.getMiddlewares(); + final MiddlewareRequestHandlerContext context = + () -> Collections.singletonMap(connectionIdKey.toString(), UUID.randomUUID().toString()); + clientInterceptors.add(new GrpcMiddlewareInterceptor(middlewares, context)); + + clientInterceptors.add(new UserHeaderInterceptor(credentialProvider.getAuthToken(), "topic")); + channelBuilder.intercept(clientInterceptors); + return channelBuilder.build(); + } +} diff --git a/momento-sdk/src/main/java/momento/sdk/UnaryTopicGrpcConnectionPool.java b/momento-sdk/src/main/java/momento/sdk/UnaryTopicGrpcConnectionPool.java new file mode 100644 index 00000000..31a73ee0 --- /dev/null +++ b/momento-sdk/src/main/java/momento/sdk/UnaryTopicGrpcConnectionPool.java @@ -0,0 +1,9 @@ +package momento.sdk; + +import grpc.cache_client.pubsub.PubsubGrpc; + +interface UnaryTopicGrpcConnectionPool { + PubsubGrpc.PubsubStub getNextUnaryStub(); + + void close(); +} From d20d864b491c775340da571cb9a235a6405b1c99 Mon Sep 17 00:00:00 2001 From: anitarua Date: Tue, 17 Jun 2025 13:22:55 -0700 Subject: [PATCH 2/6] feat: add WithMaxSubscriptions config and update topic client to use grpc pool classes --- .../momento/sdk/ScsTopicGrpcStubsManager.java | 158 +++--------------- .../sdk/config/TopicConfiguration.java | 60 +++++++ 2 files changed, 79 insertions(+), 139 deletions(-) diff --git a/momento-sdk/src/main/java/momento/sdk/ScsTopicGrpcStubsManager.java b/momento-sdk/src/main/java/momento/sdk/ScsTopicGrpcStubsManager.java index 484df6ef..87bef92d 100644 --- a/momento-sdk/src/main/java/momento/sdk/ScsTopicGrpcStubsManager.java +++ b/momento-sdk/src/main/java/momento/sdk/ScsTopicGrpcStubsManager.java @@ -1,64 +1,11 @@ package momento.sdk; import grpc.cache_client.pubsub.PubsubGrpc; -import io.grpc.ClientInterceptor; -import io.grpc.ManagedChannel; -import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; import java.io.Closeable; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; -import java.util.stream.IntStream; import javax.annotation.Nonnull; import momento.sdk.auth.CredentialProvider; import momento.sdk.config.TopicConfiguration; -import momento.sdk.config.middleware.Middleware; -import momento.sdk.config.middleware.MiddlewareRequestHandlerContext; -import momento.sdk.exceptions.ClientSdkException; -import momento.sdk.exceptions.MomentoErrorCode; -import momento.sdk.internal.GrpcChannelOptions; - -// Helper class for bookkeeping the number of active concurrent subscriptions. -final class StreamStubWithCount { - private final PubsubGrpc.PubsubStub stub; - private final AtomicInteger count = new AtomicInteger(0); - - StreamStubWithCount(PubsubGrpc.PubsubStub stub) { - this.stub = stub; - } - - PubsubGrpc.PubsubStub getStub() { - return stub; - } - - int getCount() { - return count.get(); - } - - int incrementCount() { - return count.incrementAndGet(); - } - - int decrementCount() { - return count.decrementAndGet(); - } - - void acquireStubOrThrow() throws ClientSdkException { - if (count.incrementAndGet() <= 100) { - return; - } else { - count.decrementAndGet(); - throw new ClientSdkException( - MomentoErrorCode.CLIENT_RESOURCE_EXHAUSTED, - "Maximum number of active subscriptions reached"); - } - } -} /** * Manager responsible for GRPC channels and stubs for the Topics. @@ -68,110 +15,43 @@ void acquireStubOrThrow() throws ClientSdkException { * impacting the API business logic. */ final class ScsTopicGrpcStubsManager implements Closeable { - - private final List unaryChannels; - private final List unaryStubs; - private final AtomicInteger unaryIndex = new AtomicInteger(0); - - private final List streamChannels; - private final List streamStubs; - private final AtomicInteger streamIndex = new AtomicInteger(0); - public static final UUID CONNECTION_ID_KEY = UUID.randomUUID(); - private final int numUnaryGrpcChannels; - private final int numStreamGrpcChannels; + private final UnaryTopicGrpcConnectionPool unaryConnectionPool; + private final StreamTopicGrpcConnectionPool streamConnectionPool; + private final TopicConfiguration configuration; - private final Duration deadline; ScsTopicGrpcStubsManager( @Nonnull CredentialProvider credentialProvider, @Nonnull TopicConfiguration configuration) { this.configuration = configuration; - this.deadline = configuration.getTransportStrategy().getGrpcConfiguration().getDeadline(); - this.numUnaryGrpcChannels = - configuration.getTransportStrategy().getGrpcConfiguration().getNumUnaryGrpcChannels(); - this.numStreamGrpcChannels = - configuration.getTransportStrategy().getGrpcConfiguration().getNumStreamGrpcChannels(); - - this.unaryChannels = - IntStream.range(0, this.numUnaryGrpcChannels) - .mapToObj(i -> setupConnection(credentialProvider, configuration)) - .collect(Collectors.toList()); - this.unaryStubs = unaryChannels.stream().map(PubsubGrpc::newStub).collect(Collectors.toList()); - - this.streamChannels = - IntStream.range(0, this.numStreamGrpcChannels) - .mapToObj(i -> setupConnection(credentialProvider, configuration)) - .collect(Collectors.toList()); - this.streamStubs = - streamChannels.stream() - .map(PubsubGrpc::newStub) - .map(StreamStubWithCount::new) - .collect(Collectors.toList()); - } - - private static ManagedChannel setupConnection( - CredentialProvider credentialProvider, TopicConfiguration configuration) { - final NettyChannelBuilder channelBuilder = - NettyChannelBuilder.forAddress( - credentialProvider.getCacheEndpoint(), credentialProvider.getPort()); + this.unaryConnectionPool = + new StaticUnaryGrpcConnectionPool(credentialProvider, configuration, CONNECTION_ID_KEY); - // set additional channel options (message size, keepalive, auth, etc) - GrpcChannelOptions.applyGrpcConfigurationToChannelBuilder( - configuration.getTransportStrategy().getGrpcConfiguration(), - channelBuilder, - credentialProvider.isEndpointSecure()); - - final List clientInterceptors = new ArrayList<>(); - - final List middlewares = configuration.getMiddlewares(); - final MiddlewareRequestHandlerContext context = - () -> Collections.singletonMap(CONNECTION_ID_KEY.toString(), UUID.randomUUID().toString()); - clientInterceptors.add(new GrpcMiddlewareInterceptor(middlewares, context)); - - clientInterceptors.add(new UserHeaderInterceptor(credentialProvider.getAuthToken(), "topic")); - channelBuilder.intercept(clientInterceptors); - return channelBuilder.build(); + if (configuration.getIsNumStreamChannelsDynamic()) { + this.streamConnectionPool = + new DynamicStreamGrpcConnectionPool(credentialProvider, configuration, CONNECTION_ID_KEY); + } else { + this.streamConnectionPool = + new StaticStreamGrpcConnectionPool(credentialProvider, configuration, CONNECTION_ID_KEY); + } } - /** Round-robin publish stub. */ - PubsubGrpc.PubsubStub getNextUnaryStub() { - return unaryStubs - .get(unaryIndex.getAndIncrement() % this.numUnaryGrpcChannels) - .withDeadlineAfter(deadline.toMillis(), TimeUnit.MILLISECONDS); + TopicConfiguration getConfiguration() { + return configuration; } - /** Round-robin subscribe stub. */ StreamStubWithCount getNextStreamStub() { - // Try to get a client with capacity for another subscription - // by round-robining through the stubs. - // Allow up to maximumActiveSubscriptions attempts to account for large bursts of requests. - final int maximumActiveSubscriptions = this.numStreamGrpcChannels * 100; - for (int i = 0; i < maximumActiveSubscriptions; i++) { - final StreamStubWithCount stubWithCount = - streamStubs.get(streamIndex.getAndIncrement() % this.numStreamGrpcChannels); - try { - stubWithCount.acquireStubOrThrow(); - return stubWithCount; - } catch (ClientSdkException e) { - // If the stub is at capacity, continue to the next one. - continue; - } - } - - // Otherwise return an error if no stubs have capacity. - throw new ClientSdkException( - MomentoErrorCode.CLIENT_RESOURCE_EXHAUSTED, - "Maximum number of active subscriptions reached"); + return streamConnectionPool.getNextStreamStub(); } - TopicConfiguration getConfiguration() { - return configuration; + PubsubGrpc.PubsubStub getNextUnaryStub() { + return unaryConnectionPool.getNextUnaryStub(); } @Override public void close() { - unaryChannels.forEach(ManagedChannel::shutdown); - streamChannels.forEach(ManagedChannel::shutdown); + unaryConnectionPool.close(); + streamConnectionPool.close(); } } diff --git a/momento-sdk/src/main/java/momento/sdk/config/TopicConfiguration.java b/momento-sdk/src/main/java/momento/sdk/config/TopicConfiguration.java index 57d9e8ee..e774a328 100644 --- a/momento-sdk/src/main/java/momento/sdk/config/TopicConfiguration.java +++ b/momento-sdk/src/main/java/momento/sdk/config/TopicConfiguration.java @@ -7,6 +7,7 @@ import momento.sdk.config.middleware.Middleware; import momento.sdk.config.transport.GrpcConfiguration; import momento.sdk.config.transport.TransportStrategy; +import momento.sdk.exceptions.InvalidArgumentException; import momento.sdk.retry.FixedDelaySubscriptionRetryStrategy; import momento.sdk.retry.SubscriptionRetryStrategy; import org.slf4j.Logger; @@ -19,6 +20,7 @@ public class TopicConfiguration { private final SubscriptionRetryStrategy subscriptionRetryStrategy; private final List middlewares; private final Logger logger; + private final boolean isNumStreamChannelsDynamic; /** * Creates a new topic configuration. @@ -38,6 +40,31 @@ public TopicConfiguration( this.subscriptionRetryStrategy = subscriptionRetryStrategy; this.middlewares = middlewares; this.logger = logger; + this.isNumStreamChannelsDynamic = false; + } + + /** + * Creates a new topic configuration. + * + * @param transportStrategy Responsible for configuring network tunables. + * @param subscriptionRetryStrategy Responsible for determining when to reconnect a broken + * subscription. + * @param middlewares List of middleware that can intercept and modify calls to Momento. + * @param logger Responsible for logging + * @param isNumStreamChannelsDynamic Whether the number of stream gRPC channels dynamically grow + * to accommodate the maximum number of active subscriptions. + */ + public TopicConfiguration( + TransportStrategy transportStrategy, + SubscriptionRetryStrategy subscriptionRetryStrategy, + List middlewares, + Logger logger, + boolean isNumStreamChannelsDynamic) { + this.transportStrategy = transportStrategy; + this.subscriptionRetryStrategy = subscriptionRetryStrategy; + this.middlewares = middlewares; + this.logger = logger; + this.isNumStreamChannelsDynamic = isNumStreamChannelsDynamic; } /** @@ -147,4 +174,37 @@ public TopicConfiguration withMiddleware(@Nonnull final Middleware middleware) { public List getMiddlewares() { return middlewares; } + + /** + * Copy constructor that sets the maximum number of active subscriptions the topic client should + * accommodate, but will start with one channel which supports 100 subscriptions. + * + * @param maxSubscriptions The new maximum number of active subscriptions. + * @return a new TopicConfiguration with the updated maximum number of active subscriptions. + */ + public TopicConfiguration withMaxSubscriptions(int maxSubscriptions) { + if (maxSubscriptions < 0) { + throw new InvalidArgumentException("maxSubscriptions must be greater than 0"); + } + final int maxStreamChannels = (int) Math.ceil(maxSubscriptions / 100.0); + final GrpcConfiguration newGrpcConfiguration = + this.getTransportStrategy() + .getGrpcConfiguration() + .withNumStreamGrpcChannels(maxStreamChannels); + final TransportStrategy newTransportStrategy = + this.getTransportStrategy().withGrpcConfiguration(newGrpcConfiguration); + return new TopicConfiguration( + newTransportStrategy, this.subscriptionRetryStrategy, this.middlewares, this.logger, true); + } + + /** + * Whether the number of stream gRPC channels dynamically grow to accommodate the maximum number + * of active subscriptions. + * + * @return Whether the number of stream gRPC channels dynamically grow to accommodate the maximum + * number of active subscriptions. + */ + public boolean getIsNumStreamChannelsDynamic() { + return isNumStreamChannelsDynamic; + } } From 8752cb025db2ed508f4faa2051e9d0ed2c9c3e97 Mon Sep 17 00:00:00 2001 From: anitarua Date: Tue, 17 Jun 2025 13:23:14 -0700 Subject: [PATCH 3/6] chore: add tests for dynamic stream grpc channel pools --- .../TopicsSubscriptionInitializationTest.java | 434 +++++++++++++++++- 1 file changed, 422 insertions(+), 12 deletions(-) diff --git a/momento-sdk/src/intTest/java/momento/sdk/retry/TopicsSubscriptionInitializationTest.java b/momento-sdk/src/intTest/java/momento/sdk/retry/TopicsSubscriptionInitializationTest.java index 2527224c..e58d4e1f 100644 --- a/momento-sdk/src/intTest/java/momento/sdk/retry/TopicsSubscriptionInitializationTest.java +++ b/momento-sdk/src/intTest/java/momento/sdk/retry/TopicsSubscriptionInitializationTest.java @@ -1,5 +1,6 @@ package momento.sdk.retry; +import static momento.sdk.retry.BaseMomentoLocalTestClass.withCacheAndTopicClient; import static momento.sdk.retry.BaseMomentoLocalTestClass.withCacheAndTopicClientWithNumStreamChannels; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -51,7 +52,7 @@ static void setup() { @Test @Timeout(30) - public void oneStreamChannel_doesNotSilentlyQueueSubscribeRequestOnFullChannel() + public void staticPool_oneStreamChannel_doesNotSilentlyQueueSubscribeRequestOnFullChannel() throws Exception { unsubscribeCounter = 0; @@ -89,7 +90,7 @@ public void oneStreamChannel_doesNotSilentlyQueueSubscribeRequestOnFullChannel() subscriptions.get(0).unsubscribe(); // Wait for the subscription to end try { - Thread.sleep(500); + Thread.sleep(750); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException("Test interrupted while waiting for subscriptions", e); @@ -113,7 +114,7 @@ public void oneStreamChannel_doesNotSilentlyQueueSubscribeRequestOnFullChannel() @ParameterizedTest @ValueSource(ints = {2, 10, 20}) @Timeout(30) - public void multipleStreamChannels_handlesBurstOfSubscribeAndUnsubscribeRequests( + public void staticPool_multipleStreamChannels_handlesBurstOfSubscribeAndUnsubscribeRequests( int numGrpcChannels) throws Exception { unsubscribeCounter = 0; final int maxStreamCapacity = 100 * numGrpcChannels; @@ -154,7 +155,7 @@ public void multipleStreamChannels_handlesBurstOfSubscribeAndUnsubscribeRequests } // Wait a bit for the subscription to end try { - Thread.sleep(500); + Thread.sleep(750); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException("Test interrupted while waiting for subscriptions", e); @@ -202,7 +203,7 @@ public void multipleStreamChannels_handlesBurstOfSubscribeAndUnsubscribeRequests @ParameterizedTest @ValueSource(ints = {2, 10, 20}) @Timeout(30) - public void multipleStreamChannels_handlesBurstOfSubscribeRequestsAtMaxCapacity( + public void staticPool_multipleStreamChannels_handlesBurstOfSubscribeRequestsAtMaxCapacity( int numGrpcChannels) throws Exception { final int maxStreamCapacity = 100 * numGrpcChannels; @@ -221,7 +222,7 @@ public void multipleStreamChannels_handlesBurstOfSubscribeRequestsAtMaxCapacity( // Wait a bit for all subscriptions to be fully established try { - Thread.sleep(500); + Thread.sleep(750); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException("Test interrupted while waiting for subscriptions", e); @@ -245,7 +246,7 @@ public void multipleStreamChannels_handlesBurstOfSubscribeRequestsAtMaxCapacity( @ParameterizedTest @ValueSource(ints = {2, 10, 20}) @Timeout(30) - public void multipleStreamChannels_handlesBurstOfSubscribeRequestsAtOverMaxCapacity( + public void staticPool_multipleStreamChannels_handlesBurstOfSubscribeRequestsAtOverMaxCapacity( int numGrpcChannels) throws Exception { final int maxStreamCapacity = 100 * numGrpcChannels; @@ -295,7 +296,7 @@ public void multipleStreamChannels_handlesBurstOfSubscribeRequestsAtOverMaxCapac @ParameterizedTest @ValueSource(ints = {2, 10, 20}) @Timeout(30) - public void multipleStreamChannels_handlesBurstOfSubscribeRequestsAtHalfOfMaxCapacity( + public void staticPool_multipleStreamChannels_handlesBurstOfSubscribeRequestsAtHalfOfMaxCapacity( int numGrpcChannels) throws Exception { final int maxStreamCapacity = 100 * numGrpcChannels; @@ -337,7 +338,8 @@ public void multipleStreamChannels_handlesBurstOfSubscribeRequestsAtHalfOfMaxCap @Test @Timeout(30) - public void shouldDecrementActiveSubscriptionsCountWhenSubscribeRequestsFail() throws Exception { + public void staticPool_shouldDecrementActiveSubscriptionsCountWhenSubscribeRequestsFail() + throws Exception { final int numGrpcChannels = 1; final int maxStreamCapacity = 100 * numGrpcChannels; @@ -363,8 +365,8 @@ public void onError(Throwable t) { } }; - // Should successfully start the maximum number of subscriptions because 10 attempts ran - // into NOT_FOUND_ERROR. The errors should have decremented the active subscriptions + // Should successfully start the maximum number of subscriptions because 10 attempts + // ran into NOT_FOUND_ERROR. The errors should have decremented the active subscriptions // count. List successfulSubscriptions = new ArrayList<>(); for (int i = 0; i < maxStreamCapacity + 10; i++) { @@ -399,7 +401,8 @@ public void onError(Throwable t) { @Test @Timeout(30) - public void oneStreamChannel_properlyDecrementsWhenErrorOccursMidStream() throws Exception { + public void staticPool_oneStreamChannel_properlyDecrementsWhenErrorOccursMidStream() + throws Exception { unsubscribeCounter = 0; final AtomicInteger unsubscribeOnErrorCounter = new AtomicInteger(0); final ISubscriptionCallbacks callbacks = @@ -458,4 +461,411 @@ public void onError(Throwable t) { assertEquals(1, unsubscribeOnErrorCounter.get()); }); } + + @Test + @Timeout(30) + public void dynamicPool_oneStreamChannel_doesNotSilentlyQueueSubscribeRequestOnFullChannel() + throws Exception { + unsubscribeCounter = 0; + + withCacheAndTopicClient( + (config) -> config.withMaxSubscriptions(100), + new MomentoLocalMiddlewareArgs.Builder(logger, UUID.randomUUID().toString()).build(), + (topicClient, cacheName) -> { + // These should all succeed + // Starting 100 subscriptions on 1 channel should be fine + List subscriptions = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + final TopicSubscribeResponse response = + topicClient.subscribe(cacheName, "test-topic", callbacks()).join(); + assertThat(response).isInstanceOf(TopicSubscribeResponse.Subscription.class); + subscriptions.add((TopicSubscribeResponse.Subscription) response); + } + + // Wait a bit for all subscriptions to be fully established + try { + Thread.sleep(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Test interrupted while waiting for subscriptions", e); + } + + // Starting one more subscription should produce resource exhausted error + final TopicSubscribeResponse response = + topicClient.subscribe(cacheName, "test-topic", callbacks()).join(); + assertThat(response).isInstanceOf(TopicSubscribeResponse.Error.class); + assertEquals( + MomentoErrorCode.CLIENT_RESOURCE_EXHAUSTED, + ((TopicSubscribeResponse.Error) response).getErrorCode()); + + // Ending a subscription should free up one new stream + subscriptions.get(0).unsubscribe(); + // Wait for the subscription to end + try { + Thread.sleep(750); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Test interrupted while waiting for subscriptions", e); + } + assertEquals(1, unsubscribeCounter); + + final TopicSubscribeResponse response2 = + topicClient.subscribe(cacheName, "test-topic", callbacks()).join(); + assertThat(response2).isInstanceOf(TopicSubscribeResponse.Subscription.class); + subscriptions.add((TopicSubscribeResponse.Subscription) response2); + + // Cleanup + for (TopicSubscribeResponse.Subscription sub : subscriptions) { + if (sub != null) { + sub.unsubscribe(); + } + } + }); + } + + @ParameterizedTest + @ValueSource(ints = {200, 1000, 2000}) + @Timeout(60) + public void dynamicPool_multipleStreamChannels_handlesBurstOfSubscribeAndUnsubscribeRequests( + int maxSubscriptions) throws Exception { + unsubscribeCounter = 0; + + withCacheAndTopicClient( + (config) -> config.withMaxSubscriptions(maxSubscriptions), + new MomentoLocalMiddlewareArgs.Builder(logger, UUID.randomUUID().toString()).build(), + (topicClient, cacheName) -> { + List> subscribeRequests = new ArrayList<>(); + for (int i = 0; i < maxSubscriptions; i++) { + final CompletableFuture response = + topicClient.subscribe(cacheName, "test-topic", callbacks()); + subscribeRequests.add(response); + } + // Wait for all the subscribe requests to complete + CompletableFuture.allOf(subscribeRequests.toArray(new CompletableFuture[0])).join(); + + // Wait a bit for all subscriptions to be fully established + try { + Thread.sleep(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Test interrupted while waiting for subscriptions", e); + } + + // Verify they all succeeded + List subscriptions = new ArrayList<>(); + for (CompletableFuture future : subscribeRequests) { + TopicSubscribeResponse response = future.join(); + assertThat(response).isInstanceOf(TopicSubscribeResponse.Subscription.class); + subscriptions.add((TopicSubscribeResponse.Subscription) response); + } + + // Unsubscribe half of the subscriptions + final int unsubscribeBurstSize = maxSubscriptions / 2; + for (int i = 0; i < unsubscribeBurstSize; i++) { + subscriptions.get(i).unsubscribe(); + } + // Wait a bit for the subscription to end + try { + Thread.sleep(750); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Test interrupted while waiting for subscriptions", e); + } + assertEquals(unsubscribeBurstSize, unsubscribeCounter); + + // Burst of subscribe requests should succeed + final int subscribeBurstSize = maxSubscriptions / 2 + 10; + List> subscribeRequests2 = new ArrayList<>(); + for (int i = 0; i < subscribeBurstSize; i++) { + final CompletableFuture subscribePromise = + topicClient.subscribe(cacheName, "test-topic", callbacks()); + subscribeRequests2.add(subscribePromise); + } + CompletableFuture.allOf(subscribeRequests2.toArray(new CompletableFuture[0])).join(); + + // Wait a bit for all subscriptions to be fully established + try { + Thread.sleep(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Test interrupted while waiting for subscriptions", e); + } + + List successfulSubscriptions2 = new ArrayList<>(); + int numFailedSubscriptions = 0; + for (CompletableFuture future : subscribeRequests2) { + TopicSubscribeResponse response = future.join(); + if (response instanceof TopicSubscribeResponse.Subscription) { + successfulSubscriptions2.add((TopicSubscribeResponse.Subscription) response); + } else { + numFailedSubscriptions++; + } + } + assertEquals(10, numFailedSubscriptions); + assertEquals(subscribeBurstSize - 10, successfulSubscriptions2.size()); + + // Cleanup + for (TopicSubscribeResponse.Subscription sub : subscriptions) { + sub.unsubscribe(); + } + }); + } + + @ParameterizedTest + @ValueSource(ints = {200, 1000, 2000}) + @Timeout(60) + public void dynamicPool_multipleStreamChannels_handlesBurstOfSubscribeRequestsAtMaxCapacity( + int maxSubscriptions) throws Exception { + + withCacheAndTopicClient( + (config) -> config.withMaxSubscriptions(maxSubscriptions), + new MomentoLocalMiddlewareArgs.Builder(logger, UUID.randomUUID().toString()).build(), + (topicClient, cacheName) -> { + List> subscribeRequests = new ArrayList<>(); + for (int i = 0; i < maxSubscriptions; i++) { + final CompletableFuture response = + topicClient.subscribe(cacheName, "test-topic", callbacks()); + subscribeRequests.add(response); + } + // Wait for all the subscribe requests to complete + CompletableFuture.allOf(subscribeRequests.toArray(new CompletableFuture[0])).join(); + + // Wait a bit for all subscriptions to be fully established + try { + Thread.sleep(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Test interrupted while waiting for subscriptions", e); + } + + // Verify they all succeeded + List subscriptions = new ArrayList<>(); + for (CompletableFuture future : subscribeRequests) { + TopicSubscribeResponse response = future.join(); + assertThat(response).isInstanceOf(TopicSubscribeResponse.Subscription.class); + subscriptions.add((TopicSubscribeResponse.Subscription) response); + } + + // Cleanup + for (TopicSubscribeResponse.Subscription sub : subscriptions) { + sub.unsubscribe(); + } + }); + } + + @ParameterizedTest + @ValueSource(ints = {200, 1000, 2000}) + @Timeout(60) + public void dynamicPool_multipleStreamChannels_handlesBurstOfSubscribeRequestsAtOverMaxCapacity( + int maxSubscriptions) throws Exception { + + withCacheAndTopicClient( + (config) -> config.withMaxSubscriptions(maxSubscriptions), + new MomentoLocalMiddlewareArgs.Builder(logger, UUID.randomUUID().toString()).build(), + (topicClient, cacheName) -> { + List> subscribeRequests = new ArrayList<>(); + for (int i = 0; i < maxSubscriptions + 10; i++) { + final CompletableFuture response = + topicClient.subscribe(cacheName, "test-topic", callbacks()); + subscribeRequests.add(response); + } + // Wait for all the subscribe requests to complete + CompletableFuture.allOf(subscribeRequests.toArray(new CompletableFuture[0])).join(); + + // Wait a bit for all subscriptions to be fully established + try { + Thread.sleep(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Test interrupted while waiting for subscriptions", e); + } + + // Verify they all succeeded + List subscriptions = new ArrayList<>(); + int numFailedSubscriptions = 0; + for (CompletableFuture future : subscribeRequests) { + TopicSubscribeResponse response = future.join(); + if (response instanceof TopicSubscribeResponse.Error) { + numFailedSubscriptions++; + } else { + assertThat(response).isInstanceOf(TopicSubscribeResponse.Subscription.class); + subscriptions.add((TopicSubscribeResponse.Subscription) response); + } + } + assertEquals(10, numFailedSubscriptions); + assertEquals(maxSubscriptions, subscriptions.size()); + + // Cleanup + for (TopicSubscribeResponse.Subscription sub : subscriptions) { + sub.unsubscribe(); + } + }); + } + + @ParameterizedTest + @ValueSource(ints = {200, 1000, 2000}) + @Timeout(60) + public void dynamicPool_multipleStreamChannels_handlesBurstOfSubscribeRequestsAtHalfOfMaxCapacity( + int maxSubscriptions) throws Exception { + + withCacheAndTopicClient( + (config) -> config.withMaxSubscriptions(maxSubscriptions), + new MomentoLocalMiddlewareArgs.Builder(logger, UUID.randomUUID().toString()).build(), + (topicClient, cacheName) -> { + List> subscribeRequests = new ArrayList<>(); + for (int i = 0; i < maxSubscriptions / 2; i++) { + final CompletableFuture response = + topicClient.subscribe(cacheName, "test-topic", callbacks()); + subscribeRequests.add(response); + } + // Wait for all the subscribe requests to complete + CompletableFuture.allOf(subscribeRequests.toArray(new CompletableFuture[0])).join(); + + // Wait a bit for all subscriptions to be fully established + try { + Thread.sleep(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Test interrupted while waiting for subscriptions", e); + } + + // Verify they all succeeded + List subscriptions = new ArrayList<>(); + for (CompletableFuture future : subscribeRequests) { + TopicSubscribeResponse response = future.join(); + assertThat(response).isInstanceOf(TopicSubscribeResponse.Subscription.class); + subscriptions.add((TopicSubscribeResponse.Subscription) response); + } + + // Cleanup + for (TopicSubscribeResponse.Subscription sub : subscriptions) { + sub.unsubscribe(); + } + }); + } + + @Test + @Timeout(30) + public void dynamicPool_shouldDecrementActiveSubscriptionsCountWhenSubscribeRequestsFail() + throws Exception { + final int maxSubscriptions = 100; + + withCacheAndTopicClient( + (config) -> config.withMaxSubscriptions(maxSubscriptions), + new MomentoLocalMiddlewareArgs.Builder(logger, UUID.randomUUID().toString()).build(), + (topicClient, cacheName) -> { + final Semaphore errorSemaphore = new Semaphore(0); + final AtomicInteger errorCounter = new AtomicInteger(0); + + final ISubscriptionCallbacks callbacks = + new ISubscriptionCallbacks() { + @Override + public void onItem(TopicMessage message) {} + + @Override + public void onCompleted() {} + + @Override + public void onError(Throwable t) { + errorCounter.incrementAndGet(); + errorSemaphore.release(); + } + }; + + // Should successfully start the maximum number of subscriptions because 10 attempts ran + // into NOT_FOUND_ERROR. The errors should have decremented the active subscriptions + // count. + List successfulSubscriptions = new ArrayList<>(); + for (int i = 0; i < maxSubscriptions + 10; i++) { + String cacheNameToUse = cacheName; + if (i % 11 == 0) { + cacheNameToUse = "this-cache-does-not-exist"; + } + TopicSubscribeResponse attempt = + topicClient.subscribe(cacheNameToUse, "test-topic", callbacks).join(); + if (attempt instanceof TopicSubscribeResponse.Subscription) { + successfulSubscriptions.add((TopicSubscribeResponse.Subscription) attempt); + } else { + assertThat(attempt).isInstanceOf(TopicSubscribeResponse.Error.class); + assertThat(((TopicSubscribeResponse.Error) attempt).getErrorCode()) + .isEqualTo(MomentoErrorCode.NOT_FOUND_ERROR); + errorCounter.incrementAndGet(); + } + } + + // Assert that we have received maxStreamCapacity number of successful subscriptions + assertThat(successfulSubscriptions.size()).isEqualTo(maxSubscriptions); + + // Assert that we have received 10 NOT_FOUND_ERRORs + assertThat(errorCounter).hasValue(10); + + // Cleanup + for (TopicSubscribeResponse.Subscription sub : successfulSubscriptions) { + sub.unsubscribe(); + } + }); + } + + @Test + @Timeout(30) + public void dynamicPool_oneStreamChannel_properlyDecrementsWhenErrorOccursMidStream() + throws Exception { + unsubscribeCounter = 0; + final AtomicInteger unsubscribeOnErrorCounter = new AtomicInteger(0); + final ISubscriptionCallbacks callbacks = + new ISubscriptionCallbacks() { + @Override + public void onItem(TopicMessage message) {} + + @Override + public void onCompleted() { + System.out.println("onCompleted"); + unsubscribeCounter++; + } + + @Override + public void onError(Throwable t) { + System.out.println("onError"); + unsubscribeOnErrorCounter.incrementAndGet(); + } + }; + + final MomentoLocalMiddlewareArgs middlewareArgs = + new MomentoLocalMiddlewareArgs.Builder(logger, UUID.randomUUID().toString()) + .streamError(MomentoErrorCode.NOT_FOUND_ERROR) + .streamErrorRpcList(Collections.singletonList(MomentoRpcMethod.TOPIC_SUBSCRIBE)) + .streamErrorMessageLimit(3) + .build(); + + withCacheAndTopicClient( + (config) -> config.withMaxSubscriptions(100), + middlewareArgs, + (topicClient, cacheName) -> { + List subscriptions = new ArrayList<>(); + + // Subscribe but expecting an error after a couple of heartbeats + final TopicSubscribeResponse response = + topicClient.subscribe(cacheName, "topic", callbacks).join(); + assertThat(response).isInstanceOf(TopicSubscribeResponse.Subscription.class); + subscriptions.add((TopicSubscribeResponse.Subscription) response); + + // Wait for the subscription that ran into the error to be closed + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Test interrupted while waiting for subscriptions", e); + } + + // Cleanup + for (TopicSubscribeResponse.Subscription sub : subscriptions) { + if (sub != null) { + sub.unsubscribe(); + } + } + + assertEquals(0, unsubscribeCounter); + assertEquals(1, unsubscribeOnErrorCounter.get()); + }); + } } From 0fe4ad234232f289cf28eefa3d9a4710066a38fc Mon Sep 17 00:00:00 2001 From: anitarua Date: Tue, 17 Jun 2025 13:29:07 -0700 Subject: [PATCH 4/6] fix formatting --- .../sdk/DynamicStreamGrpcConnectionPool.java | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/momento-sdk/src/main/java/momento/sdk/DynamicStreamGrpcConnectionPool.java b/momento-sdk/src/main/java/momento/sdk/DynamicStreamGrpcConnectionPool.java index e8e72f5a..19199d4d 100644 --- a/momento-sdk/src/main/java/momento/sdk/DynamicStreamGrpcConnectionPool.java +++ b/momento-sdk/src/main/java/momento/sdk/DynamicStreamGrpcConnectionPool.java @@ -53,8 +53,10 @@ public DynamicStreamGrpcConnectionPool( .collect(Collectors.toCollection(CopyOnWriteArrayList::new)); } - // Multiple threads could get to the point of seeing currentNumActiveStreams == currentMaxConcurrentStreams, - // but we need to ensure only one thread will add a new channel at a time so that we don't exceed the max number of channels. + // Multiple threads could get to the point of seeing currentNumActiveStreams == + // currentMaxConcurrentStreams, + // but we need to ensure only one thread will add a new channel at a time so that we don't exceed + // the max number of channels. private void addNewChannel() { final int updatedCount = this.currentNumStreamGrpcChannels.incrementAndGet(); @@ -66,8 +68,11 @@ private void addNewChannel() { this.streamChannels.add( TopicGrpcConnectionPoolUtils.setupConnection( credentialProvider, configuration, connectionIdKey)); - this.streamStubs.add(new StreamStubWithCount(PubsubGrpc.newStub(TopicGrpcConnectionPoolUtils.setupConnection( - credentialProvider, configuration, connectionIdKey)))); + this.streamStubs.add( + new StreamStubWithCount( + PubsubGrpc.newStub( + TopicGrpcConnectionPoolUtils.setupConnection( + credentialProvider, configuration, connectionIdKey)))); } @Override From 6f847ad6e6b77ed0b6102e23d0dfd82623a1e99e Mon Sep 17 00:00:00 2001 From: anitarua Date: Wed, 18 Jun 2025 09:58:11 -0700 Subject: [PATCH 5/6] extract num concurrent streams per channel into const --- .../java/momento/sdk/DynamicStreamGrpcConnectionPool.java | 7 +++++-- .../java/momento/sdk/StaticStreamGrpcConnectionPool.java | 4 +++- .../java/momento/sdk/StreamTopicGrpcConnectionPool.java | 3 ++- .../main/java/momento/sdk/config/TopicConfiguration.java | 6 +++++- .../main/java/momento/sdk/internal/GrpcChannelOptions.java | 1 + 5 files changed, 16 insertions(+), 5 deletions(-) diff --git a/momento-sdk/src/main/java/momento/sdk/DynamicStreamGrpcConnectionPool.java b/momento-sdk/src/main/java/momento/sdk/DynamicStreamGrpcConnectionPool.java index 19199d4d..c2613979 100644 --- a/momento-sdk/src/main/java/momento/sdk/DynamicStreamGrpcConnectionPool.java +++ b/momento-sdk/src/main/java/momento/sdk/DynamicStreamGrpcConnectionPool.java @@ -11,6 +11,7 @@ import momento.sdk.config.TopicConfiguration; import momento.sdk.exceptions.ClientSdkException; import momento.sdk.exceptions.MomentoErrorCode; +import momento.sdk.internal.GrpcChannelOptions; public class DynamicStreamGrpcConnectionPool implements StreamTopicGrpcConnectionPool { private final CredentialProvider credentialProvider; @@ -31,7 +32,7 @@ public DynamicStreamGrpcConnectionPool( CredentialProvider credentialProvider, TopicConfiguration configuration, UUID connectionIdKey) { - this.currentMaxConcurrentStreams = 100; + this.currentMaxConcurrentStreams = GrpcChannelOptions.NUM_CONCURRENT_STREAMS_PER_GRPC_CHANNEL; this.maxStreamGrpcChannels = configuration.getTransportStrategy().getGrpcConfiguration().getNumStreamGrpcChannels(); @@ -93,7 +94,9 @@ public StreamStubWithCount getNextStreamStub() { // Try to get a client with capacity for another subscription // by round-robining through the stubs. // Allow up to maximumActiveSubscriptions attempts to account for large bursts of requests. - final int maximumActiveSubscriptions = this.currentNumStreamGrpcChannels.get() * 100; + final int maximumActiveSubscriptions = + this.currentNumStreamGrpcChannels.get() + * GrpcChannelOptions.NUM_CONCURRENT_STREAMS_PER_GRPC_CHANNEL; for (int i = 0; i < maximumActiveSubscriptions; i++) { final StreamStubWithCount stubWithCount = streamStubs.get(index.getAndIncrement() % this.currentNumStreamGrpcChannels.get()); diff --git a/momento-sdk/src/main/java/momento/sdk/StaticStreamGrpcConnectionPool.java b/momento-sdk/src/main/java/momento/sdk/StaticStreamGrpcConnectionPool.java index e203b326..04198984 100644 --- a/momento-sdk/src/main/java/momento/sdk/StaticStreamGrpcConnectionPool.java +++ b/momento-sdk/src/main/java/momento/sdk/StaticStreamGrpcConnectionPool.java @@ -11,6 +11,7 @@ import momento.sdk.config.TopicConfiguration; import momento.sdk.exceptions.ClientSdkException; import momento.sdk.exceptions.MomentoErrorCode; +import momento.sdk.internal.GrpcChannelOptions; class StaticStreamGrpcConnectionPool implements StreamTopicGrpcConnectionPool { private final AtomicInteger index = new AtomicInteger(0); @@ -43,7 +44,8 @@ public StreamStubWithCount getNextStreamStub() { // Try to get a client with capacity for another subscription // by round-robining through the stubs. // Allow up to maximumActiveSubscriptions attempts to account for large bursts of requests. - final int maximumActiveSubscriptions = this.numStreamGrpcChannels * 100; + final int maximumActiveSubscriptions = + this.numStreamGrpcChannels * GrpcChannelOptions.NUM_CONCURRENT_STREAMS_PER_GRPC_CHANNEL; for (int i = 0; i < maximumActiveSubscriptions; i++) { final StreamStubWithCount stubWithCount = streamStubs.get(index.getAndIncrement() % this.numStreamGrpcChannels); diff --git a/momento-sdk/src/main/java/momento/sdk/StreamTopicGrpcConnectionPool.java b/momento-sdk/src/main/java/momento/sdk/StreamTopicGrpcConnectionPool.java index 0d98e343..b86dcc73 100644 --- a/momento-sdk/src/main/java/momento/sdk/StreamTopicGrpcConnectionPool.java +++ b/momento-sdk/src/main/java/momento/sdk/StreamTopicGrpcConnectionPool.java @@ -4,6 +4,7 @@ import java.util.concurrent.atomic.AtomicInteger; import momento.sdk.exceptions.ClientSdkException; import momento.sdk.exceptions.MomentoErrorCode; +import momento.sdk.internal.GrpcChannelOptions; interface StreamTopicGrpcConnectionPool { StreamStubWithCount getNextStreamStub(); @@ -37,7 +38,7 @@ int decrementCount() { } void acquireStubOrThrow() throws ClientSdkException { - if (count.incrementAndGet() <= 100) { + if (count.incrementAndGet() <= GrpcChannelOptions.NUM_CONCURRENT_STREAMS_PER_GRPC_CHANNEL) { return; } else { count.decrementAndGet(); diff --git a/momento-sdk/src/main/java/momento/sdk/config/TopicConfiguration.java b/momento-sdk/src/main/java/momento/sdk/config/TopicConfiguration.java index e774a328..3a5a05a8 100644 --- a/momento-sdk/src/main/java/momento/sdk/config/TopicConfiguration.java +++ b/momento-sdk/src/main/java/momento/sdk/config/TopicConfiguration.java @@ -8,6 +8,7 @@ import momento.sdk.config.transport.GrpcConfiguration; import momento.sdk.config.transport.TransportStrategy; import momento.sdk.exceptions.InvalidArgumentException; +import momento.sdk.internal.GrpcChannelOptions; import momento.sdk.retry.FixedDelaySubscriptionRetryStrategy; import momento.sdk.retry.SubscriptionRetryStrategy; import org.slf4j.Logger; @@ -186,7 +187,10 @@ public TopicConfiguration withMaxSubscriptions(int maxSubscriptions) { if (maxSubscriptions < 0) { throw new InvalidArgumentException("maxSubscriptions must be greater than 0"); } - final int maxStreamChannels = (int) Math.ceil(maxSubscriptions / 100.0); + final int maxStreamChannels = + (int) + Math.ceil( + maxSubscriptions / GrpcChannelOptions.NUM_CONCURRENT_STREAMS_PER_GRPC_CHANNEL); final GrpcConfiguration newGrpcConfiguration = this.getTransportStrategy() .getGrpcConfiguration() diff --git a/momento-sdk/src/main/java/momento/sdk/internal/GrpcChannelOptions.java b/momento-sdk/src/main/java/momento/sdk/internal/GrpcChannelOptions.java index 9c991d3a..4c680511 100644 --- a/momento-sdk/src/main/java/momento/sdk/internal/GrpcChannelOptions.java +++ b/momento-sdk/src/main/java/momento/sdk/internal/GrpcChannelOptions.java @@ -24,6 +24,7 @@ public class GrpcChannelOptions { public static final int DEFAULT_NUM_GRPC_CHANNELS = 4; public static final int DEFAULT_NUM_UNARY_GRPC_CHANNELS = 4; public static final int DEFAULT_NUM_STREAM_GRPC_CHANNELS = 4; + public static final int NUM_CONCURRENT_STREAMS_PER_GRPC_CHANNEL = 100; // grpc default public static void applyGrpcConfigurationToChannelBuilder( IGrpcConfiguration grpcConfig, NettyChannelBuilder channelBuilder) { From 4a8cafb4270d2b2a119cbf720502cead1ea801aa Mon Sep 17 00:00:00 2001 From: anitarua Date: Wed, 18 Jun 2025 10:05:45 -0700 Subject: [PATCH 6/6] implement Closeable --- .../main/java/momento/sdk/DynamicStreamGrpcConnectionPool.java | 3 ++- .../main/java/momento/sdk/StaticStreamGrpcConnectionPool.java | 3 ++- .../main/java/momento/sdk/StaticUnaryGrpcConnectionPool.java | 3 ++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/momento-sdk/src/main/java/momento/sdk/DynamicStreamGrpcConnectionPool.java b/momento-sdk/src/main/java/momento/sdk/DynamicStreamGrpcConnectionPool.java index c2613979..9c5addca 100644 --- a/momento-sdk/src/main/java/momento/sdk/DynamicStreamGrpcConnectionPool.java +++ b/momento-sdk/src/main/java/momento/sdk/DynamicStreamGrpcConnectionPool.java @@ -2,6 +2,7 @@ import grpc.cache_client.pubsub.PubsubGrpc; import io.grpc.ManagedChannel; +import java.io.Closeable; import java.util.UUID; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; @@ -13,7 +14,7 @@ import momento.sdk.exceptions.MomentoErrorCode; import momento.sdk.internal.GrpcChannelOptions; -public class DynamicStreamGrpcConnectionPool implements StreamTopicGrpcConnectionPool { +public class DynamicStreamGrpcConnectionPool implements StreamTopicGrpcConnectionPool, Closeable { private final CredentialProvider credentialProvider; private final TopicConfiguration configuration; private final UUID connectionIdKey; diff --git a/momento-sdk/src/main/java/momento/sdk/StaticStreamGrpcConnectionPool.java b/momento-sdk/src/main/java/momento/sdk/StaticStreamGrpcConnectionPool.java index 04198984..88d9ebee 100644 --- a/momento-sdk/src/main/java/momento/sdk/StaticStreamGrpcConnectionPool.java +++ b/momento-sdk/src/main/java/momento/sdk/StaticStreamGrpcConnectionPool.java @@ -2,6 +2,7 @@ import grpc.cache_client.pubsub.PubsubGrpc; import io.grpc.ManagedChannel; +import java.io.Closeable; import java.util.List; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; @@ -13,7 +14,7 @@ import momento.sdk.exceptions.MomentoErrorCode; import momento.sdk.internal.GrpcChannelOptions; -class StaticStreamGrpcConnectionPool implements StreamTopicGrpcConnectionPool { +class StaticStreamGrpcConnectionPool implements StreamTopicGrpcConnectionPool, Closeable { private final AtomicInteger index = new AtomicInteger(0); private final int numStreamGrpcChannels; private final List streamChannels; diff --git a/momento-sdk/src/main/java/momento/sdk/StaticUnaryGrpcConnectionPool.java b/momento-sdk/src/main/java/momento/sdk/StaticUnaryGrpcConnectionPool.java index 1b01fe5f..7da17f5b 100644 --- a/momento-sdk/src/main/java/momento/sdk/StaticUnaryGrpcConnectionPool.java +++ b/momento-sdk/src/main/java/momento/sdk/StaticUnaryGrpcConnectionPool.java @@ -3,6 +3,7 @@ import grpc.cache_client.pubsub.PubsubGrpc; import grpc.cache_client.pubsub.PubsubGrpc.PubsubStub; import io.grpc.ManagedChannel; +import java.io.Closeable; import java.time.Duration; import java.util.List; import java.util.UUID; @@ -13,7 +14,7 @@ import momento.sdk.auth.CredentialProvider; import momento.sdk.config.TopicConfiguration; -class StaticUnaryGrpcConnectionPool implements UnaryTopicGrpcConnectionPool { +class StaticUnaryGrpcConnectionPool implements UnaryTopicGrpcConnectionPool, Closeable { private final Duration deadline; private final AtomicInteger index = new AtomicInteger(0); private final int numUnaryGrpcChannels;