From 68dcdbb6db4bb36e86b12b767d3a439f53e69ad0 Mon Sep 17 00:00:00 2001 From: Nate Anderson Date: Wed, 19 Mar 2025 10:49:42 -0700 Subject: [PATCH 1/3] chore: improve subscription reconnection logic Move the topic subscription reconnection logic into a retry strategy and eligibility strategy that can be configured by the client. Add the subscription retry strategy to TopicConfiguration. Add new constructors that take the retry strategy, and new ones that don't take a logger, which is only used inside the TopicConfiguration itself. Prevent potential race conditions that could occur when unsubscribing from a connection while it is reconnecting by checking if the user called unsubscribe before reconnecting, and by making the variables used in reconnection atomic. Call close in SubscriptionWrapper when a stream ends to stop a subscription from leaking threads from the retry executor service. We may want to share a thread pool between the subscriptions instead of making a new one per subscription, so that we don't need to make a new one each time. We could also use it to execute the callbacks, which are currently executed in the gRPC thread pool and could cause backpressure if they are long-running. Add empty default implementations for the test methods in IScsTopicConnection to clean up its use in the topic client. Remove SendSubscribeOptions, since we already have a class that contains the callbacks, and because it had an unused subscription object. Get rid of CancelableClientCallStreamObserver's ClientCallStreamObserver extension, because that class is meant for observing requests, not responses, and we only need a simple cancel method. Make TopicClientLocalTest's unrecoverable error 'not found', because the previous error is now recoverable. --- .../sdk/retry/TopicClientLocalTest.java | 2 +- .../CancelableClientCallStreamObserver.java | 24 +- .../java/momento/sdk/IScsTopicConnection.java | 4 +- .../main/java/momento/sdk/ScsTopicClient.java | 76 ++--- .../momento/sdk/SendSubscribeOptions.java | 151 ---------- .../java/momento/sdk/SubscriptionWrapper.java | 275 ++++++++++-------- .../sdk/config/TopicConfiguration.java | 75 ++++- ...tSubscriptionRetryEligibilityStrategy.java | 43 +++ .../FixedDelaySubscriptionRetryStrategy.java | 30 ++ .../SubscriptionRetryEligibilityStrategy.java | 27 ++ .../sdk/retry/SubscriptionRetryStrategy.java | 30 ++ .../momento/sdk/SubscriptionWrapperTest.java | 90 +++--- 12 files changed, 433 insertions(+), 394 deletions(-) delete mode 100644 momento-sdk/src/main/java/momento/sdk/SendSubscribeOptions.java create mode 100644 momento-sdk/src/main/java/momento/sdk/retry/DefaultSubscriptionRetryEligibilityStrategy.java create mode 100644 momento-sdk/src/main/java/momento/sdk/retry/FixedDelaySubscriptionRetryStrategy.java create mode 100644 momento-sdk/src/main/java/momento/sdk/retry/SubscriptionRetryEligibilityStrategy.java create mode 100644 momento-sdk/src/main/java/momento/sdk/retry/SubscriptionRetryStrategy.java diff --git a/momento-sdk/src/intTest/java/momento/sdk/retry/TopicClientLocalTest.java b/momento-sdk/src/intTest/java/momento/sdk/retry/TopicClientLocalTest.java index 1e906305..3fee9c41 100644 --- a/momento-sdk/src/intTest/java/momento/sdk/retry/TopicClientLocalTest.java +++ b/momento-sdk/src/intTest/java/momento/sdk/retry/TopicClientLocalTest.java @@ -119,7 +119,7 @@ void testSubscribe_shouldNotRetryWithUnrecoverableError() throws Exception { final int streamErrorMessageLimit = 3; final MomentoLocalMiddlewareArgs momentoLocalMiddlewareArgs = new MomentoLocalMiddlewareArgs.Builder(logger, UUID.randomUUID().toString()) - .streamError(MomentoErrorCode.INTERNAL_SERVER_ERROR) + .streamError(MomentoErrorCode.NOT_FOUND_ERROR) .streamErrorRpcList(Collections.singletonList(MomentoRpcMethod.TOPIC_SUBSCRIBE)) .streamErrorMessageLimit(streamErrorMessageLimit) .build(); diff --git a/momento-sdk/src/main/java/momento/sdk/CancelableClientCallStreamObserver.java b/momento-sdk/src/main/java/momento/sdk/CancelableClientCallStreamObserver.java index a54aa5ae..fabd3374 100644 --- a/momento-sdk/src/main/java/momento/sdk/CancelableClientCallStreamObserver.java +++ b/momento-sdk/src/main/java/momento/sdk/CancelableClientCallStreamObserver.java @@ -5,28 +5,10 @@ import javax.annotation.Nullable; public abstract class CancelableClientCallStreamObserver - extends ClientCallStreamObserver implements ClientResponseObserver { + implements ClientResponseObserver { - private ClientCallStreamObserver requestStream; + private ClientCallStreamObserver requestStream; - @Override - public boolean isReady() { - return false; - } - - @Override - public void setOnReadyHandler(Runnable onReadyHandler) {} - - @Override - public void request(int count) {} - - @Override - public void setMessageCompression(boolean enable) {} - - @Override - public void disableAutoInboundFlowControl() {} - - @Override public void cancel(@Nullable String message, @Nullable Throwable cause) { if (requestStream != null) { requestStream.cancel(message, cause); @@ -34,7 +16,7 @@ public void cancel(@Nullable String message, @Nullable Throwable cause) { } @Override - public void beforeStart(ClientCallStreamObserver requestStream) { + public void beforeStart(ClientCallStreamObserver requestStream) { this.requestStream = requestStream; } } diff --git a/momento-sdk/src/main/java/momento/sdk/IScsTopicConnection.java b/momento-sdk/src/main/java/momento/sdk/IScsTopicConnection.java index 763af473..0a2a202c 100644 --- a/momento-sdk/src/main/java/momento/sdk/IScsTopicConnection.java +++ b/momento-sdk/src/main/java/momento/sdk/IScsTopicConnection.java @@ -12,7 +12,7 @@ interface IScsTopicConnection { *

Note: This method is intended for testing purposes and should never be called from outside * of tests. */ - void close(); + default void close() {} /** * Opens the connection. @@ -20,7 +20,7 @@ interface IScsTopicConnection { *

Note: This method is intended for testing purposes and should never be called from outside * of tests. */ - void open(); + default void open() {} /** * Subscribes to a specific topic using the provided subscription request and observer. diff --git a/momento-sdk/src/main/java/momento/sdk/ScsTopicClient.java b/momento-sdk/src/main/java/momento/sdk/ScsTopicClient.java index f35b2580..cf32f214 100644 --- a/momento-sdk/src/main/java/momento/sdk/ScsTopicClient.java +++ b/momento-sdk/src/main/java/momento/sdk/ScsTopicClient.java @@ -2,9 +2,8 @@ import com.google.protobuf.ByteString; import grpc.cache_client.pubsub._PublishRequest; -import grpc.cache_client.pubsub._SubscriptionItem; -import grpc.cache_client.pubsub._SubscriptionRequest; import grpc.cache_client.pubsub._TopicValue; +import grpc.common._Empty; import io.grpc.stub.StreamObserver; import java.util.concurrent.CompletableFuture; import javax.annotation.Nonnull; @@ -14,19 +13,19 @@ import momento.sdk.internal.SubscriptionState; import momento.sdk.responses.topic.TopicPublishResponse; import momento.sdk.responses.topic.TopicSubscribeResponse; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import momento.sdk.retry.SubscriptionRetryStrategy; public class ScsTopicClient extends ScsClientBase { - private final Logger logger = LoggerFactory.getLogger(ScsTopicClient.class); private final ScsTopicGrpcStubsManager topicGrpcStubsManager; private final long DEFAULT_REQUEST_TIMEOUT_SECONDS = 5; + private final SubscriptionRetryStrategy subscriptionRetryStrategy; public ScsTopicClient( @Nonnull CredentialProvider credentialProvider, @Nonnull TopicConfiguration configuration) { super(null); this.topicGrpcStubsManager = new ScsTopicGrpcStubsManager(credentialProvider, configuration); + this.subscriptionRetryStrategy = configuration.getSubscriptionRetryStrategy(); } public CompletableFuture publish( @@ -60,7 +59,7 @@ public CompletableFuture publish( } public CompletableFuture subscribe( - String cacheName, String topicName, ISubscriptionCallbacks options) { + String cacheName, String topicName, ISubscriptionCallbacks callbacks) { try { ValidationUtils.checkCacheNameValid(cacheName); ValidationUtils.checkTopicNameValid(topicName); @@ -69,24 +68,7 @@ public CompletableFuture subscribe( new TopicSubscribeResponse.Error(CacheServiceExceptionMapper.convert(e))); } - SubscriptionState subscriptionState = new SubscriptionState(); - TopicSubscribeResponse.Subscription subscription = - new TopicSubscribeResponse.Subscription(subscriptionState); - SendSubscribeOptions sendSubscribeOptions = - new SendSubscribeOptions( - cacheName, - topicName, - options::onItem, - options::onCompleted, - options::onError, - options::onDiscontinuity, - options::onHeartbeat, - options::onConnectionLost, - options::onConnectionRestored, - subscriptionState, - subscription); - - return sendSubscribe(sendSubscribeOptions); + return sendSubscribe(cacheName, topicName, callbacks); } private CompletableFuture sendPublish( @@ -105,10 +87,10 @@ private CompletableFuture sendPublish( .getNextUnaryStub() .publish( request, - new StreamObserver() { + new StreamObserver<_Empty>() { @Override - public void onNext(Object value) { + public void onNext(_Empty value) { // Do nothing } @@ -133,28 +115,12 @@ public void onCompleted() { } private CompletableFuture sendSubscribe( - SendSubscribeOptions sendSubscribeOptions) { - SubscriptionWrapper subscriptionWrapper; - - IScsTopicConnection connection = - new IScsTopicConnection() { - @Override - public void close() { - logger.warn("Closing the connection (for testing purposes only)"); - } + String cacheName, String topicName, ISubscriptionCallbacks callbacks) { + final SubscriptionState subscriptionState = new SubscriptionState(); - @Override - public void open() { - logger.warn("Opening the connection (for testing purposes only)"); - } - - @Override - public void subscribe( - _SubscriptionRequest subscriptionRequest, - CancelableClientCallStreamObserver<_SubscriptionItem> subscription) { - topicGrpcStubsManager.getNextStreamStub().subscribe(subscriptionRequest, subscription); - } - }; + final IScsTopicConnection connection = + (request, subscription) -> + topicGrpcStubsManager.getNextStreamStub().subscribe(request, subscription); long configuredTimeoutSeconds = topicGrpcStubsManager @@ -166,18 +132,24 @@ public void subscribe( long firstMessageSubscribeTimeoutSeconds = configuredTimeoutSeconds > 0 ? configuredTimeoutSeconds : DEFAULT_REQUEST_TIMEOUT_SECONDS; - subscriptionWrapper = + @SuppressWarnings("resource") // the wrapper closes itself when a subscription ends. + final SubscriptionWrapper subscriptionWrapper = new SubscriptionWrapper( - connection, sendSubscribeOptions, firstMessageSubscribeTimeoutSeconds); + cacheName, + topicName, + connection, + callbacks, + subscriptionState, + firstMessageSubscribeTimeoutSeconds, + subscriptionRetryStrategy); final CompletableFuture subscribeFuture = subscriptionWrapper.subscribeWithRetry(); return subscribeFuture.handle( (v, ex) -> { if (ex != null) { return new TopicSubscribeResponse.Error(CacheServiceExceptionMapper.convert(ex)); } else { - sendSubscribeOptions.subscriptionState.setUnsubscribeFn( - subscriptionWrapper::unsubscribe); - return new TopicSubscribeResponse.Subscription(sendSubscribeOptions.subscriptionState); + subscriptionState.setUnsubscribeFn(subscriptionWrapper::unsubscribe); + return new TopicSubscribeResponse.Subscription(subscriptionState); } }); } diff --git a/momento-sdk/src/main/java/momento/sdk/SendSubscribeOptions.java b/momento-sdk/src/main/java/momento/sdk/SendSubscribeOptions.java deleted file mode 100644 index 882de274..00000000 --- a/momento-sdk/src/main/java/momento/sdk/SendSubscribeOptions.java +++ /dev/null @@ -1,151 +0,0 @@ -package momento.sdk; - -import momento.sdk.internal.SubscriptionState; -import momento.sdk.responses.topic.TopicDiscontinuity; -import momento.sdk.responses.topic.TopicMessage; -import momento.sdk.responses.topic.TopicSubscribeResponse; - -class SendSubscribeOptions implements ISubscriptionCallbacks { - String cacheName; - String topicName; - ItemCallback onItem; - CompletedCallback onCompleted; - ErrorCallback onError; - DiscontinuityCallback onDiscontinuity; - HeartbeatCallback onHeartbeat; - ConnectionLostCallback onConnectionLost; - ConnectionRestoredCallback onConnectionRestored; - SubscriptionState subscriptionState; - TopicSubscribeResponse.Subscription subscription; - - SendSubscribeOptions( - String cacheName, - String topicName, - ItemCallback onItem, - CompletedCallback onCompleted, - ErrorCallback onError, - DiscontinuityCallback onDiscontinuity, - HeartbeatCallback onHeartbeat, - ConnectionLostCallback onConnectionLost, - ConnectionRestoredCallback onConnectionRestored, - SubscriptionState subscriptionState, - TopicSubscribeResponse.Subscription subscription) { - this.cacheName = cacheName; - this.topicName = topicName; - this.onItem = onItem; - this.onCompleted = onCompleted; - this.onError = onError; - this.onDiscontinuity = onDiscontinuity; - this.onHeartbeat = onHeartbeat; - this.onConnectionLost = onConnectionLost; - this.onConnectionRestored = onConnectionRestored; - this.subscriptionState = subscriptionState; - this.subscription = subscription; - } - - public String getCacheName() { - return cacheName; - } - - public String getTopicName() { - return topicName; - } - - public ItemCallback getOnItem() { - return onItem; - } - - public CompletedCallback getOnCompleted() { - return onCompleted; - } - - public ErrorCallback getOnError() { - return onError; - } - - public DiscontinuityCallback getOnDiscontinuity() { - return onDiscontinuity; - } - - public HeartbeatCallback getOnHeartbeat() { - return onHeartbeat; - } - - public SubscriptionState getSubscriptionState() { - return subscriptionState; - } - - public TopicSubscribeResponse.Subscription getSubscription() { - return subscription; - } - - @Override - public void onItem(TopicMessage message) { - onItem.onItem(message); - } - - @Override - public void onCompleted() { - onCompleted.onCompleted(); - } - - @Override - public void onError(Throwable t) { - onError.onError(t); - } - - @Override - public void onDiscontinuity(TopicDiscontinuity discontinuity) { - onDiscontinuity.onDiscontinuity(discontinuity); - } - - @Override - public void onHeartbeat() { - onHeartbeat.onHeartbeat(); - } - - @Override - public void onConnectionLost() { - onConnectionLost.onConnectionLost(); - } - - @Override - public void onConnectionRestored() { - onConnectionRestored.onConnectionRestored(); - } - - @FunctionalInterface - public interface ItemCallback { - void onItem(TopicMessage message); - } - - @FunctionalInterface - public interface CompletedCallback { - void onCompleted(); - } - - @FunctionalInterface - public interface ErrorCallback { - void onError(Throwable t); - } - - @FunctionalInterface - public interface DiscontinuityCallback { - void onDiscontinuity(TopicDiscontinuity discontinuity); - } - - @FunctionalInterface - public interface HeartbeatCallback { - void onHeartbeat(); - } - - @FunctionalInterface - public interface ConnectionLostCallback { - void onConnectionLost(); - } - - @FunctionalInterface - public interface ConnectionRestoredCallback { - void onConnectionRestored(); - } -} diff --git a/momento-sdk/src/main/java/momento/sdk/SubscriptionWrapper.java b/momento-sdk/src/main/java/momento/sdk/SubscriptionWrapper.java index e031ade6..15f74b2e 100644 --- a/momento-sdk/src/main/java/momento/sdk/SubscriptionWrapper.java +++ b/momento-sdk/src/main/java/momento/sdk/SubscriptionWrapper.java @@ -6,65 +6,102 @@ import grpc.cache_client.pubsub._TopicItem; import grpc.cache_client.pubsub._TopicValue; import io.grpc.Status; +import java.io.Closeable; +import java.time.Duration; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import momento.sdk.exceptions.CacheServiceExceptionMapper; import momento.sdk.exceptions.InternalServerException; import momento.sdk.exceptions.TimeoutException; +import momento.sdk.exceptions.UnknownException; import momento.sdk.internal.MomentoGrpcErrorDetails; import momento.sdk.internal.MomentoTransportErrorDetails; +import momento.sdk.internal.SubscriptionState; import momento.sdk.responses.topic.TopicDiscontinuity; import momento.sdk.responses.topic.TopicMessage; import momento.sdk.responses.topic.TopicSubscribeResponse; +import momento.sdk.retry.SubscriptionRetryStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class SubscriptionWrapper implements AutoCloseable { - private final Logger logger = LoggerFactory.getLogger(SubscriptionWrapper.class); - private final IScsTopicConnection connection; - private final SendSubscribeOptions options; +class SubscriptionWrapper implements Closeable { + private static final Logger logger = LoggerFactory.getLogger(SubscriptionWrapper.class); private final long requestTimeoutSeconds; - private boolean firstMessage = true; - private boolean isConnectionLost = false; - + private final IScsTopicConnection connection; + private final String cacheName; + private final String topicName; + private final ISubscriptionCallbacks callbacks; + private final SubscriptionState subscriptionState; + private final SubscriptionRetryStrategy retryStrategy; + private final AtomicBoolean firstMessage = new AtomicBoolean(true); + private final AtomicBoolean isConnectionLost = new AtomicBoolean(false); + private final AtomicBoolean isSubscribed = new AtomicBoolean(true); + + // TODO: share a thread pool across subscription wrappers for retries and potentially for + // callbacks private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); - private CancelableClientCallStreamObserver<_SubscriptionItem> subscription; - - private volatile CompletableFuture firstMessageTimeoutFuture = null; + private final AtomicReference> + subscription = new AtomicReference<>(); SubscriptionWrapper( - IScsTopicConnection connection, SendSubscribeOptions options, long requestTimeoutSeconds) { + String cacheName, + String topicName, + IScsTopicConnection connection, + ISubscriptionCallbacks callbacks, + SubscriptionState subscriptionState, + long requestTimeoutSeconds, + SubscriptionRetryStrategy retryStrategy) { + this.cacheName = cacheName; + this.topicName = topicName; this.connection = connection; - this.options = options; + this.callbacks = callbacks; + this.subscriptionState = subscriptionState; this.requestTimeoutSeconds = requestTimeoutSeconds; + this.retryStrategy = retryStrategy; } - public CompletableFuture subscribeWithRetry() { - CompletableFuture future = new CompletableFuture<>(); - firstMessageTimeoutFuture = new CompletableFuture<>(); + /** + * This method returns a CompletableFuture that represents the asynchronous execution of the + * internal subscription logic with retry mechanism. + * + * @return A CompletableFuture representing the asynchronous execution of the internal + * subscription logic with retry mechanism. + */ + CompletableFuture subscribeWithRetry() { + final CompletableFuture future = new CompletableFuture<>(); + final CompletableFuture firstMessageTimeoutFuture = new CompletableFuture<>(); + + // isSubscribed is true by default and is set to false only when unsubscribe is called. + // Do not allow resubscribe attempt on a subscription that is ending. + if (!isSubscribed.get()) { + completeExceptionally( + future, new UnknownException("Cannot subscribe to an unsubscribed subscription")); + return future; + } scheduler.schedule( () -> { if (!firstMessageTimeoutFuture.isDone()) { logger.warn( - "First message timeout exceeded for topic {} on cache {}", - options.getTopicName(), - options.getCacheName()); + "First message timeout exceeded for topic {} on cache {}", topicName, cacheName); if (subscription != null) { - subscription.cancel("Timed out waiting for first message", null); + subscription.get().cancel("Timed out waiting for first message", null); } firstMessageTimeoutFuture.completeExceptionally( new TimeoutException( new RuntimeException( "Timed out waiting for first message (heartbeat) for topic " - + options.getTopicName() + + topicName + " on cache " - + options.getCacheName()), + + cacheName), new MomentoTransportErrorDetails( new MomentoGrpcErrorDetails( Status.Code.DEADLINE_EXCEEDED, @@ -75,107 +112,60 @@ public CompletableFuture subscribeWithRetry() { requestTimeoutSeconds, TimeUnit.SECONDS); - subscribeWithRetryInternal(future); - - // Combine the subscription future and the first-message timeout future. - // Although CompletableFuture.anyOf(...) returns as soon as *either* future completes, - // it does not tell us *which one* completed, nor does it propagate the actual exception. - // So we explicitly check both futures: - // - // - If the timeout future completed exceptionally, that means the client didn't receive - // the first message (typically a heartbeat) within the expected time, so we want to return - // that timeout error. - // - // - If the timeout didn't fire, but the subscription future failed (e.g., gRPC UNAVAILABLE), - // then we propagate that error. - // - // - If neither completed exceptionally, it means the subscription succeeded, and we return - // success. - // - // This ensures that a timeout error always takes precedence and prevents it from being - // overwritten by a later gRPC error (e.g., UNAVAILABLE) that may arrive after the timeout has - // fired. - CompletableFuture result = new CompletableFuture<>(); - CompletableFuture.anyOf(future, firstMessageTimeoutFuture) - .whenComplete( - (ignored, throwable) -> { - if (firstMessageTimeoutFuture.isCompletedExceptionally()) { - firstMessageTimeoutFuture.whenComplete( - (__, ex) -> result.completeExceptionally(ex)); - } else if (future.isCompletedExceptionally()) { - future.whenComplete((__, ex) -> result.completeExceptionally(ex)); - } else { - result.complete(null); - } - }); - - return result; - } - - private void subscribeWithRetryInternal(CompletableFuture future) { - subscription = + final CancelableClientCallStreamObserver<_SubscriptionItem> observer = new CancelableClientCallStreamObserver<_SubscriptionItem>() { @Override public void onNext(_SubscriptionItem item) { - if (firstMessage) { - firstMessage = false; - if (firstMessageTimeoutFuture != null) { - firstMessageTimeoutFuture.complete(null); - } - + if (firstMessage.compareAndSet(true, false)) { if (item.getKindCase() != _SubscriptionItem.KindCase.HEARTBEAT) { - future.completeExceptionally( + completeExceptionally( + future, new InternalServerException( "Expected heartbeat message for topic " - + options.getTopicName() + + topicName + " on cache " - + options.getCacheName() + + cacheName + ". Got: " + item.getKindCase())); } + if (firstMessageTimeoutFuture != null) { + firstMessageTimeoutFuture.complete(null); + } future.complete(null); return; } - if (isConnectionLost) { - isConnectionLost = false; - options.onConnectionRestored(); + if (isConnectionLost.compareAndSet(true, false)) { + callbacks.onConnectionRestored(); } handleSubscriptionItem(item); } @Override public void onError(Throwable t) { - if (firstMessage) { - firstMessage = false; + if (firstMessage.get()) { if (firstMessageTimeoutFuture != null && !firstMessageTimeoutFuture.isDone()) { firstMessageTimeoutFuture.completeExceptionally(t); } if (!future.isDone()) { future.completeExceptionally(t); } - + // is this needed? + // completeExceptionally(future, t); } else { logger.debug("Subscription failed, retrying..."); - if (!isConnectionLost) { - isConnectionLost = true; - options.onConnectionLost(); + if (isConnectionLost.compareAndSet(false, true)) { + callbacks.onConnectionLost(); } - if (t instanceof io.grpc.StatusRuntimeException) { - logger.debug( - "Throwable is an instance of StatusRuntimeException, checking status code..."); - io.grpc.StatusRuntimeException statusRuntimeException = - (io.grpc.StatusRuntimeException) t; - if (statusRuntimeException.getStatus().getCode() == Status.Code.UNAVAILABLE) { - logger.info("Status code is UNAVAILABLE, retrying subscription after a delay..."); - scheduleRetry(() -> subscribeWithRetryInternal(future)); + final Optional retryOpt = retryStrategy.determineWhenToRetry(t); + if (retryOpt.isPresent()) { + if (isSubscribed.get()) { + scheduleRetry(retryOpt.get(), () -> subscribeWithRetry()); } else { - logger.debug("Status code is not UNAVAILABLE, not retrying subscription."); - options.onError(t); + logger.debug("Cannot retry an unsubscribed subscription"); } } else { - logger.debug( - "Throwable is not an instance of StatusRuntimeException, not retrying subscription."); - options.onError(t); + callbacks.onError(t); + close(); } } } @@ -186,30 +176,68 @@ public void onCompleted() { } }; - _SubscriptionRequest subscriptionRequest = + final _SubscriptionRequest subscriptionRequest = _SubscriptionRequest.newBuilder() - .setCacheName(options.getCacheName()) - .setTopic(options.getTopicName()) - .setResumeAtTopicSequenceNumber( - options.subscriptionState.getResumeAtTopicSequenceNumber()) - .setSequencePage(options.subscriptionState.getResumeAtTopicSequencePage()) + .setCacheName(cacheName) + .setTopic(topicName) + .setResumeAtTopicSequenceNumber(subscriptionState.getResumeAtTopicSequenceNumber()) + .setSequencePage(subscriptionState.getResumeAtTopicSequencePage()) .build(); try { - connection.subscribe(subscriptionRequest, subscription); - options.subscriptionState.setSubscribed(); + connection.subscribe(subscriptionRequest, observer); + subscriptionState.setSubscribed(); } catch (Exception e) { - future.completeExceptionally( - new TopicSubscribeResponse.Error(CacheServiceExceptionMapper.convert(e))); + completeExceptionally(future, e); } + subscription.set(observer); + + // Combine the subscription future and the first-message timeout future. + // Although CompletableFuture.anyOf(...) returns as soon as *either* future completes, + // it does not tell us *which one* completed, nor does it propagate the actual exception. + // So we explicitly check both futures: + // + // - If the timeout future completed exceptionally, that means the client didn't receive + // the first message (typically a heartbeat) within the expected time, so we want to return + // that timeout error. + // + // - If the timeout didn't fire, but the subscription future failed (e.g., gRPC UNAVAILABLE), + // then we propagate that error. + // + // - If neither completed exceptionally, it means the subscription succeeded, and we return + // success. + // + // This ensures that a timeout error always takes precedence and prevents it from being + // overwritten by a later gRPC error (e.g., UNAVAILABLE) that may arrive after the timeout has + // fired. + CompletableFuture result = new CompletableFuture<>(); + CompletableFuture.anyOf(future, firstMessageTimeoutFuture) + .whenComplete( + (ignored, throwable) -> { + if (firstMessageTimeoutFuture.isCompletedExceptionally()) { + firstMessageTimeoutFuture.whenComplete( + (__, ex) -> result.completeExceptionally(ex)); + } else if (future.isCompletedExceptionally()) { + future.whenComplete((__, ex) -> result.completeExceptionally(ex)); + } else { + result.complete(null); + } + }); + return result; + } + + private void completeExceptionally(CompletableFuture future, Throwable t) { + future.completeExceptionally( + new TopicSubscribeResponse.Error(CacheServiceExceptionMapper.convert(t))); + close(); } - private void scheduleRetry(Runnable retryAction) { - scheduler.schedule(retryAction, 500, TimeUnit.MILLISECONDS); + private void scheduleRetry(Duration retryDelay, Runnable retryAction) { + scheduler.schedule(retryAction, retryDelay.toMillis(), TimeUnit.MILLISECONDS); } private void handleSubscriptionCompleted() { - this.options.onCompleted(); + callbacks.onCompleted(); } private void handleSubscriptionItem(_SubscriptionItem item) { @@ -232,16 +260,16 @@ private void handleSubscriptionItem(_SubscriptionItem item) { private void handleSubscriptionDiscontinuity(_SubscriptionItem discontinuityItem) { logger.trace( "discontinuity {}, {}, {}, {}, {}", - options.getCacheName(), - options.getTopicName(), + cacheName, + topicName, discontinuityItem.getDiscontinuity().getLastTopicSequence(), discontinuityItem.getDiscontinuity().getNewTopicSequence(), discontinuityItem.getDiscontinuity().getNewSequencePage()); - options.subscriptionState.setResumeAtTopicSequenceNumber( + subscriptionState.setResumeAtTopicSequenceNumber( discontinuityItem.getDiscontinuity().getNewTopicSequence()); - options.subscriptionState.setResumeAtTopicSequencePage( + subscriptionState.setResumeAtTopicSequencePage( discontinuityItem.getDiscontinuity().getNewSequencePage()); - options.onDiscontinuity( + callbacks.onDiscontinuity( new TopicDiscontinuity( discontinuityItem.getDiscontinuity().getLastTopicSequence(), discontinuityItem.getDiscontinuity().getNewTopicSequence(), @@ -249,32 +277,32 @@ private void handleSubscriptionDiscontinuity(_SubscriptionItem discontinuityItem } private void handleSubscriptionHeartbeat() { - logger.trace("heartbeat {} {}", options.getCacheName(), options.getTopicName()); - options.onHeartbeat(); + logger.trace("heartbeat {} {}", cacheName, topicName); + callbacks.onHeartbeat(); } private void handleSubscriptionUnknown() { - logger.warn("unknown {} {}", options.getCacheName(), options.getTopicName()); + logger.warn("unknown {} {}", cacheName, topicName); } private void handleSubscriptionItemMessage(_SubscriptionItem item) { _TopicItem topicItem = item.getItem(); _TopicValue topicValue = topicItem.getValue(); - options.subscriptionState.setResumeAtTopicSequenceNumber(topicItem.getTopicSequenceNumber()); - options.subscriptionState.setResumeAtTopicSequencePage(topicItem.getSequencePage()); + subscriptionState.setResumeAtTopicSequenceNumber(topicItem.getTopicSequenceNumber()); + subscriptionState.setResumeAtTopicSequencePage(topicItem.getSequencePage()); TopicMessage message; switch (topicValue.getKindCase()) { case TEXT: message = handleSubscriptionTextMessage(topicValue.getText(), item.getItem().getPublisherId()); - this.options.onItem(message); + callbacks.onItem(message); break; case BINARY: message = handleSubscriptionBinaryMessage( topicValue.getBinary().toByteArray(), item.getItem().getPublisherId()); - this.options.onItem(message); + callbacks.onItem(message); break; default: handleSubscriptionUnknown(); @@ -294,19 +322,16 @@ private TopicMessage.Binary handleSubscriptionBinaryMessage(byte[] binary, Strin } public void unsubscribe() { - subscription.cancel( - "Unsubscribing from topic: " - + options.getTopicName() - + " in cache: " - + options.getCacheName(), - null); + if (isSubscribed.compareAndSet(true, false)) { + subscription + .get() + .cancel("Unsubscribing from topic: " + topicName + " in cache: " + cacheName, null); + close(); + } } @Override public void close() { - if (subscription != null) { - subscription.onCompleted(); - } scheduler.shutdown(); } } diff --git a/momento-sdk/src/main/java/momento/sdk/config/TopicConfiguration.java b/momento-sdk/src/main/java/momento/sdk/config/TopicConfiguration.java index 26e0d598..57d9e8ee 100644 --- a/momento-sdk/src/main/java/momento/sdk/config/TopicConfiguration.java +++ b/momento-sdk/src/main/java/momento/sdk/config/TopicConfiguration.java @@ -7,28 +7,68 @@ import momento.sdk.config.middleware.Middleware; import momento.sdk.config.transport.GrpcConfiguration; import momento.sdk.config.transport.TransportStrategy; +import momento.sdk.retry.FixedDelaySubscriptionRetryStrategy; +import momento.sdk.retry.SubscriptionRetryStrategy; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** The contract for SDK configurables. A configuration must have a transport strategy. */ public class TopicConfiguration { private final TransportStrategy transportStrategy; + private final SubscriptionRetryStrategy subscriptionRetryStrategy; private final List middlewares; private final Logger logger; /** - * Creates a new topic configuration object. + * Creates a new topic configuration. * * @param transportStrategy Responsible for configuring network tunables. + * @param subscriptionRetryStrategy Responsible for determining when to reconnect a broken + * subscription. + * @param middlewares List of middleware that can intercept and modify calls to Momento. * @param logger Responsible for logging */ public TopicConfiguration( - TransportStrategy transportStrategy, List middlewares, Logger logger) { + TransportStrategy transportStrategy, + SubscriptionRetryStrategy subscriptionRetryStrategy, + List middlewares, + Logger logger) { this.transportStrategy = transportStrategy; + this.subscriptionRetryStrategy = subscriptionRetryStrategy; this.middlewares = middlewares; this.logger = logger; } + /** + * Creates a new topic configuration object. + * + * @param transportStrategy Responsible for configuring network tunables. + * @param logger Responsible for logging + */ + public TopicConfiguration( + TransportStrategy transportStrategy, List middlewares, Logger logger) { + this( + transportStrategy, + new FixedDelaySubscriptionRetryStrategy(Duration.ofMillis(500)), + middlewares, + logger); + } + + /** + * Creates a new topic configuration object. + * + * @param transportStrategy Responsible for configuring network tunables. + * @param middlewares List of middleware that can intercept and modify calls to Momento. + */ + public TopicConfiguration(TransportStrategy transportStrategy, List middlewares) { + this( + transportStrategy, + new FixedDelaySubscriptionRetryStrategy(Duration.ofMillis(500)), + middlewares, + LoggerFactory.getLogger(TopicConfiguration.class)); + } + /** * Creates a new topic configuration object. * @@ -39,6 +79,37 @@ public TopicConfiguration(TransportStrategy transportStrategy, Logger logger) { this(transportStrategy, new ArrayList<>(), logger); } + /** + * Creates a new topic configuration object. + * + * @param transportStrategy Responsible for configuring network tunables. + */ + public TopicConfiguration(TransportStrategy transportStrategy) { + this(transportStrategy, new ArrayList<>()); + } + + /** + * Configuration for subscription retries. By default, {@link + * momento.sdk.retry.FixedDelaySubscriptionRetryStrategy} gets used. + * + * @return The subscription retry strategy + */ + public SubscriptionRetryStrategy getSubscriptionRetryStrategy() { + return subscriptionRetryStrategy; + } + + /** + * Copy constructor that modifies the subscription retry strategy. + * + * @param subscriptionRetryStrategy The new subscription retry strategy + * @return a new TopicConfiguration with the updated subscription retry strategy + */ + public TopicConfiguration withSubscriptionRetryStrategy( + @Nonnull final SubscriptionRetryStrategy subscriptionRetryStrategy) { + return new TopicConfiguration( + this.transportStrategy, subscriptionRetryStrategy, this.middlewares, this.logger); + } + /** * Configuration for network tunables. * diff --git a/momento-sdk/src/main/java/momento/sdk/retry/DefaultSubscriptionRetryEligibilityStrategy.java b/momento-sdk/src/main/java/momento/sdk/retry/DefaultSubscriptionRetryEligibilityStrategy.java new file mode 100644 index 00000000..be1f8937 --- /dev/null +++ b/momento-sdk/src/main/java/momento/sdk/retry/DefaultSubscriptionRetryEligibilityStrategy.java @@ -0,0 +1,43 @@ +package momento.sdk.retry; + +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DefaultSubscriptionRetryEligibilityStrategy + implements SubscriptionRetryEligibilityStrategy { + private static final Logger logger = + LoggerFactory.getLogger(DefaultSubscriptionRetryEligibilityStrategy.class); + + private static final Set NON_RETRYABLE_STATUS_CODES; + + static { + final Set nonRetryableStatusCodes = new HashSet<>(); + nonRetryableStatusCodes.add(Status.Code.PERMISSION_DENIED); + nonRetryableStatusCodes.add(Status.Code.UNAUTHENTICATED); + nonRetryableStatusCodes.add(Status.Code.CANCELLED); + nonRetryableStatusCodes.add(Status.Code.NOT_FOUND); + + NON_RETRYABLE_STATUS_CODES = Collections.unmodifiableSet(nonRetryableStatusCodes); + } + + @Override + public boolean isEligibleForRetry(Throwable error) { + if (error instanceof StatusRuntimeException) { + final Status.Code statusCode = ((StatusRuntimeException) error).getStatus().getCode(); + if (NON_RETRYABLE_STATUS_CODES.contains(statusCode)) { + logger.debug("Subscription error code {} is not retryable", statusCode); + return false; + } + logger.debug("Subscription error code {} is retryable", statusCode); + return true; + } else { + logger.debug("Unable to retry subscription", error); + return false; + } + } +} diff --git a/momento-sdk/src/main/java/momento/sdk/retry/FixedDelaySubscriptionRetryStrategy.java b/momento-sdk/src/main/java/momento/sdk/retry/FixedDelaySubscriptionRetryStrategy.java new file mode 100644 index 00000000..fee5899d --- /dev/null +++ b/momento-sdk/src/main/java/momento/sdk/retry/FixedDelaySubscriptionRetryStrategy.java @@ -0,0 +1,30 @@ +package momento.sdk.retry; + +import java.time.Duration; +import java.util.Optional; + +/** A topic subscription retry strategy that uses a fixed delay between reconnection attempts. */ +public class FixedDelaySubscriptionRetryStrategy implements SubscriptionRetryStrategy { + + private final SubscriptionRetryEligibilityStrategy eligibilityStrategy; + private final Duration retryDelay; + + public FixedDelaySubscriptionRetryStrategy( + SubscriptionRetryEligibilityStrategy eligibilityStrategy, Duration retryDelay) { + this.eligibilityStrategy = eligibilityStrategy; + this.retryDelay = retryDelay; + } + + public FixedDelaySubscriptionRetryStrategy(Duration retryDelay) { + this(new DefaultSubscriptionRetryEligibilityStrategy(), retryDelay); + } + + @Override + public Optional determineWhenToRetry(Throwable error) { + if (eligibilityStrategy.isEligibleForRetry(error)) { + return Optional.of(retryDelay); + } + + return Optional.empty(); + } +} diff --git a/momento-sdk/src/main/java/momento/sdk/retry/SubscriptionRetryEligibilityStrategy.java b/momento-sdk/src/main/java/momento/sdk/retry/SubscriptionRetryEligibilityStrategy.java new file mode 100644 index 00000000..69a342ad --- /dev/null +++ b/momento-sdk/src/main/java/momento/sdk/retry/SubscriptionRetryEligibilityStrategy.java @@ -0,0 +1,27 @@ +package momento.sdk.retry; + +/** + * An interface representing a strategy for determining whether a topic subscription is eligible for + * reconnecting based on the error that disconnected it. + * + *

Implementations of this interface allow clients to customize the criteria for reconnecting a + * failed subscription. {@link #isEligibleForRetry(Throwable)} is called by the + * SubscriptionRetryStrategy to determine whether a specific disconnected subscription is eligible + * for reconnecting. + * + *

A subscription may encounter different types of errors, such as network issues, or server-side + * errors. For example, a simple implementation of this interface could check the status code of the + * gRPC error and return true if the status code represents a transient error (e.g., unavailable), + * or one that is unrecoverable (e.g., cache not found). + */ +public interface SubscriptionRetryEligibilityStrategy { + + /** + * Determines whether a disconnected topic subscription is eligible for reconnection based on the + * error that caused the disconnection. + * + * @param error The error encountered by the subscription when it disconnected. + * @return {@code true} if the request is eligible for reconnection, {@code false} otherwise. + */ + boolean isEligibleForRetry(Throwable error); +} diff --git a/momento-sdk/src/main/java/momento/sdk/retry/SubscriptionRetryStrategy.java b/momento-sdk/src/main/java/momento/sdk/retry/SubscriptionRetryStrategy.java new file mode 100644 index 00000000..84d40716 --- /dev/null +++ b/momento-sdk/src/main/java/momento/sdk/retry/SubscriptionRetryStrategy.java @@ -0,0 +1,30 @@ +package momento.sdk.retry; + +import java.time.Duration; +import java.util.Optional; + +/** + * An interface representing a strategy for determining the delay between reconnection attempts for + * a disconnected topic subscription. + * + *

Implementations of this interface allow clients to customize the delay between consecutive + * reconnection attempts when a subscription fails. The {@link #determineWhenToRetry(Throwable)} + * method is called by the subscription to retrieve the delay for the next reconnection attempt + * based on the error encountered when the connection attempt failed. + * + *

If the retry strategy returns an empty optional (Optional.empty()), it indicates that the + * reconnection will not be performed based on the implemented contract. In such cases, the + * subscription will call {@link momento.sdk.ISubscriptionCallbacks#onError(Throwable)} and no + * further reconnection attempts will be made. + */ +public interface SubscriptionRetryStrategy { + + /** + * Returns the amount of time before the next subscription reconnection attempt. + * + * @param error The error encountered by the subscription when it disconnected. + * @return A duration to wait before reconnecting, or empty if no reconnection attempt should be + * made. + */ + Optional determineWhenToRetry(Throwable error); +} diff --git a/momento-sdk/src/test/java/momento/sdk/SubscriptionWrapperTest.java b/momento-sdk/src/test/java/momento/sdk/SubscriptionWrapperTest.java index 3e085358..ceb0e44b 100644 --- a/momento-sdk/src/test/java/momento/sdk/SubscriptionWrapperTest.java +++ b/momento-sdk/src/test/java/momento/sdk/SubscriptionWrapperTest.java @@ -8,14 +8,15 @@ import grpc.cache_client.pubsub._SubscriptionRequest; import io.grpc.Status; import io.grpc.StatusRuntimeException; +import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import momento.sdk.internal.SubscriptionState; -import momento.sdk.responses.topic.TopicSubscribeResponse; -import org.junit.jupiter.api.BeforeEach; +import momento.sdk.responses.topic.TopicMessage; +import momento.sdk.retry.FixedDelaySubscriptionRetryStrategy; +import momento.sdk.retry.SubscriptionRetryStrategy; import org.junit.jupiter.api.Test; -import org.mockito.MockitoAnnotations; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,43 +24,42 @@ public class SubscriptionWrapperTest { private final Logger logger = LoggerFactory.getLogger(SubscriptionWrapperTest.class); private final long requestTimeoutSeconds = 5; - @BeforeEach - public void setUp() { - MockitoAnnotations.openMocks(this); - } - @Test public void testConnectionLostAndRestored() throws InterruptedException { - SubscriptionState state = new SubscriptionState(); - TopicSubscribeResponse.Subscription subscription = - new TopicSubscribeResponse.Subscription(state); - - AtomicBoolean gotConnectionLostCallback = new AtomicBoolean(false); - AtomicBoolean gotConnectionRestoredCallback = new AtomicBoolean(false); - - Semaphore waitingForSubscriptionAttempt = new Semaphore(0); - - SendSubscribeOptions options = - new SendSubscribeOptions( - "cache", - "topic", - (message) -> {}, - () -> {}, - (err) -> {}, - (discontinuity) -> {}, - () -> {}, - () -> { - logger.info("Got to our connection lost callback!"); - gotConnectionLostCallback.set(true); - }, - () -> { - logger.info("Got to our connection restored callback!"); - gotConnectionRestoredCallback.set(true); - }, - state, - subscription); + final String cacheName = "cache"; + final String topicName = "topic"; + final SubscriptionState state = new SubscriptionState(); + + final AtomicBoolean gotConnectionLostCallback = new AtomicBoolean(false); + final AtomicBoolean gotConnectionRestoredCallback = new AtomicBoolean(false); + + final Semaphore waitingForSubscriptionAttempt = new Semaphore(0); - IScsTopicConnection connection = + final ISubscriptionCallbacks callbacks = + new ISubscriptionCallbacks() { + @Override + public void onItem(TopicMessage message) {} + + @Override + public void onCompleted() {} + + @Override + public void onError(Throwable t) {} + + @Override + public void onConnectionLost() { + logger.info("Got to our connection lost callback!"); + gotConnectionLostCallback.set(true); + } + + @Override + public void onConnectionRestored() { + logger.info("Got to our connection restored callback!"); + gotConnectionRestoredCallback.set(true); + } + }; + + final IScsTopicConnection connection = new IScsTopicConnection() { boolean isOpen = true; CancelableClientCallStreamObserver<_SubscriptionItem> subscription; @@ -95,9 +95,19 @@ public void subscribe( } }; - SubscriptionWrapper subscriptionWrapper = - new SubscriptionWrapper(connection, options, requestTimeoutSeconds); - CompletableFuture subscribeWithRetryResult = subscriptionWrapper.subscribeWithRetry(); + final SubscriptionRetryStrategy retryStrategy = + new FixedDelaySubscriptionRetryStrategy(Duration.ofMillis(500)); + final SubscriptionWrapper subscriptionWrapper = + new SubscriptionWrapper( + cacheName, + topicName, + connection, + callbacks, + state, + requestTimeoutSeconds, + retryStrategy); + final CompletableFuture subscribeWithRetryResult = + subscriptionWrapper.subscribeWithRetry(); subscribeWithRetryResult.join(); waitingForSubscriptionAttempt.acquire(); From 332726acf1488386a7ee8593daf103aa48d6f28b Mon Sep 17 00:00:00 2001 From: anitarua Date: Fri, 30 May 2025 11:53:14 -0700 Subject: [PATCH 2/3] fix: unsubscribe should end with onCompleted instead of onError --- .../java/momento/sdk/SubscriptionWrapper.java | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/momento-sdk/src/main/java/momento/sdk/SubscriptionWrapper.java b/momento-sdk/src/main/java/momento/sdk/SubscriptionWrapper.java index 15f74b2e..2cc97b81 100644 --- a/momento-sdk/src/main/java/momento/sdk/SubscriptionWrapper.java +++ b/momento-sdk/src/main/java/momento/sdk/SubscriptionWrapper.java @@ -6,6 +6,8 @@ import grpc.cache_client.pubsub._TopicItem; import grpc.cache_client.pubsub._TopicValue; import io.grpc.Status; +import io.grpc.StatusRuntimeException; + import java.io.Closeable; import java.time.Duration; import java.util.Optional; @@ -149,13 +151,25 @@ public void onError(Throwable t) { if (!future.isDone()) { future.completeExceptionally(t); } - // is this needed? - // completeExceptionally(future, t); + completeExceptionally(future, t); } else { logger.debug("Subscription failed, retrying..."); if (isConnectionLost.compareAndSet(false, true)) { callbacks.onConnectionLost(); } + + // If it was a CANCELLED error because unsubscribe was called, do not retry and + // exit gracefully. + if (t instanceof StatusRuntimeException) { + final StatusRuntimeException exception = (StatusRuntimeException) t; + if (exception.getStatus().getCode() == Status.Code.CANCELLED && exception.getMessage().contains("Unsubscribing")) { + callbacks.onCompleted(); + close(); + return; + } + } + + // Otherwise, determine if we should retry. final Optional retryOpt = retryStrategy.determineWhenToRetry(t); if (retryOpt.isPresent()) { if (isSubscribed.get()) { From 74472c49cf7bcabf2828a0453fb4d3baf86b4cee Mon Sep 17 00:00:00 2001 From: anitarua Date: Fri, 30 May 2025 11:57:32 -0700 Subject: [PATCH 3/3] fix formatting --- .../src/main/java/momento/sdk/SubscriptionWrapper.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/momento-sdk/src/main/java/momento/sdk/SubscriptionWrapper.java b/momento-sdk/src/main/java/momento/sdk/SubscriptionWrapper.java index 2cc97b81..1906c828 100644 --- a/momento-sdk/src/main/java/momento/sdk/SubscriptionWrapper.java +++ b/momento-sdk/src/main/java/momento/sdk/SubscriptionWrapper.java @@ -7,7 +7,6 @@ import grpc.cache_client.pubsub._TopicValue; import io.grpc.Status; import io.grpc.StatusRuntimeException; - import java.io.Closeable; import java.time.Duration; import java.util.Optional; @@ -162,7 +161,8 @@ public void onError(Throwable t) { // exit gracefully. if (t instanceof StatusRuntimeException) { final StatusRuntimeException exception = (StatusRuntimeException) t; - if (exception.getStatus().getCode() == Status.Code.CANCELLED && exception.getMessage().contains("Unsubscribing")) { + if (exception.getStatus().getCode() == Status.Code.CANCELLED + && exception.getMessage().contains("Unsubscribing")) { callbacks.onCompleted(); close(); return;