diff --git a/momento-sdk/src/intTest/java/momento/sdk/retry/TopicClientLocalTest.java b/momento-sdk/src/intTest/java/momento/sdk/retry/TopicClientLocalTest.java index 1e906305..3fee9c41 100644 --- a/momento-sdk/src/intTest/java/momento/sdk/retry/TopicClientLocalTest.java +++ b/momento-sdk/src/intTest/java/momento/sdk/retry/TopicClientLocalTest.java @@ -119,7 +119,7 @@ void testSubscribe_shouldNotRetryWithUnrecoverableError() throws Exception { final int streamErrorMessageLimit = 3; final MomentoLocalMiddlewareArgs momentoLocalMiddlewareArgs = new MomentoLocalMiddlewareArgs.Builder(logger, UUID.randomUUID().toString()) - .streamError(MomentoErrorCode.INTERNAL_SERVER_ERROR) + .streamError(MomentoErrorCode.NOT_FOUND_ERROR) .streamErrorRpcList(Collections.singletonList(MomentoRpcMethod.TOPIC_SUBSCRIBE)) .streamErrorMessageLimit(streamErrorMessageLimit) .build(); diff --git a/momento-sdk/src/main/java/momento/sdk/CancelableClientCallStreamObserver.java b/momento-sdk/src/main/java/momento/sdk/CancelableClientCallStreamObserver.java index a54aa5ae..fabd3374 100644 --- a/momento-sdk/src/main/java/momento/sdk/CancelableClientCallStreamObserver.java +++ b/momento-sdk/src/main/java/momento/sdk/CancelableClientCallStreamObserver.java @@ -5,28 +5,10 @@ import javax.annotation.Nullable; public abstract class CancelableClientCallStreamObserver - extends ClientCallStreamObserver implements ClientResponseObserver { + implements ClientResponseObserver { - private ClientCallStreamObserver requestStream; + private ClientCallStreamObserver requestStream; - @Override - public boolean isReady() { - return false; - } - - @Override - public void setOnReadyHandler(Runnable onReadyHandler) {} - - @Override - public void request(int count) {} - - @Override - public void setMessageCompression(boolean enable) {} - - @Override - public void disableAutoInboundFlowControl() {} - - @Override public void cancel(@Nullable String message, @Nullable Throwable cause) { if (requestStream != null) { 
requestStream.cancel(message, cause); @@ -34,7 +16,7 @@ public void cancel(@Nullable String message, @Nullable Throwable cause) { } @Override - public void beforeStart(ClientCallStreamObserver requestStream) { + public void beforeStart(ClientCallStreamObserver requestStream) { this.requestStream = requestStream; } } diff --git a/momento-sdk/src/main/java/momento/sdk/IScsTopicConnection.java b/momento-sdk/src/main/java/momento/sdk/IScsTopicConnection.java index 763af473..0a2a202c 100644 --- a/momento-sdk/src/main/java/momento/sdk/IScsTopicConnection.java +++ b/momento-sdk/src/main/java/momento/sdk/IScsTopicConnection.java @@ -12,7 +12,7 @@ interface IScsTopicConnection { *

Note: This method is intended for testing purposes and should never be called from outside * of tests. */ - void close(); + default void close() {} /** * Opens the connection. @@ -20,7 +20,7 @@ interface IScsTopicConnection { *

Note: This method is intended for testing purposes and should never be called from outside * of tests. */ - void open(); + default void open() {} /** * Subscribes to a specific topic using the provided subscription request and observer. diff --git a/momento-sdk/src/main/java/momento/sdk/ScsTopicClient.java b/momento-sdk/src/main/java/momento/sdk/ScsTopicClient.java index f35b2580..cf32f214 100644 --- a/momento-sdk/src/main/java/momento/sdk/ScsTopicClient.java +++ b/momento-sdk/src/main/java/momento/sdk/ScsTopicClient.java @@ -2,9 +2,8 @@ import com.google.protobuf.ByteString; import grpc.cache_client.pubsub._PublishRequest; -import grpc.cache_client.pubsub._SubscriptionItem; -import grpc.cache_client.pubsub._SubscriptionRequest; import grpc.cache_client.pubsub._TopicValue; +import grpc.common._Empty; import io.grpc.stub.StreamObserver; import java.util.concurrent.CompletableFuture; import javax.annotation.Nonnull; @@ -14,19 +13,19 @@ import momento.sdk.internal.SubscriptionState; import momento.sdk.responses.topic.TopicPublishResponse; import momento.sdk.responses.topic.TopicSubscribeResponse; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import momento.sdk.retry.SubscriptionRetryStrategy; public class ScsTopicClient extends ScsClientBase { - private final Logger logger = LoggerFactory.getLogger(ScsTopicClient.class); private final ScsTopicGrpcStubsManager topicGrpcStubsManager; private final long DEFAULT_REQUEST_TIMEOUT_SECONDS = 5; + private final SubscriptionRetryStrategy subscriptionRetryStrategy; public ScsTopicClient( @Nonnull CredentialProvider credentialProvider, @Nonnull TopicConfiguration configuration) { super(null); this.topicGrpcStubsManager = new ScsTopicGrpcStubsManager(credentialProvider, configuration); + this.subscriptionRetryStrategy = configuration.getSubscriptionRetryStrategy(); } public CompletableFuture publish( @@ -60,7 +59,7 @@ public CompletableFuture publish( } public CompletableFuture subscribe( - String cacheName, String 
topicName, ISubscriptionCallbacks options) { + String cacheName, String topicName, ISubscriptionCallbacks callbacks) { try { ValidationUtils.checkCacheNameValid(cacheName); ValidationUtils.checkTopicNameValid(topicName); @@ -69,24 +68,7 @@ public CompletableFuture subscribe( new TopicSubscribeResponse.Error(CacheServiceExceptionMapper.convert(e))); } - SubscriptionState subscriptionState = new SubscriptionState(); - TopicSubscribeResponse.Subscription subscription = - new TopicSubscribeResponse.Subscription(subscriptionState); - SendSubscribeOptions sendSubscribeOptions = - new SendSubscribeOptions( - cacheName, - topicName, - options::onItem, - options::onCompleted, - options::onError, - options::onDiscontinuity, - options::onHeartbeat, - options::onConnectionLost, - options::onConnectionRestored, - subscriptionState, - subscription); - - return sendSubscribe(sendSubscribeOptions); + return sendSubscribe(cacheName, topicName, callbacks); } private CompletableFuture sendPublish( @@ -105,10 +87,10 @@ private CompletableFuture sendPublish( .getNextUnaryStub() .publish( request, - new StreamObserver() { + new StreamObserver<_Empty>() { @Override - public void onNext(Object value) { + public void onNext(_Empty value) { // Do nothing } @@ -133,28 +115,12 @@ public void onCompleted() { } private CompletableFuture sendSubscribe( - SendSubscribeOptions sendSubscribeOptions) { - SubscriptionWrapper subscriptionWrapper; - - IScsTopicConnection connection = - new IScsTopicConnection() { - @Override - public void close() { - logger.warn("Closing the connection (for testing purposes only)"); - } + String cacheName, String topicName, ISubscriptionCallbacks callbacks) { + final SubscriptionState subscriptionState = new SubscriptionState(); - @Override - public void open() { - logger.warn("Opening the connection (for testing purposes only)"); - } - - @Override - public void subscribe( - _SubscriptionRequest subscriptionRequest, - 
CancelableClientCallStreamObserver<_SubscriptionItem> subscription) { - topicGrpcStubsManager.getNextStreamStub().subscribe(subscriptionRequest, subscription); - } - }; + final IScsTopicConnection connection = + (request, subscription) -> + topicGrpcStubsManager.getNextStreamStub().subscribe(request, subscription); long configuredTimeoutSeconds = topicGrpcStubsManager @@ -166,18 +132,24 @@ public void subscribe( long firstMessageSubscribeTimeoutSeconds = configuredTimeoutSeconds > 0 ? configuredTimeoutSeconds : DEFAULT_REQUEST_TIMEOUT_SECONDS; - subscriptionWrapper = + @SuppressWarnings("resource") // the wrapper closes itself when a subscription ends. + final SubscriptionWrapper subscriptionWrapper = new SubscriptionWrapper( - connection, sendSubscribeOptions, firstMessageSubscribeTimeoutSeconds); + cacheName, + topicName, + connection, + callbacks, + subscriptionState, + firstMessageSubscribeTimeoutSeconds, + subscriptionRetryStrategy); final CompletableFuture subscribeFuture = subscriptionWrapper.subscribeWithRetry(); return subscribeFuture.handle( (v, ex) -> { if (ex != null) { return new TopicSubscribeResponse.Error(CacheServiceExceptionMapper.convert(ex)); } else { - sendSubscribeOptions.subscriptionState.setUnsubscribeFn( - subscriptionWrapper::unsubscribe); - return new TopicSubscribeResponse.Subscription(sendSubscribeOptions.subscriptionState); + subscriptionState.setUnsubscribeFn(subscriptionWrapper::unsubscribe); + return new TopicSubscribeResponse.Subscription(subscriptionState); } }); } diff --git a/momento-sdk/src/main/java/momento/sdk/SendSubscribeOptions.java b/momento-sdk/src/main/java/momento/sdk/SendSubscribeOptions.java deleted file mode 100644 index 882de274..00000000 --- a/momento-sdk/src/main/java/momento/sdk/SendSubscribeOptions.java +++ /dev/null @@ -1,151 +0,0 @@ -package momento.sdk; - -import momento.sdk.internal.SubscriptionState; -import momento.sdk.responses.topic.TopicDiscontinuity; -import momento.sdk.responses.topic.TopicMessage; 
-import momento.sdk.responses.topic.TopicSubscribeResponse; - -class SendSubscribeOptions implements ISubscriptionCallbacks { - String cacheName; - String topicName; - ItemCallback onItem; - CompletedCallback onCompleted; - ErrorCallback onError; - DiscontinuityCallback onDiscontinuity; - HeartbeatCallback onHeartbeat; - ConnectionLostCallback onConnectionLost; - ConnectionRestoredCallback onConnectionRestored; - SubscriptionState subscriptionState; - TopicSubscribeResponse.Subscription subscription; - - SendSubscribeOptions( - String cacheName, - String topicName, - ItemCallback onItem, - CompletedCallback onCompleted, - ErrorCallback onError, - DiscontinuityCallback onDiscontinuity, - HeartbeatCallback onHeartbeat, - ConnectionLostCallback onConnectionLost, - ConnectionRestoredCallback onConnectionRestored, - SubscriptionState subscriptionState, - TopicSubscribeResponse.Subscription subscription) { - this.cacheName = cacheName; - this.topicName = topicName; - this.onItem = onItem; - this.onCompleted = onCompleted; - this.onError = onError; - this.onDiscontinuity = onDiscontinuity; - this.onHeartbeat = onHeartbeat; - this.onConnectionLost = onConnectionLost; - this.onConnectionRestored = onConnectionRestored; - this.subscriptionState = subscriptionState; - this.subscription = subscription; - } - - public String getCacheName() { - return cacheName; - } - - public String getTopicName() { - return topicName; - } - - public ItemCallback getOnItem() { - return onItem; - } - - public CompletedCallback getOnCompleted() { - return onCompleted; - } - - public ErrorCallback getOnError() { - return onError; - } - - public DiscontinuityCallback getOnDiscontinuity() { - return onDiscontinuity; - } - - public HeartbeatCallback getOnHeartbeat() { - return onHeartbeat; - } - - public SubscriptionState getSubscriptionState() { - return subscriptionState; - } - - public TopicSubscribeResponse.Subscription getSubscription() { - return subscription; - } - - @Override - public void 
onItem(TopicMessage message) { - onItem.onItem(message); - } - - @Override - public void onCompleted() { - onCompleted.onCompleted(); - } - - @Override - public void onError(Throwable t) { - onError.onError(t); - } - - @Override - public void onDiscontinuity(TopicDiscontinuity discontinuity) { - onDiscontinuity.onDiscontinuity(discontinuity); - } - - @Override - public void onHeartbeat() { - onHeartbeat.onHeartbeat(); - } - - @Override - public void onConnectionLost() { - onConnectionLost.onConnectionLost(); - } - - @Override - public void onConnectionRestored() { - onConnectionRestored.onConnectionRestored(); - } - - @FunctionalInterface - public interface ItemCallback { - void onItem(TopicMessage message); - } - - @FunctionalInterface - public interface CompletedCallback { - void onCompleted(); - } - - @FunctionalInterface - public interface ErrorCallback { - void onError(Throwable t); - } - - @FunctionalInterface - public interface DiscontinuityCallback { - void onDiscontinuity(TopicDiscontinuity discontinuity); - } - - @FunctionalInterface - public interface HeartbeatCallback { - void onHeartbeat(); - } - - @FunctionalInterface - public interface ConnectionLostCallback { - void onConnectionLost(); - } - - @FunctionalInterface - public interface ConnectionRestoredCallback { - void onConnectionRestored(); - } -} diff --git a/momento-sdk/src/main/java/momento/sdk/SubscriptionWrapper.java b/momento-sdk/src/main/java/momento/sdk/SubscriptionWrapper.java index e031ade6..1906c828 100644 --- a/momento-sdk/src/main/java/momento/sdk/SubscriptionWrapper.java +++ b/momento-sdk/src/main/java/momento/sdk/SubscriptionWrapper.java @@ -6,65 +6,103 @@ import grpc.cache_client.pubsub._TopicItem; import grpc.cache_client.pubsub._TopicValue; import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import java.io.Closeable; +import java.time.Duration; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import 
java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import momento.sdk.exceptions.CacheServiceExceptionMapper; import momento.sdk.exceptions.InternalServerException; import momento.sdk.exceptions.TimeoutException; +import momento.sdk.exceptions.UnknownException; import momento.sdk.internal.MomentoGrpcErrorDetails; import momento.sdk.internal.MomentoTransportErrorDetails; +import momento.sdk.internal.SubscriptionState; import momento.sdk.responses.topic.TopicDiscontinuity; import momento.sdk.responses.topic.TopicMessage; import momento.sdk.responses.topic.TopicSubscribeResponse; +import momento.sdk.retry.SubscriptionRetryStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class SubscriptionWrapper implements AutoCloseable { - private final Logger logger = LoggerFactory.getLogger(SubscriptionWrapper.class); - private final IScsTopicConnection connection; - private final SendSubscribeOptions options; +class SubscriptionWrapper implements Closeable { + private static final Logger logger = LoggerFactory.getLogger(SubscriptionWrapper.class); private final long requestTimeoutSeconds; - private boolean firstMessage = true; - private boolean isConnectionLost = false; - + private final IScsTopicConnection connection; + private final String cacheName; + private final String topicName; + private final ISubscriptionCallbacks callbacks; + private final SubscriptionState subscriptionState; + private final SubscriptionRetryStrategy retryStrategy; + private final AtomicBoolean firstMessage = new AtomicBoolean(true); + private final AtomicBoolean isConnectionLost = new AtomicBoolean(false); + private final AtomicBoolean isSubscribed = new AtomicBoolean(true); + + // TODO: share a thread pool across subscription wrappers for retries and potentially for + // callbacks private final ScheduledExecutorService scheduler = 
Executors.newSingleThreadScheduledExecutor(); - private CancelableClientCallStreamObserver<_SubscriptionItem> subscription; - - private volatile CompletableFuture firstMessageTimeoutFuture = null; + private final AtomicReference> + subscription = new AtomicReference<>(); SubscriptionWrapper( - IScsTopicConnection connection, SendSubscribeOptions options, long requestTimeoutSeconds) { + String cacheName, + String topicName, + IScsTopicConnection connection, + ISubscriptionCallbacks callbacks, + SubscriptionState subscriptionState, + long requestTimeoutSeconds, + SubscriptionRetryStrategy retryStrategy) { + this.cacheName = cacheName; + this.topicName = topicName; this.connection = connection; - this.options = options; + this.callbacks = callbacks; + this.subscriptionState = subscriptionState; this.requestTimeoutSeconds = requestTimeoutSeconds; + this.retryStrategy = retryStrategy; } - public CompletableFuture subscribeWithRetry() { - CompletableFuture future = new CompletableFuture<>(); - firstMessageTimeoutFuture = new CompletableFuture<>(); + /** + * This method returns a CompletableFuture that represents the asynchronous execution of the + * internal subscription logic with retry mechanism. + * + * @return A CompletableFuture representing the asynchronous execution of the internal + * subscription logic with retry mechanism. + */ + CompletableFuture subscribeWithRetry() { + final CompletableFuture future = new CompletableFuture<>(); + final CompletableFuture firstMessageTimeoutFuture = new CompletableFuture<>(); + + // isSubscribed is true by default and is set to false only when unsubscribe is called. + // Do not allow resubscribe attempt on a subscription that is ending. 
+ if (!isSubscribed.get()) { + completeExceptionally( + future, new UnknownException("Cannot subscribe to an unsubscribed subscription")); + return future; + } scheduler.schedule( () -> { if (!firstMessageTimeoutFuture.isDone()) { logger.warn( - "First message timeout exceeded for topic {} on cache {}", - options.getTopicName(), - options.getCacheName()); + "First message timeout exceeded for topic {} on cache {}", topicName, cacheName); if (subscription != null) { - subscription.cancel("Timed out waiting for first message", null); + subscription.get().cancel("Timed out waiting for first message", null); } firstMessageTimeoutFuture.completeExceptionally( new TimeoutException( new RuntimeException( "Timed out waiting for first message (heartbeat) for topic " - + options.getTopicName() + + topicName + " on cache " - + options.getCacheName()), + + cacheName), new MomentoTransportErrorDetails( new MomentoGrpcErrorDetails( Status.Code.DEADLINE_EXCEEDED, @@ -75,107 +113,73 @@ public CompletableFuture subscribeWithRetry() { requestTimeoutSeconds, TimeUnit.SECONDS); - subscribeWithRetryInternal(future); - - // Combine the subscription future and the first-message timeout future. - // Although CompletableFuture.anyOf(...) returns as soon as *either* future completes, - // it does not tell us *which one* completed, nor does it propagate the actual exception. - // So we explicitly check both futures: - // - // - If the timeout future completed exceptionally, that means the client didn't receive - // the first message (typically a heartbeat) within the expected time, so we want to return - // that timeout error. - // - // - If the timeout didn't fire, but the subscription future failed (e.g., gRPC UNAVAILABLE), - // then we propagate that error. - // - // - If neither completed exceptionally, it means the subscription succeeded, and we return - // success. 
- // - // This ensures that a timeout error always takes precedence and prevents it from being - // overwritten by a later gRPC error (e.g., UNAVAILABLE) that may arrive after the timeout has - // fired. - CompletableFuture result = new CompletableFuture<>(); - CompletableFuture.anyOf(future, firstMessageTimeoutFuture) - .whenComplete( - (ignored, throwable) -> { - if (firstMessageTimeoutFuture.isCompletedExceptionally()) { - firstMessageTimeoutFuture.whenComplete( - (__, ex) -> result.completeExceptionally(ex)); - } else if (future.isCompletedExceptionally()) { - future.whenComplete((__, ex) -> result.completeExceptionally(ex)); - } else { - result.complete(null); - } - }); - - return result; - } - - private void subscribeWithRetryInternal(CompletableFuture future) { - subscription = + final CancelableClientCallStreamObserver<_SubscriptionItem> observer = new CancelableClientCallStreamObserver<_SubscriptionItem>() { @Override public void onNext(_SubscriptionItem item) { - if (firstMessage) { - firstMessage = false; - if (firstMessageTimeoutFuture != null) { - firstMessageTimeoutFuture.complete(null); - } - + if (firstMessage.compareAndSet(true, false)) { if (item.getKindCase() != _SubscriptionItem.KindCase.HEARTBEAT) { - future.completeExceptionally( + completeExceptionally( + future, new InternalServerException( "Expected heartbeat message for topic " - + options.getTopicName() + + topicName + " on cache " - + options.getCacheName() + + cacheName + ". 
Got: " + item.getKindCase())); } + if (firstMessageTimeoutFuture != null) { + firstMessageTimeoutFuture.complete(null); + } future.complete(null); return; } - if (isConnectionLost) { - isConnectionLost = false; - options.onConnectionRestored(); + if (isConnectionLost.compareAndSet(true, false)) { + callbacks.onConnectionRestored(); } handleSubscriptionItem(item); } @Override public void onError(Throwable t) { - if (firstMessage) { - firstMessage = false; + if (firstMessage.get()) { if (firstMessageTimeoutFuture != null && !firstMessageTimeoutFuture.isDone()) { firstMessageTimeoutFuture.completeExceptionally(t); } if (!future.isDone()) { future.completeExceptionally(t); } - + completeExceptionally(future, t); } else { logger.debug("Subscription failed, retrying..."); - if (!isConnectionLost) { - isConnectionLost = true; - options.onConnectionLost(); + if (isConnectionLost.compareAndSet(false, true)) { + callbacks.onConnectionLost(); + } + + // If it was a CANCELLED error because unsubscribe was called, do not retry and + // exit gracefully. + if (t instanceof StatusRuntimeException) { + final StatusRuntimeException exception = (StatusRuntimeException) t; + if (exception.getStatus().getCode() == Status.Code.CANCELLED + && exception.getMessage().contains("Unsubscribing")) { + callbacks.onCompleted(); + close(); + return; + } } - if (t instanceof io.grpc.StatusRuntimeException) { - logger.debug( - "Throwable is an instance of StatusRuntimeException, checking status code..."); - io.grpc.StatusRuntimeException statusRuntimeException = - (io.grpc.StatusRuntimeException) t; - if (statusRuntimeException.getStatus().getCode() == Status.Code.UNAVAILABLE) { - logger.info("Status code is UNAVAILABLE, retrying subscription after a delay..."); - scheduleRetry(() -> subscribeWithRetryInternal(future)); + + // Otherwise, determine if we should retry. 
+ final Optional retryOpt = retryStrategy.determineWhenToRetry(t); + if (retryOpt.isPresent()) { + if (isSubscribed.get()) { + scheduleRetry(retryOpt.get(), () -> subscribeWithRetry()); } else { - logger.debug("Status code is not UNAVAILABLE, not retrying subscription."); - options.onError(t); + logger.debug("Cannot retry an unsubscribed subscription"); } } else { - logger.debug( - "Throwable is not an instance of StatusRuntimeException, not retrying subscription."); - options.onError(t); + callbacks.onError(t); + close(); } } } @@ -186,30 +190,68 @@ public void onCompleted() { } }; - _SubscriptionRequest subscriptionRequest = + final _SubscriptionRequest subscriptionRequest = _SubscriptionRequest.newBuilder() - .setCacheName(options.getCacheName()) - .setTopic(options.getTopicName()) - .setResumeAtTopicSequenceNumber( - options.subscriptionState.getResumeAtTopicSequenceNumber()) - .setSequencePage(options.subscriptionState.getResumeAtTopicSequencePage()) + .setCacheName(cacheName) + .setTopic(topicName) + .setResumeAtTopicSequenceNumber(subscriptionState.getResumeAtTopicSequenceNumber()) + .setSequencePage(subscriptionState.getResumeAtTopicSequencePage()) .build(); try { - connection.subscribe(subscriptionRequest, subscription); - options.subscriptionState.setSubscribed(); + connection.subscribe(subscriptionRequest, observer); + subscriptionState.setSubscribed(); } catch (Exception e) { - future.completeExceptionally( - new TopicSubscribeResponse.Error(CacheServiceExceptionMapper.convert(e))); + completeExceptionally(future, e); } + subscription.set(observer); + + // Combine the subscription future and the first-message timeout future. + // Although CompletableFuture.anyOf(...) returns as soon as *either* future completes, + // it does not tell us *which one* completed, nor does it propagate the actual exception. 
+ // So we explicitly check both futures: + // + // - If the timeout future completed exceptionally, that means the client didn't receive + // the first message (typically a heartbeat) within the expected time, so we want to return + // that timeout error. + // + // - If the timeout didn't fire, but the subscription future failed (e.g., gRPC UNAVAILABLE), + // then we propagate that error. + // + // - If neither completed exceptionally, it means the subscription succeeded, and we return + // success. + // + // This ensures that a timeout error always takes precedence and prevents it from being + // overwritten by a later gRPC error (e.g., UNAVAILABLE) that may arrive after the timeout has + // fired. + CompletableFuture result = new CompletableFuture<>(); + CompletableFuture.anyOf(future, firstMessageTimeoutFuture) + .whenComplete( + (ignored, throwable) -> { + if (firstMessageTimeoutFuture.isCompletedExceptionally()) { + firstMessageTimeoutFuture.whenComplete( + (__, ex) -> result.completeExceptionally(ex)); + } else if (future.isCompletedExceptionally()) { + future.whenComplete((__, ex) -> result.completeExceptionally(ex)); + } else { + result.complete(null); + } + }); + return result; + } + + private void completeExceptionally(CompletableFuture future, Throwable t) { + future.completeExceptionally( + new TopicSubscribeResponse.Error(CacheServiceExceptionMapper.convert(t))); + close(); } - private void scheduleRetry(Runnable retryAction) { - scheduler.schedule(retryAction, 500, TimeUnit.MILLISECONDS); + private void scheduleRetry(Duration retryDelay, Runnable retryAction) { + scheduler.schedule(retryAction, retryDelay.toMillis(), TimeUnit.MILLISECONDS); } private void handleSubscriptionCompleted() { - this.options.onCompleted(); + callbacks.onCompleted(); } private void handleSubscriptionItem(_SubscriptionItem item) { @@ -232,16 +274,16 @@ private void handleSubscriptionItem(_SubscriptionItem item) { private void handleSubscriptionDiscontinuity(_SubscriptionItem 
discontinuityItem) { logger.trace( "discontinuity {}, {}, {}, {}, {}", - options.getCacheName(), - options.getTopicName(), + cacheName, + topicName, discontinuityItem.getDiscontinuity().getLastTopicSequence(), discontinuityItem.getDiscontinuity().getNewTopicSequence(), discontinuityItem.getDiscontinuity().getNewSequencePage()); - options.subscriptionState.setResumeAtTopicSequenceNumber( + subscriptionState.setResumeAtTopicSequenceNumber( discontinuityItem.getDiscontinuity().getNewTopicSequence()); - options.subscriptionState.setResumeAtTopicSequencePage( + subscriptionState.setResumeAtTopicSequencePage( discontinuityItem.getDiscontinuity().getNewSequencePage()); - options.onDiscontinuity( + callbacks.onDiscontinuity( new TopicDiscontinuity( discontinuityItem.getDiscontinuity().getLastTopicSequence(), discontinuityItem.getDiscontinuity().getNewTopicSequence(), @@ -249,32 +291,32 @@ private void handleSubscriptionDiscontinuity(_SubscriptionItem discontinuityItem } private void handleSubscriptionHeartbeat() { - logger.trace("heartbeat {} {}", options.getCacheName(), options.getTopicName()); - options.onHeartbeat(); + logger.trace("heartbeat {} {}", cacheName, topicName); + callbacks.onHeartbeat(); } private void handleSubscriptionUnknown() { - logger.warn("unknown {} {}", options.getCacheName(), options.getTopicName()); + logger.warn("unknown {} {}", cacheName, topicName); } private void handleSubscriptionItemMessage(_SubscriptionItem item) { _TopicItem topicItem = item.getItem(); _TopicValue topicValue = topicItem.getValue(); - options.subscriptionState.setResumeAtTopicSequenceNumber(topicItem.getTopicSequenceNumber()); - options.subscriptionState.setResumeAtTopicSequencePage(topicItem.getSequencePage()); + subscriptionState.setResumeAtTopicSequenceNumber(topicItem.getTopicSequenceNumber()); + subscriptionState.setResumeAtTopicSequencePage(topicItem.getSequencePage()); TopicMessage message; switch (topicValue.getKindCase()) { case TEXT: message = 
handleSubscriptionTextMessage(topicValue.getText(), item.getItem().getPublisherId()); - this.options.onItem(message); + callbacks.onItem(message); break; case BINARY: message = handleSubscriptionBinaryMessage( topicValue.getBinary().toByteArray(), item.getItem().getPublisherId()); - this.options.onItem(message); + callbacks.onItem(message); break; default: handleSubscriptionUnknown(); @@ -294,19 +336,16 @@ private TopicMessage.Binary handleSubscriptionBinaryMessage(byte[] binary, Strin } public void unsubscribe() { - subscription.cancel( - "Unsubscribing from topic: " - + options.getTopicName() - + " in cache: " - + options.getCacheName(), - null); + if (isSubscribed.compareAndSet(true, false)) { + subscription + .get() + .cancel("Unsubscribing from topic: " + topicName + " in cache: " + cacheName, null); + close(); + } } @Override public void close() { - if (subscription != null) { - subscription.onCompleted(); - } scheduler.shutdown(); } } diff --git a/momento-sdk/src/main/java/momento/sdk/config/TopicConfiguration.java b/momento-sdk/src/main/java/momento/sdk/config/TopicConfiguration.java index 26e0d598..57d9e8ee 100644 --- a/momento-sdk/src/main/java/momento/sdk/config/TopicConfiguration.java +++ b/momento-sdk/src/main/java/momento/sdk/config/TopicConfiguration.java @@ -7,28 +7,68 @@ import momento.sdk.config.middleware.Middleware; import momento.sdk.config.transport.GrpcConfiguration; import momento.sdk.config.transport.TransportStrategy; +import momento.sdk.retry.FixedDelaySubscriptionRetryStrategy; +import momento.sdk.retry.SubscriptionRetryStrategy; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** The contract for SDK configurables. A configuration must have a transport strategy. */ public class TopicConfiguration { private final TransportStrategy transportStrategy; + private final SubscriptionRetryStrategy subscriptionRetryStrategy; private final List middlewares; private final Logger logger; /** - * Creates a new topic configuration object. 
+ * Creates a new topic configuration. * * @param transportStrategy Responsible for configuring network tunables. + * @param subscriptionRetryStrategy Responsible for determining when to reconnect a broken + * subscription. + * @param middlewares List of middleware that can intercept and modify calls to Momento. * @param logger Responsible for logging */ public TopicConfiguration( - TransportStrategy transportStrategy, List middlewares, Logger logger) { + TransportStrategy transportStrategy, + SubscriptionRetryStrategy subscriptionRetryStrategy, + List middlewares, + Logger logger) { this.transportStrategy = transportStrategy; + this.subscriptionRetryStrategy = subscriptionRetryStrategy; this.middlewares = middlewares; this.logger = logger; } + /** + * Creates a new topic configuration object. + * + * @param transportStrategy Responsible for configuring network tunables. + * @param logger Responsible for logging + */ + public TopicConfiguration( + TransportStrategy transportStrategy, List middlewares, Logger logger) { + this( + transportStrategy, + new FixedDelaySubscriptionRetryStrategy(Duration.ofMillis(500)), + middlewares, + logger); + } + + /** + * Creates a new topic configuration object. + * + * @param transportStrategy Responsible for configuring network tunables. + * @param middlewares List of middleware that can intercept and modify calls to Momento. + */ + public TopicConfiguration(TransportStrategy transportStrategy, List middlewares) { + this( + transportStrategy, + new FixedDelaySubscriptionRetryStrategy(Duration.ofMillis(500)), + middlewares, + LoggerFactory.getLogger(TopicConfiguration.class)); + } + /** * Creates a new topic configuration object. * @@ -39,6 +79,37 @@ public TopicConfiguration(TransportStrategy transportStrategy, Logger logger) { this(transportStrategy, new ArrayList<>(), logger); } + /** + * Creates a new topic configuration object. + * + * @param transportStrategy Responsible for configuring network tunables. 
+ */ + public TopicConfiguration(TransportStrategy transportStrategy) { + this(transportStrategy, new ArrayList<>()); + } + + /** + * Configuration for subscription retries. By default, {@link + * momento.sdk.retry.FixedDelaySubscriptionRetryStrategy} gets used. + * + * @return The subscription retry strategy + */ + public SubscriptionRetryStrategy getSubscriptionRetryStrategy() { + return subscriptionRetryStrategy; + } + + /** + * Copy constructor that modifies the subscription retry strategy. + * + * @param subscriptionRetryStrategy The new subscription retry strategy + * @return a new TopicConfiguration with the updated subscription retry strategy + */ + public TopicConfiguration withSubscriptionRetryStrategy( + @Nonnull final SubscriptionRetryStrategy subscriptionRetryStrategy) { + return new TopicConfiguration( + this.transportStrategy, subscriptionRetryStrategy, this.middlewares, this.logger); + } + /** * Configuration for network tunables. * diff --git a/momento-sdk/src/main/java/momento/sdk/retry/DefaultSubscriptionRetryEligibilityStrategy.java b/momento-sdk/src/main/java/momento/sdk/retry/DefaultSubscriptionRetryEligibilityStrategy.java new file mode 100644 index 00000000..be1f8937 --- /dev/null +++ b/momento-sdk/src/main/java/momento/sdk/retry/DefaultSubscriptionRetryEligibilityStrategy.java @@ -0,0 +1,43 @@ +package momento.sdk.retry; + +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DefaultSubscriptionRetryEligibilityStrategy + implements SubscriptionRetryEligibilityStrategy { + private static final Logger logger = + LoggerFactory.getLogger(DefaultSubscriptionRetryEligibilityStrategy.class); + + private static final Set NON_RETRYABLE_STATUS_CODES; + + static { + final Set nonRetryableStatusCodes = new HashSet<>(); + nonRetryableStatusCodes.add(Status.Code.PERMISSION_DENIED); + 
nonRetryableStatusCodes.add(Status.Code.UNAUTHENTICATED); + nonRetryableStatusCodes.add(Status.Code.CANCELLED); + nonRetryableStatusCodes.add(Status.Code.NOT_FOUND); + + NON_RETRYABLE_STATUS_CODES = Collections.unmodifiableSet(nonRetryableStatusCodes); + } + + @Override + public boolean isEligibleForRetry(Throwable error) { + if (error instanceof StatusRuntimeException) { + final Status.Code statusCode = ((StatusRuntimeException) error).getStatus().getCode(); + if (NON_RETRYABLE_STATUS_CODES.contains(statusCode)) { + logger.debug("Subscription error code {} is not retryable", statusCode); + return false; + } + logger.debug("Subscription error code {} is retryable", statusCode); + return true; + } else { + logger.debug("Unable to retry subscription", error); + return false; + } + } +} diff --git a/momento-sdk/src/main/java/momento/sdk/retry/FixedDelaySubscriptionRetryStrategy.java b/momento-sdk/src/main/java/momento/sdk/retry/FixedDelaySubscriptionRetryStrategy.java new file mode 100644 index 00000000..fee5899d --- /dev/null +++ b/momento-sdk/src/main/java/momento/sdk/retry/FixedDelaySubscriptionRetryStrategy.java @@ -0,0 +1,30 @@ +package momento.sdk.retry; + +import java.time.Duration; +import java.util.Optional; + +/** A topic subscription retry strategy that uses a fixed delay between reconnection attempts. 
*/ +public class FixedDelaySubscriptionRetryStrategy implements SubscriptionRetryStrategy { + + private final SubscriptionRetryEligibilityStrategy eligibilityStrategy; + private final Duration retryDelay; + + public FixedDelaySubscriptionRetryStrategy( + SubscriptionRetryEligibilityStrategy eligibilityStrategy, Duration retryDelay) { + this.eligibilityStrategy = eligibilityStrategy; + this.retryDelay = retryDelay; + } + + public FixedDelaySubscriptionRetryStrategy(Duration retryDelay) { + this(new DefaultSubscriptionRetryEligibilityStrategy(), retryDelay); + } + + @Override + public Optional determineWhenToRetry(Throwable error) { + if (eligibilityStrategy.isEligibleForRetry(error)) { + return Optional.of(retryDelay); + } + + return Optional.empty(); + } +} diff --git a/momento-sdk/src/main/java/momento/sdk/retry/SubscriptionRetryEligibilityStrategy.java b/momento-sdk/src/main/java/momento/sdk/retry/SubscriptionRetryEligibilityStrategy.java new file mode 100644 index 00000000..69a342ad --- /dev/null +++ b/momento-sdk/src/main/java/momento/sdk/retry/SubscriptionRetryEligibilityStrategy.java @@ -0,0 +1,27 @@ +package momento.sdk.retry; + +/** + * An interface representing a strategy for determining whether a topic subscription is eligible for + * reconnecting based on the error that disconnected it. + * + *

Implementations of this interface allow clients to customize the criteria for reconnecting a + * failed subscription. {@link #isEligibleForRetry(Throwable)} is called by the + * SubscriptionRetryStrategy to determine whether a specific disconnected subscription is eligible + * for reconnecting. + * + *

A subscription may encounter different types of errors, such as network issues, or server-side + * errors. For example, a simple implementation of this interface could check the status code of the + * gRPC error and return true if the status code represents a transient error (e.g., unavailable), + * or false if it is unrecoverable (e.g., cache not found). + */ +public interface SubscriptionRetryEligibilityStrategy { + + /** + * Determines whether a disconnected topic subscription is eligible for reconnection based on the + * error that caused the disconnection. + * + * @param error The error encountered by the subscription when it disconnected. + * @return {@code true} if the request is eligible for reconnection, {@code false} otherwise. + */ + boolean isEligibleForRetry(Throwable error); +} diff --git a/momento-sdk/src/main/java/momento/sdk/retry/SubscriptionRetryStrategy.java b/momento-sdk/src/main/java/momento/sdk/retry/SubscriptionRetryStrategy.java new file mode 100644 index 00000000..84d40716 --- /dev/null +++ b/momento-sdk/src/main/java/momento/sdk/retry/SubscriptionRetryStrategy.java @@ -0,0 +1,30 @@ +package momento.sdk.retry; + +import java.time.Duration; +import java.util.Optional; + +/** + * An interface representing a strategy for determining the delay between reconnection attempts for + * a disconnected topic subscription. + * + *

Implementations of this interface allow clients to customize the delay between consecutive + * reconnection attempts when a subscription fails. The {@link #determineWhenToRetry(Throwable)} + * method is called by the subscription to retrieve the delay for the next reconnection attempt + * based on the error encountered when the connection attempt failed. + * + *

If the retry strategy returns an empty optional (Optional.empty()), it indicates that the + * reconnection will not be performed based on the implemented contract. In such cases, the + * subscription will call {@link momento.sdk.ISubscriptionCallbacks#onError(Throwable)} and no + * further reconnection attempts will be made. + */ +public interface SubscriptionRetryStrategy { + + /** + * Returns the amount of time before the next subscription reconnection attempt. + * + * @param error The error encountered by the subscription when it disconnected. + * @return A duration to wait before reconnecting, or empty if no reconnection attempt should be + * made. + */ + Optional determineWhenToRetry(Throwable error); +} diff --git a/momento-sdk/src/test/java/momento/sdk/SubscriptionWrapperTest.java b/momento-sdk/src/test/java/momento/sdk/SubscriptionWrapperTest.java index 3e085358..ceb0e44b 100644 --- a/momento-sdk/src/test/java/momento/sdk/SubscriptionWrapperTest.java +++ b/momento-sdk/src/test/java/momento/sdk/SubscriptionWrapperTest.java @@ -8,14 +8,15 @@ import grpc.cache_client.pubsub._SubscriptionRequest; import io.grpc.Status; import io.grpc.StatusRuntimeException; +import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import momento.sdk.internal.SubscriptionState; -import momento.sdk.responses.topic.TopicSubscribeResponse; -import org.junit.jupiter.api.BeforeEach; +import momento.sdk.responses.topic.TopicMessage; +import momento.sdk.retry.FixedDelaySubscriptionRetryStrategy; +import momento.sdk.retry.SubscriptionRetryStrategy; import org.junit.jupiter.api.Test; -import org.mockito.MockitoAnnotations; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,43 +24,42 @@ public class SubscriptionWrapperTest { private final Logger logger = LoggerFactory.getLogger(SubscriptionWrapperTest.class); private final long requestTimeoutSeconds = 5; - @BeforeEach - 
public void setUp() { - MockitoAnnotations.openMocks(this); - } - @Test public void testConnectionLostAndRestored() throws InterruptedException { - SubscriptionState state = new SubscriptionState(); - TopicSubscribeResponse.Subscription subscription = - new TopicSubscribeResponse.Subscription(state); - - AtomicBoolean gotConnectionLostCallback = new AtomicBoolean(false); - AtomicBoolean gotConnectionRestoredCallback = new AtomicBoolean(false); - - Semaphore waitingForSubscriptionAttempt = new Semaphore(0); - - SendSubscribeOptions options = - new SendSubscribeOptions( - "cache", - "topic", - (message) -> {}, - () -> {}, - (err) -> {}, - (discontinuity) -> {}, - () -> {}, - () -> { - logger.info("Got to our connection lost callback!"); - gotConnectionLostCallback.set(true); - }, - () -> { - logger.info("Got to our connection restored callback!"); - gotConnectionRestoredCallback.set(true); - }, - state, - subscription); + final String cacheName = "cache"; + final String topicName = "topic"; + final SubscriptionState state = new SubscriptionState(); + + final AtomicBoolean gotConnectionLostCallback = new AtomicBoolean(false); + final AtomicBoolean gotConnectionRestoredCallback = new AtomicBoolean(false); + + final Semaphore waitingForSubscriptionAttempt = new Semaphore(0); - IScsTopicConnection connection = + final ISubscriptionCallbacks callbacks = + new ISubscriptionCallbacks() { + @Override + public void onItem(TopicMessage message) {} + + @Override + public void onCompleted() {} + + @Override + public void onError(Throwable t) {} + + @Override + public void onConnectionLost() { + logger.info("Got to our connection lost callback!"); + gotConnectionLostCallback.set(true); + } + + @Override + public void onConnectionRestored() { + logger.info("Got to our connection restored callback!"); + gotConnectionRestoredCallback.set(true); + } + }; + + final IScsTopicConnection connection = new IScsTopicConnection() { boolean isOpen = true; 
CancelableClientCallStreamObserver<_SubscriptionItem> subscription; @@ -95,9 +95,19 @@ public void subscribe( } }; - SubscriptionWrapper subscriptionWrapper = - new SubscriptionWrapper(connection, options, requestTimeoutSeconds); - CompletableFuture subscribeWithRetryResult = subscriptionWrapper.subscribeWithRetry(); + final SubscriptionRetryStrategy retryStrategy = + new FixedDelaySubscriptionRetryStrategy(Duration.ofMillis(500)); + final SubscriptionWrapper subscriptionWrapper = + new SubscriptionWrapper( + cacheName, + topicName, + connection, + callbacks, + state, + requestTimeoutSeconds, + retryStrategy); + final CompletableFuture subscribeWithRetryResult = + subscriptionWrapper.subscribeWithRetry(); subscribeWithRetryResult.join(); waitingForSubscriptionAttempt.acquire();