Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package momento.sdk;

import grpc.cache_client.pubsub.PubsubGrpc;
import io.grpc.ManagedChannel;
import java.io.Closeable;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import momento.sdk.auth.CredentialProvider;
import momento.sdk.config.TopicConfiguration;
import momento.sdk.exceptions.ClientSdkException;
import momento.sdk.exceptions.MomentoErrorCode;
import momento.sdk.internal.GrpcChannelOptions;

public class DynamicStreamGrpcConnectionPool implements StreamTopicGrpcConnectionPool, Closeable {
private final CredentialProvider credentialProvider;
private final TopicConfiguration configuration;
private final UUID connectionIdKey;

private final AtomicInteger index = new AtomicInteger(0);
private final AtomicInteger currentNumStreamGrpcChannels = new AtomicInteger(1);
private final int maxStreamGrpcChannels;

private final int currentMaxConcurrentStreams;
private final AtomicInteger currentNumActiveStreams = new AtomicInteger(0);

private final CopyOnWriteArrayList<ManagedChannel> streamChannels;
private final CopyOnWriteArrayList<StreamStubWithCount> streamStubs;

public DynamicStreamGrpcConnectionPool(
CredentialProvider credentialProvider,
TopicConfiguration configuration,
UUID connectionIdKey) {
this.currentMaxConcurrentStreams = GrpcChannelOptions.NUM_CONCURRENT_STREAMS_PER_GRPC_CHANNEL;
this.maxStreamGrpcChannels =
configuration.getTransportStrategy().getGrpcConfiguration().getNumStreamGrpcChannels();

this.credentialProvider = credentialProvider;
this.configuration = configuration;
this.connectionIdKey = connectionIdKey;

this.streamChannels =
IntStream.range(0, this.currentNumStreamGrpcChannels.get())
.mapToObj(
i ->
TopicGrpcConnectionPoolUtils.setupConnection(
credentialProvider, configuration, connectionIdKey))
.collect(Collectors.toCollection(CopyOnWriteArrayList::new));
this.streamStubs =
streamChannels.stream()
.map(PubsubGrpc::newStub)
.map(StreamStubWithCount::new)
.collect(Collectors.toCollection(CopyOnWriteArrayList::new));
}

// Multiple threads could get to the point of seeing currentNumActiveStreams ==
// currentMaxConcurrentStreams,
// but we need to ensure only one thread will add a new channel at a time so that we don't exceed
// the max number of channels.
private void addNewChannel() {
final int updatedCount = this.currentNumStreamGrpcChannels.incrementAndGet();

if (updatedCount > this.maxStreamGrpcChannels) {
this.currentNumStreamGrpcChannels.decrementAndGet();
return;
}

this.streamChannels.add(
TopicGrpcConnectionPoolUtils.setupConnection(
credentialProvider, configuration, connectionIdKey));
this.streamStubs.add(
new StreamStubWithCount(
PubsubGrpc.newStub(
TopicGrpcConnectionPoolUtils.setupConnection(
credentialProvider, configuration, connectionIdKey))));
}

@Override
public StreamStubWithCount getNextStreamStub() {
// Check if we've reached the current max number of active streams.
if (this.currentNumActiveStreams.get() == this.currentMaxConcurrentStreams) {
// If we have not yet reached the maximum number of channels, add a new channel.
if (this.currentNumStreamGrpcChannels.get() < this.maxStreamGrpcChannels) {
this.addNewChannel();
} else {
// Otherwise return an error because all channels and streams are occupied.
throw new ClientSdkException(
MomentoErrorCode.CLIENT_RESOURCE_EXHAUSTED,
"Maximum number of active subscriptions reached");
}
}

// Try to get a client with capacity for another subscription
// by round-robining through the stubs.
// Allow up to maximumActiveSubscriptions attempts to account for large bursts of requests.
final int maximumActiveSubscriptions =
this.currentNumStreamGrpcChannels.get()
* GrpcChannelOptions.NUM_CONCURRENT_STREAMS_PER_GRPC_CHANNEL;
for (int i = 0; i < maximumActiveSubscriptions; i++) {
final StreamStubWithCount stubWithCount =
streamStubs.get(index.getAndIncrement() % this.currentNumStreamGrpcChannels.get());
try {
stubWithCount.acquireStubOrThrow();
this.currentNumActiveStreams.incrementAndGet();
return stubWithCount;
} catch (ClientSdkException e) {
// If the stub is at capacity, continue to the next one.
continue;
}
}

// Otherwise return an error if no stubs have capacity.
throw new ClientSdkException(
MomentoErrorCode.CLIENT_RESOURCE_EXHAUSTED,
"Maximum number of active subscriptions reached");
}

@Override
public void close() {
streamChannels.forEach(ManagedChannel::shutdown);
}
}
158 changes: 19 additions & 139 deletions momento-sdk/src/main/java/momento/sdk/ScsTopicGrpcStubsManager.java
Original file line number Diff line number Diff line change
@@ -1,64 +1,11 @@
package momento.sdk;

