Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ void testSubscribe_shouldNotRetryWithUnrecoverableError() throws Exception {
final int streamErrorMessageLimit = 3;
final MomentoLocalMiddlewareArgs momentoLocalMiddlewareArgs =
new MomentoLocalMiddlewareArgs.Builder(logger, UUID.randomUUID().toString())
.streamError(MomentoErrorCode.INTERNAL_SERVER_ERROR)
.streamError(MomentoErrorCode.NOT_FOUND_ERROR)
.streamErrorRpcList(Collections.singletonList(MomentoRpcMethod.TOPIC_SUBSCRIBE))
.streamErrorMessageLimit(streamErrorMessageLimit)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,18 @@
import javax.annotation.Nullable;

public abstract class CancelableClientCallStreamObserver<TResp>
extends ClientCallStreamObserver<TResp> implements ClientResponseObserver<Object, TResp> {
implements ClientResponseObserver<Object, TResp> {

private ClientCallStreamObserver requestStream;
private ClientCallStreamObserver<Object> requestStream;

@Override
public boolean isReady() {
return false;
}

@Override
public void setOnReadyHandler(Runnable onReadyHandler) {}

@Override
public void request(int count) {}

@Override
public void setMessageCompression(boolean enable) {}

@Override
public void disableAutoInboundFlowControl() {}

@Override
public void cancel(@Nullable String message, @Nullable Throwable cause) {
if (requestStream != null) {
requestStream.cancel(message, cause);
}
}

@Override
public void beforeStart(ClientCallStreamObserver requestStream) {
public void beforeStart(ClientCallStreamObserver<Object> requestStream) {
this.requestStream = requestStream;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ interface IScsTopicConnection {
* <p>Note: This method is intended for testing purposes and should never be called from outside
* of tests.
*/
void close();
default void close() {}

/**
* Opens the connection.
*
* <p>Note: This method is intended for testing purposes and should never be called from outside
* of tests.
*/
void open();
default void open() {}

/**
* Subscribes to a specific topic using the provided subscription request and observer.
Expand Down
76 changes: 24 additions & 52 deletions momento-sdk/src/main/java/momento/sdk/ScsTopicClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@

import com.google.protobuf.ByteString;
import grpc.cache_client.pubsub._PublishRequest;
import grpc.cache_client.pubsub._SubscriptionItem;
import grpc.cache_client.pubsub._SubscriptionRequest;
import grpc.cache_client.pubsub._TopicValue;
import grpc.common._Empty;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;
Expand All @@ -14,19 +13,19 @@
import momento.sdk.internal.SubscriptionState;
import momento.sdk.responses.topic.TopicPublishResponse;
import momento.sdk.responses.topic.TopicSubscribeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import momento.sdk.retry.SubscriptionRetryStrategy;

public class ScsTopicClient extends ScsClientBase {

private final Logger logger = LoggerFactory.getLogger(ScsTopicClient.class);
private final ScsTopicGrpcStubsManager topicGrpcStubsManager;
private final long DEFAULT_REQUEST_TIMEOUT_SECONDS = 5;
private final SubscriptionRetryStrategy subscriptionRetryStrategy;

public ScsTopicClient(
@Nonnull CredentialProvider credentialProvider, @Nonnull TopicConfiguration configuration) {
super(null);
this.topicGrpcStubsManager = new ScsTopicGrpcStubsManager(credentialProvider, configuration);
this.subscriptionRetryStrategy = configuration.getSubscriptionRetryStrategy();
}

public CompletableFuture<TopicPublishResponse> publish(
Expand Down Expand Up @@ -60,7 +59,7 @@ public CompletableFuture<TopicPublishResponse> publish(
}

public CompletableFuture<TopicSubscribeResponse> subscribe(
String cacheName, String topicName, ISubscriptionCallbacks options) {
String cacheName, String topicName, ISubscriptionCallbacks callbacks) {
try {
ValidationUtils.checkCacheNameValid(cacheName);
ValidationUtils.checkTopicNameValid(topicName);
Expand All @@ -69,24 +68,7 @@ public CompletableFuture<TopicSubscribeResponse> subscribe(
new TopicSubscribeResponse.Error(CacheServiceExceptionMapper.convert(e)));
}

SubscriptionState subscriptionState = new SubscriptionState();
TopicSubscribeResponse.Subscription subscription =
new TopicSubscribeResponse.Subscription(subscriptionState);
SendSubscribeOptions sendSubscribeOptions =
new SendSubscribeOptions(
cacheName,
topicName,
options::onItem,
options::onCompleted,
options::onError,
options::onDiscontinuity,
options::onHeartbeat,
options::onConnectionLost,
options::onConnectionRestored,
subscriptionState,
subscription);

return sendSubscribe(sendSubscribeOptions);
return sendSubscribe(cacheName, topicName, callbacks);
}

private CompletableFuture<TopicPublishResponse> sendPublish(
Expand All @@ -105,10 +87,10 @@ private CompletableFuture<TopicPublishResponse> sendPublish(
.getNextUnaryStub()
.publish(
request,
new StreamObserver() {
new StreamObserver<_Empty>() {

@Override
public void onNext(Object value) {
public void onNext(_Empty value) {
// Do nothing
}

Expand All @@ -133,28 +115,12 @@ public void onCompleted() {
}

private CompletableFuture<TopicSubscribeResponse> sendSubscribe(
SendSubscribeOptions sendSubscribeOptions) {
SubscriptionWrapper subscriptionWrapper;

IScsTopicConnection connection =
new IScsTopicConnection() {
@Override
public void close() {
logger.warn("Closing the connection (for testing purposes only)");
}
String cacheName, String topicName, ISubscriptionCallbacks callbacks) {
final SubscriptionState subscriptionState = new SubscriptionState();

@Override
public void open() {
logger.warn("Opening the connection (for testing purposes only)");
}

@Override
public void subscribe(
_SubscriptionRequest subscriptionRequest,
CancelableClientCallStreamObserver<_SubscriptionItem> subscription) {
topicGrpcStubsManager.getNextStreamStub().subscribe(subscriptionRequest, subscription);
}
};
final IScsTopicConnection connection =
(request, subscription) ->
topicGrpcStubsManager.getNextStreamStub().subscribe(request, subscription);

long configuredTimeoutSeconds =
topicGrpcStubsManager
Expand All @@ -166,18 +132,24 @@ public void subscribe(
long firstMessageSubscribeTimeoutSeconds =
configuredTimeoutSeconds > 0 ? configuredTimeoutSeconds : DEFAULT_REQUEST_TIMEOUT_SECONDS;

subscriptionWrapper =
@SuppressWarnings("resource") // the wrapper closes itself when a subscription ends.
final SubscriptionWrapper subscriptionWrapper =
new SubscriptionWrapper(
connection, sendSubscribeOptions, firstMessageSubscribeTimeoutSeconds);
cacheName,
topicName,
connection,
callbacks,
subscriptionState,
firstMessageSubscribeTimeoutSeconds,
subscriptionRetryStrategy);
final CompletableFuture<Void> subscribeFuture = subscriptionWrapper.subscribeWithRetry();
return subscribeFuture.handle(
(v, ex) -> {
if (ex != null) {
return new TopicSubscribeResponse.Error(CacheServiceExceptionMapper.convert(ex));
} else {
sendSubscribeOptions.subscriptionState.setUnsubscribeFn(
subscriptionWrapper::unsubscribe);
return new TopicSubscribeResponse.Subscription(sendSubscribeOptions.subscriptionState);
subscriptionState.setUnsubscribeFn(subscriptionWrapper::unsubscribe);
return new TopicSubscribeResponse.Subscription(subscriptionState);
}
});
}
Expand Down
151 changes: 0 additions & 151 deletions momento-sdk/src/main/java/momento/sdk/SendSubscribeOptions.java

This file was deleted.

Loading
Loading