import grpc.cache_client.pubsub.PubsubGrpc;
import io.grpc.ClientInterceptor;
import io.grpc.ManagedChannel;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import java.io.Closeable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import momento.sdk.auth.CredentialProvider;
import momento.sdk.config.TopicConfiguration;
import momento.sdk.config.middleware.Middleware;
import momento.sdk.config.middleware.MiddlewareRequestHandlerContext;
import momento.sdk.exceptions.ClientSdkException;
import momento.sdk.exceptions.MomentoErrorCode;
import momento.sdk.internal.GrpcChannelOptions;

// Helper class for bookkeeping the number of active concurrent subscriptions.
final class StreamStubWithCount {
private final PubsubGrpc.PubsubStub stub;
private final AtomicInteger count = new AtomicInteger(0);

StreamStubWithCount(PubsubGrpc.PubsubStub stub) {
this.stub = stub;
}

PubsubGrpc.PubsubStub getStub() {
return stub;
}

int getCount() {
return count.get();
}

int incrementCount() {
return count.incrementAndGet();
}

int decrementCount() {
return count.decrementAndGet();
}

void acquireStubOrThrow() throws ClientSdkException {
if (count.incrementAndGet() <= 100) {
return;
} else {
count.decrementAndGet();
throw new ClientSdkException(
MomentoErrorCode.CLIENT_RESOURCE_EXHAUSTED,
"Maximum number of active subscriptions reached");
}
}
}

/**
* Manager responsible for GRPC channels and stubs for the Topics.
Expand All @@ -68,110 +15,43 @@ void acquireStubOrThrow() throws ClientSdkException {
* impacting the API business logic.
*/
final class ScsTopicGrpcStubsManager implements Closeable {

private final List<ManagedChannel> unaryChannels;
private final List<PubsubGrpc.PubsubStub> unaryStubs;
private final AtomicInteger unaryIndex = new AtomicInteger(0);

private final List<ManagedChannel> streamChannels;
private final List<StreamStubWithCount> streamStubs;
private final AtomicInteger streamIndex = new AtomicInteger(0);

public static final UUID CONNECTION_ID_KEY = UUID.randomUUID();

private final int numUnaryGrpcChannels;
private final int numStreamGrpcChannels;
private final UnaryTopicGrpcConnectionPool unaryConnectionPool;
private final StreamTopicGrpcConnectionPool streamConnectionPool;

private final TopicConfiguration configuration;
private final Duration deadline;

ScsTopicGrpcStubsManager(
@Nonnull CredentialProvider credentialProvider, @Nonnull TopicConfiguration configuration) {
this.configuration = configuration;
this.deadline = configuration.getTransportStrategy().getGrpcConfiguration().getDeadline();
this.numUnaryGrpcChannels =
configuration.getTransportStrategy().getGrpcConfiguration().getNumUnaryGrpcChannels();
this.numStreamGrpcChannels =
configuration.getTransportStrategy().getGrpcConfiguration().getNumStreamGrpcChannels();

this.unaryChannels =
IntStream.range(0, this.numUnaryGrpcChannels)
.mapToObj(i -> setupConnection(credentialProvider, configuration))
.collect(Collectors.toList());
this.unaryStubs = unaryChannels.stream().map(PubsubGrpc::newStub).collect(Collectors.toList());

this.streamChannels =
IntStream.range(0, this.numStreamGrpcChannels)
.mapToObj(i -> setupConnection(credentialProvider, configuration))
.collect(Collectors.toList());
this.streamStubs =
streamChannels.stream()
.map(PubsubGrpc::newStub)
.map(StreamStubWithCount::new)
.collect(Collectors.toList());
}

private static ManagedChannel setupConnection(
CredentialProvider credentialProvider, TopicConfiguration configuration) {
final NettyChannelBuilder channelBuilder =
NettyChannelBuilder.forAddress(
credentialProvider.getCacheEndpoint(), credentialProvider.getPort());
this.unaryConnectionPool =
new StaticUnaryGrpcConnectionPool(credentialProvider, configuration, CONNECTION_ID_KEY);

// set additional channel options (message size, keepalive, auth, etc)
GrpcChannelOptions.applyGrpcConfigurationToChannelBuilder(
configuration.getTransportStrategy().getGrpcConfiguration(),
channelBuilder,
credentialProvider.isEndpointSecure());

final List<ClientInterceptor> clientInterceptors = new ArrayList<>();

final List<Middleware> middlewares = configuration.getMiddlewares();
final MiddlewareRequestHandlerContext context =
() -> Collections.singletonMap(CONNECTION_ID_KEY.toString(), UUID.randomUUID().toString());
clientInterceptors.add(new GrpcMiddlewareInterceptor(middlewares, context));

clientInterceptors.add(new UserHeaderInterceptor(credentialProvider.getAuthToken(), "topic"));
channelBuilder.intercept(clientInterceptors);
return channelBuilder.build();
if (configuration.getIsNumStreamChannelsDynamic()) {
this.streamConnectionPool =
new DynamicStreamGrpcConnectionPool(credentialProvider, configuration, CONNECTION_ID_KEY);
} else {
this.streamConnectionPool =
new StaticStreamGrpcConnectionPool(credentialProvider, configuration, CONNECTION_ID_KEY);
}
}

/** Round-robin publish stub. */
PubsubGrpc.PubsubStub getNextUnaryStub() {
return unaryStubs
.get(unaryIndex.getAndIncrement() % this.numUnaryGrpcChannels)
.withDeadlineAfter(deadline.toMillis(), TimeUnit.MILLISECONDS);
TopicConfiguration getConfiguration() {
return configuration;
}

/** Round-robin subscribe stub. */
StreamStubWithCount getNextStreamStub() {
// Try to get a client with capacity for another subscription
// by round-robining through the stubs.
// Allow up to maximumActiveSubscriptions attempts to account for large bursts of requests.
final int maximumActiveSubscriptions = this.numStreamGrpcChannels * 100;
for (int i = 0; i < maximumActiveSubscriptions; i++) {
final StreamStubWithCount stubWithCount =
streamStubs.get(streamIndex.getAndIncrement() % this.numStreamGrpcChannels);
try {
stubWithCount.acquireStubOrThrow();
return stubWithCount;
} catch (ClientSdkException e) {
// If the stub is at capacity, continue to the next one.
continue;
}
}

// Otherwise return an error if no stubs have capacity.
throw new ClientSdkException(
MomentoErrorCode.CLIENT_RESOURCE_EXHAUSTED,
"Maximum number of active subscriptions reached");
return streamConnectionPool.getNextStreamStub();
}

TopicConfiguration getConfiguration() {
return configuration;
PubsubGrpc.PubsubStub getNextUnaryStub() {
return unaryConnectionPool.getNextUnaryStub();
}

@Override
public void close() {
unaryChannels.forEach(ManagedChannel::shutdown);
streamChannels.forEach(ManagedChannel::shutdown);
unaryConnectionPool.close();
streamConnectionPool.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package momento.sdk;

import grpc.cache_client.pubsub.PubsubGrpc;
import io.grpc.ManagedChannel;
import java.io.Closeable;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import momento.sdk.auth.CredentialProvider;
import momento.sdk.config.TopicConfiguration;
import momento.sdk.exceptions.ClientSdkException;
import momento.sdk.exceptions.MomentoErrorCode;
import momento.sdk.internal.GrpcChannelOptions;

class StaticStreamGrpcConnectionPool implements StreamTopicGrpcConnectionPool, Closeable {
private final AtomicInteger index = new AtomicInteger(0);
private final int numStreamGrpcChannels;
private final List<ManagedChannel> streamChannels;
private final List<StreamStubWithCount> streamStubs;

public StaticStreamGrpcConnectionPool(
CredentialProvider credentialProvider,
TopicConfiguration configuration,
UUID connectionIdKey) {
this.numStreamGrpcChannels =
configuration.getTransportStrategy().getGrpcConfiguration().getNumStreamGrpcChannels();
this.streamChannels =
IntStream.range(0, this.numStreamGrpcChannels)
.mapToObj(
i ->
TopicGrpcConnectionPoolUtils.setupConnection(
credentialProvider, configuration, connectionIdKey))
.collect(Collectors.toList());
this.streamStubs =
streamChannels.stream()
.map(PubsubGrpc::newStub)
.map(StreamStubWithCount::new)
.collect(Collectors.toList());
}

@Override
public StreamStubWithCount getNextStreamStub() {
// Try to get a client with capacity for another subscription
// by round-robining through the stubs.
// Allow up to maximumActiveSubscriptions attempts to account for large bursts of requests.
final int maximumActiveSubscriptions =
this.numStreamGrpcChannels * GrpcChannelOptions.NUM_CONCURRENT_STREAMS_PER_GRPC_CHANNEL;
for (int i = 0; i < maximumActiveSubscriptions; i++) {
final StreamStubWithCount stubWithCount =
streamStubs.get(index.getAndIncrement() % this.numStreamGrpcChannels);
try {
stubWithCount.acquireStubOrThrow();
return stubWithCount;
} catch (ClientSdkException e) {
// If the stub is at capacity, continue to the next one.
continue;
}
}

// Otherwise return an error if no stubs have capacity.
throw new ClientSdkException(
MomentoErrorCode.CLIENT_RESOURCE_EXHAUSTED,
"Maximum number of active subscriptions reached");
}

@Override
public void close() {
streamChannels.forEach(ManagedChannel::shutdown);
}
}
Loading
Loading