ozone.om.admin.protocol.max.retries
20
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
index 240b6b895319..7e98b1279c03 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
@@ -78,6 +78,7 @@
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -228,8 +229,7 @@ public static int getOmRpcPort(ConfigurationSource conf) {
* @param omRequest OMRequest proto
* @return True if its readOnly, false otherwise.
*/
- public static boolean isReadOnly(
- OzoneManagerProtocolProtos.OMRequest omRequest) {
+ public static boolean isReadOnly(OMRequest omRequest) {
OzoneManagerProtocolProtos.Type cmdType = omRequest.getCmdType();
switch (cmdType) {
case CheckVolumeAccess:
@@ -352,6 +352,135 @@ public static boolean isReadOnly(
}
}
+ /**
+ * Checks if the OM request should be sent to the follower or leader.
+ *
+ * Note that this method is not equivalent to {@link OmUtils#isReadOnly(OMRequest)}
+ * since there are cases that a "read" requests (ones that do not go through Ratis) requires
+ * to be sent to the leader.
+ * @param omRequest OMRequest proto
+ * @return True if the request should be sent to the follower.
+ */
+ public static boolean shouldSendToFollower(OMRequest omRequest) {
+ OzoneManagerProtocolProtos.Type cmdType = omRequest.getCmdType();
+ switch (cmdType) {
+ case CheckVolumeAccess:
+ case InfoVolume:
+ case ListVolume:
+ case InfoBucket:
+ case ListBuckets:
+ case LookupKey:
+ case ListKeys:
+ case ListKeysLight:
+ case ListTrash:
+ // ListTrash is deprecated by HDDS-11251. Keeping this in here
+ // As protobuf currently doesn't support deprecating enum fields
+ // TODO: Remove once migrated to proto3 and mark fields in proto
+ // as deprecated
+ case ListOpenFiles:
+ case ListMultiPartUploadParts:
+ case GetFileStatus:
+ case LookupFile:
+ case ListStatus:
+ case ListStatusLight:
+ case GetAcl:
+ case ListMultipartUploads:
+ case FinalizeUpgradeProgress:
+ case PrepareStatus:
+ case GetS3VolumeContext:
+ case ListTenant:
+ case TenantGetUserInfo:
+ case TenantListUser:
+ case ListSnapshot:
+ case RefetchSecretKey:
+ case GetKeyInfo:
+ case GetSnapshotInfo:
+ case GetObjectTagging:
+ return true;
+ case CreateVolume:
+ case SetVolumeProperty:
+ case DeleteVolume:
+ case CreateBucket:
+ case SetBucketProperty:
+ case DeleteBucket:
+ case CreateKey:
+ case RenameKey:
+ case RenameKeys:
+ case DeleteKey:
+ case DeleteKeys:
+ case CommitKey:
+ case AllocateBlock:
+ case InitiateMultiPartUpload:
+ case CommitMultiPartUpload:
+ case CompleteMultiPartUpload:
+ case AbortMultiPartUpload:
+ case GetS3Secret:
+ case GetDelegationToken:
+ case RenewDelegationToken:
+ case CancelDelegationToken:
+ case CreateDirectory:
+ case CreateFile:
+ case RemoveAcl:
+ case SetAcl:
+ case AddAcl:
+ case PurgeKeys:
+ case RecoverTrash:
+ // RecoverTrash is deprecated by HDDS-11251. Keeping this in here
+ // As protobuf currently doesn't support deprecating enum fields
+ // TODO: Remove once migrated to proto3 and mark fields in proto
+ // as deprecated
+ case FinalizeUpgrade:
+ case Prepare:
+ case CancelPrepare:
+ case DeleteOpenKeys:
+ case SetS3Secret:
+ case RevokeS3Secret:
+ case PurgeDirectories:
+ case PurgePaths:
+ case CreateTenant:
+ case DeleteTenant:
+ case TenantAssignUserAccessId:
+ case TenantRevokeUserAccessId:
+ case TenantAssignAdmin:
+ case TenantRevokeAdmin:
+ case SetRangerServiceVersion:
+ case CreateSnapshot:
+ case DeleteSnapshot:
+ case RenameSnapshot:
+ case SnapshotMoveDeletedKeys:
+ case SnapshotMoveTableKeys:
+ case SnapshotPurge:
+ case RecoverLease:
+ case SetTimes:
+ case AbortExpiredMultiPartUploads:
+ case SetSnapshotProperty:
+ case QuotaRepair:
+ case PutObjectTagging:
+ case DeleteObjectTagging:
+ case ServiceList: // OM leader should have the most up-to-date OM service list info
+ case RangerBGSync: // Ranger Background Sync task is only run on leader
+ case SnapshotDiff:
+ case CancelSnapshotDiff:
+ case ListSnapshotDiffJobs:
+ case PrintCompactionLogDag:
+ // Snapshot diff is a local to a single OM node so we should not send it arbitrarily
+ // to any OM nodes
+ case TransferLeadership: // Transfer leadership should be initiated by the leader
+ case SetSafeMode: // SafeMode should be initiated by the leader
+ case StartQuotaRepair:
+ case GetQuotaRepairStatus:
+ // Quota repair lifecycle request should be initiated by the leader
+ case DBUpdates: // We are currently only interested on the leader DB info
+ case UnknownCommand:
+ return false;
+ case EchoRPC:
+ return omRequest.getEchoRPCRequest().getReadOnly();
+ default:
+ LOG.error("CmdType {} is not categorized to be sent to follower.", cmdType);
+ return false;
+ }
+ }
+
public static byte[] getSHADigest() throws IOException {
try {
SRAND.nextBytes(randomBytes);
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFailoverProxyProvider.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFailoverProxyProvider.java
index 7d5e40e9f8d9..4a3a4fc47154 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFailoverProxyProvider.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFailoverProxyProvider.java
@@ -45,7 +45,7 @@
public class HadoopRpcOMFailoverProxyProvider extends
OMFailoverProxyProviderBase {
- private static final Logger LOG =
+ protected static final Logger LOG =
LoggerFactory.getLogger(HadoopRpcOMFailoverProxyProvider.class);
private final Text delegationTokenService;
@@ -123,7 +123,7 @@ public synchronized ProxyInfo getProxy() {
/**
* Creates proxy object.
*/
- protected ProxyInfo createOMProxyIfNeeded(OMProxyInfo omProxyInfo) {
+ protected synchronized ProxyInfo createOMProxyIfNeeded(OMProxyInfo omProxyInfo) {
if (omProxyInfo.proxy == null) {
try {
omProxyInfo.proxy = createOMProxy(omProxyInfo.getAddress());
@@ -136,6 +136,11 @@ protected ProxyInfo createOMProxyIfNeeded(OMProxyInfo omProxyInfo) {
return omProxyInfo;
}
+ protected synchronized ProxyInfo createOMProxyIfNeeded(String omNodeId) {
+ OMProxyInfo omProxyInfo = getOmProxy(omNodeId);
+ return createOMProxyIfNeeded(omProxyInfo);
+ }
+
public Text getCurrentProxyDelegationToken() {
return delegationTokenService;
}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFollowerReadFailoverProxyProvider.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFollowerReadFailoverProxyProvider.java
new file mode 100644
index 000000000000..0bab0a2129c7
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFollowerReadFailoverProxyProvider.java
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.ha;
+
+import static org.apache.hadoop.ozone.om.ha.OMFailoverProxyProviderBase.getLeaderNotReadyException;
+import static org.apache.hadoop.ozone.om.ha.OMFailoverProxyProviderBase.getNotLeaderException;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.List;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.io.retry.FailoverProxyProvider;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ipc_.Client.ConnectionId;
+import org.apache.hadoop.ipc_.RPC;
+import org.apache.hadoop.ipc_.RpcInvocationHandler;
+import org.apache.hadoop.ipc_.RpcNoSuchProtocolException;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
+import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
+import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link org.apache.hadoop.io.retry.FailoverProxyProvider} implementation
+ * that supports reading from follower OM(s) (i.e. non-leader OMs also includes
+ * OM listeners).
+ *
+ * This constructs a wrapper proxy might send the read request to follower
+ * OM(s), if follower read is enabled. It will try to send read requests
+ * to the first OM node. If RPC failed, it will try to failover to the next OM node.
+ * It will fail back to the leader OM after it has exhausted all the OMs.
+ * TODO: Currently the logic does not prioritize forwarding to followers since
+ * it requires an extra RPC latency to check the OM role info.
+ * In the future, we can try to try to pick the followers before forwarding
+ * the request to the leader (similar to ObserverReadProxyProvider).
+ *
+ * Read and write requests will still be sent to leader OM if reading from
+ * follower is disabled.
+ */
+public class HadoopRpcOMFollowerReadFailoverProxyProvider implements FailoverProxyProvider {
+ @VisibleForTesting
+ public static final Logger LOG = LoggerFactory.getLogger(HadoopRpcOMFollowerReadFailoverProxyProvider.class);
+
+ private final Class protocolClass;
+
+ /** The inner proxy provider used for leader-based failover. */
+ private final HadoopRpcOMFailoverProxyProvider failoverProxy;
+
+ /** The combined proxy which redirects to other proxies as necessary. */
+ private final ProxyInfo combinedProxy;
+
+ /**
+ * Whether reading from follower is enabled. If this is false, all read
+ * requests will still go to OM leader.
+ */
+ private volatile boolean followerReadEnabled;
+
+ /**
+ * The current index of the underlying leader-based proxy provider's omNodesInOrder currently being used.
+ * Should only be accessed in synchronized methods.
+ */
+ private int currentIndex = -1;
+
+ /**
+ * The proxy currently being used to send the read request.
+ * Should only be accessed in synchronized methods.
+ */
+ private OMProxyInfo currentProxy;
+
+ /** The last proxy that has been used. Only used for testing. */
+ private volatile OMProxyInfo lastProxy = null;
+
+ public HadoopRpcOMFollowerReadFailoverProxyProvider(
+ ConfigurationSource configuration, UserGroupInformation ugi, String omServiceId, Class protocol)
+ throws IOException {
+ this(omServiceId, protocol,
+ new HadoopRpcOMFailoverProxyProvider<>(configuration, ugi, omServiceId, protocol));
+ }
+
+ @SuppressWarnings("unchecked")
+ public HadoopRpcOMFollowerReadFailoverProxyProvider(String omServiceId, Class protocol,
+ HadoopRpcOMFailoverProxyProvider failoverProxy) throws IOException {
+ this.protocolClass = protocol;
+ this.failoverProxy = failoverProxy;
+
+ // Create a wrapped proxy containing all the proxies. Since this combined
+ // proxy is just redirecting to other proxies, all invocations can share it.
+ StringBuilder combinedInfo = new StringBuilder("[");
+ for (int i = 0; i < failoverProxy.getOMProxies().size(); i++) {
+ if (i > 0) {
+ combinedInfo.append(',');
+ }
+ combinedInfo.append(failoverProxy.getOMProxies().get(i).proxyInfo);
+ }
+ combinedInfo.append(']');
+ T wrappedProxy = (T) Proxy.newProxyInstance(
+ FollowerReadInvocationHandler.class.getClassLoader(),
+ new Class>[] {protocol}, new FollowerReadInvocationHandler());
+ combinedProxy = new ProxyInfo<>(wrappedProxy, combinedInfo.toString());
+
+ if (wrappedProxy instanceof OzoneManagerProtocolPB) {
+ this.followerReadEnabled = true;
+ } else {
+ LOG.debug("Disabling follower reads for {} because the requested proxy "
+ + "class does not implement {}", omServiceId, OzoneManagerProtocolPB.class.getName());
+ this.followerReadEnabled = false;
+ }
+ }
+
+ @Override
+ public Class getInterface() {
+ return protocolClass;
+ }
+
+ @Override
+ public ProxyInfo getProxy() {
+ return combinedProxy;
+ }
+
+ @Override
+ public void performFailover(T currProxy) {
+ // Since FollowerReadInvocationHandler might user or fallback to leader-based failover logic,
+ // we should delegate the failover logic to the leader's failover.
+ failoverProxy.performFailover(currProxy);
+ }
+
+ public RetryPolicy getRetryPolicy(int maxFailovers) {
+ // We use the OMFailoverProxyProviderBase's RetryPolicy instead of using our own retry policy
+ // for a few reasons
+ // 1. We want to ensure that the retry policy behavior remains the same when we use the leader proxy
+ // (when follower read is disabled or using write request)
+ // 2. The FollowerInvocationHandler is also written so that the thrown exception is handled by the
+ // OMFailoverProxyProviderbase's RetryPolicy
+ return failoverProxy.getRetryPolicy(maxFailovers);
+ }
+
+ /**
+ * Parse the OM request from the request args.
+ *
+ * @return parsed OM request.
+ */
+ private static OMRequest parseOMRequest(Object[] args) throws Throwable {
+ if (args == null || args.length < 2 || !(args[1] instanceof Message)) {
+ LOG.error("Request failed since OM request is null and cannot be parsed");
+ // Throws a non-retriable exception to prevent retry and failover
+ // See the HddsUtils#shouldNotFailoverOnRpcException used in
+ // OMFailoverProxyProviderBase#shouldFailover
+ throw wrapInServiceException(
+ new RpcNoSuchProtocolException("OM request is null and cannot be parsed"));
+ }
+ final Message theRequest = (Message) args[1];
+ return (OMRequest) theRequest;
+ }
+
+ @VisibleForTesting
+ void setFollowerReadEnabled(boolean flag) {
+ this.followerReadEnabled = flag;
+ }
+
+ @VisibleForTesting
+ public ProxyInfo getLastProxy() {
+ return lastProxy;
+ }
+
+ /**
+ * Return the currently used proxy. If there is none, first calls
+ * {@link #changeProxy(OMProxyInfo)} to initialize one.
+ */
+ @VisibleForTesting
+ public OMProxyInfo getCurrentProxy() {
+ return changeProxy(null);
+ }
+
+ /**
+ * Move to the next proxy in the proxy list. If the OMProxyInfo supplied by
+ * the caller does not match the current proxy, the call is ignored; this is
+ * to handle concurrent calls (to avoid changing the proxy multiple times).
+ * The service state of the newly selected proxy will be updated before
+ * returning.
+ *
+ * @param initial The expected current proxy
+ * @return The new proxy that should be used.
+ */
+ private synchronized OMProxyInfo changeProxy(OMProxyInfo initial) {
+ if (currentProxy != initial) {
+ // Must have been a concurrent modification; ignore the move request
+ return currentProxy;
+ }
+ currentIndex = (currentIndex + 1) % failoverProxy.getOmNodesInOrder().size();
+ String currentOmNodeId = failoverProxy.getOmNodesInOrder().get(currentIndex);
+ currentProxy = (OMProxyInfo) failoverProxy.createOMProxyIfNeeded(currentOmNodeId);
+ LOG.debug("Changed current proxy from {} to {}",
+ initial == null ? "none" : initial.proxyInfo,
+ currentProxy.proxyInfo);
+ return currentProxy;
+ }
+
+ /**
+ * An InvocationHandler to handle incoming requests. This class's invoke
+ * method contains the primary logic for redirecting to followers.
+ *
+ * If follower reads are enabled, attempt to send read operations to the
+ * current proxy which can be either a leader or follower. If the current
+ * proxy's OM node fails, adjust the current proxy and return on the next one.
+ *
+ * Write requests are always forwarded to the leader.
+ */
+ private class FollowerReadInvocationHandler implements RpcInvocationHandler {
+
+ @Override
+ public Object invoke(Object proxy, final Method method, final Object[] args)
+ throws Throwable {
+ lastProxy = null;
+ if (method.getDeclaringClass() == Object.class) {
+ // If the method is not a OzoneManagerProtocolPB method (e.g. Object#toString()),
+ // we should invoke the method on the current proxy
+ return method.invoke(this, args);
+ }
+ Object retVal;
+ OMRequest omRequest = parseOMRequest(args);
+ if (followerReadEnabled && OmUtils.shouldSendToFollower(omRequest)) {
+ int failedCount = 0;
+ for (int i = 0; i < failoverProxy.getOmNodesInOrder().size(); i++) {
+ OMProxyInfo current = getCurrentProxy();
+ LOG.debug("Attempting to service {} with cmdType {} using proxy {}",
+ method.getName(), omRequest.getCmdType(), current.proxyInfo);
+ try {
+ retVal = method.invoke(current.proxy, args);
+ lastProxy = current;
+ LOG.debug("Invocation of {} with cmdType {} using {} was successful",
+ method.getName(), omRequest.getCmdType(), current.proxyInfo);
+ return retVal;
+ } catch (InvocationTargetException ite) {
+ LOG.debug("Invocation of {} with cmdType {} using proxy {} failed", method.getName(),
+ omRequest.getCmdType(), current.proxyInfo, ite);
+ if (!(ite.getCause() instanceof Exception)) {
+ throw wrapInServiceException(ite.getCause());
+ }
+ Exception e = (Exception) ite.getCause();
+ if (e instanceof InterruptedIOException ||
+ e instanceof InterruptedException) {
+ // If interrupted, do not retry.
+ LOG.warn("Invocation returned interrupted exception on [{}];",
+ current.proxyInfo, e);
+ throw wrapInServiceException(e);
+ }
+
+ if (e instanceof ServiceException) {
+ OMNotLeaderException notLeaderException =
+ getNotLeaderException(e);
+ if (notLeaderException != null) {
+ // We should disable follower read here since this means
+ // the OM follower does not support / disable follower read or something is misconfigured
+ LOG.debug("Encountered OMNotLeaderException from {}. " +
+ "Disable OM follower read and retry OM leader directly.", current.proxyInfo);
+ followerReadEnabled = false;
+ // Break here instead of throwing exception so that it is not counted
+ // as a failover
+ break;
+ }
+
+ OMLeaderNotReadyException leaderNotReadyException =
+ getLeaderNotReadyException(e);
+ if (leaderNotReadyException != null) {
+ LOG.debug("Encountered OMLeaderNotReadyException from {}. " +
+ "Directly throw the exception to trigger retry", current.proxyInfo);
+ // Throw here to trigger retry since we already communicate to the leader
+ // If we break here instead, we will retry the same leader again without waiting
+ throw e;
+ }
+ }
+
+ if (!failoverProxy.shouldFailover(e)) {
+ // We reuse the leader proxy provider failover since we want to ensure
+ // if the follower read proxy decides that the exception should be failed,
+ // the leader proxy provider failover retry policy (i.e. OMFailoverProxyProviderBase#getRetryPolicy)
+ // should also fail the call.
+ // Otherwise, if the follower read proxy decides the exception should be failed, but
+ // the leader decides to failover to the its next proxy, the follower read proxy remains
+ // unchanged and the next read calls might query the same failing OM node and
+ // fail indefinitely.
+ LOG.debug("Invocation with cmdType {} returned exception on [{}] that cannot be retried; " +
+ "{} failure(s) so far",
+ omRequest.getCmdType(), current.proxyInfo, failedCount, e);
+ throw e;
+ } else {
+ failedCount++;
+ LOG.warn(
+ "Invocation with cmdType {} returned exception on [{}]; {} failure(s) so far",
+ omRequest.getCmdType(), current.proxyInfo, failedCount, e);
+ changeProxy(current);
+ }
+ }
+ }
+
+ // Only log message if there are actual follower failures.
+ // Getting here with failedCount = 0 could
+ // be that there is simply no Follower node running at all.
+ if (failedCount > 0) {
+ // If we get here, it means all followers have failed.
+ LOG.warn("{} nodes have failed for read request {} with cmdType {}."
+ + " Falling back to leader.", failedCount,
+ omRequest.getCmdType(), method.getName());
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Read falling back to leader without follower read "
+ + "fail, is there no follower node running?");
+ }
+ }
+ }
+
+ // Either all followers have failed, follower reads are disabled,
+ // or this is a write request. In any case, forward the request to
+ // the leader OM.
+ LOG.debug("Using leader-based failoverProxy to service {}", method.getName());
+ OMProxyInfo leaderProxy = (OMProxyInfo) failoverProxy.getProxy();
+ try {
+ retVal = method.invoke(leaderProxy.proxy, args);
+ } catch (InvocationTargetException e) {
+ LOG.debug("Exception thrown from leader-based failoverProxy", e.getCause());
+ // This exception will be handled by the OMFailoverProxyProviderBase#getRetryPolicy
+ // (see getRetryPolicy). This ensures that the leader-only failover should still work.
+ throw wrapInServiceException(e.getCause());
+ }
+ lastProxy = leaderProxy;
+ return retVal;
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ @Override
+ public ConnectionId getConnectionId() {
+ return RPC.getConnectionIdForProxy(followerReadEnabled
+ ? getCurrentProxy().proxy : failoverProxy.getProxy().proxy);
+ }
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ // All the proxies are stored in the underlying failoverProxy
+ // so we invoke close on the underlying failoverProxy
+ failoverProxy.close();
+ }
+
+ @VisibleForTesting
+ public boolean isFollowerReadEnabled() {
+ return followerReadEnabled;
+ }
+
+ @VisibleForTesting
+ public List> getOMProxies() {
+ return failoverProxy.getOMProxies();
+ }
+
+ public synchronized void changeInitialProxyForTest(String initialOmNodeId) {
+ if (currentProxy != null && currentProxy.getNodeId().equals(initialOmNodeId)) {
+ return;
+ }
+
+ int indexOfTargetNodeId = failoverProxy.getOmNodesInOrder().indexOf(initialOmNodeId);
+ if (indexOfTargetNodeId == -1) {
+ return;
+ }
+
+ currentIndex = indexOfTargetNodeId;
+ currentProxy = (OMProxyInfo) failoverProxy.createOMProxyIfNeeded(initialOmNodeId);
+ }
+
+ /**
+ * Wrap the throwable in {@link ServiceException} if necessary.
+ * This is required to prevent {@link java.lang.reflect.UndeclaredThrowableException} to be thrown
+ * since {@link OzoneManagerProtocolPB#submitRequest(RpcController, OMRequest)} only
+ * throws {@link ServiceException}.
+ * @param e exception to wrap in {@link ServiceException}.
+ * @return if the throwable is already an instance {@link ServiceException} simply returns the exception itself.
+ * Otherwise, return the exception wrapped in {@link ServiceException}
+ */
+ private static Throwable wrapInServiceException(Throwable e) {
+ if (e instanceof ServiceException) {
+ return e;
+ }
+ return new ServiceException(e);
+ }
+
+}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProviderBase.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProviderBase.java
index 04cc40a6b0f2..739999d39f80 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProviderBase.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProviderBase.java
@@ -416,6 +416,10 @@ public Map> getOMProxyMap() {
return omProxies;
}
+ protected synchronized OMProxyInfo getOmProxy(String nodeId) {
+ return omProxies.get(nodeId);
+ }
+
/**
* Unwrap the exception and return the wrapped OMLeaderNotReadyException if any.
*
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
index 60dd34ead2ad..860222b46a24 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
@@ -31,6 +31,7 @@
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
+import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFollowerReadFailoverProxyProvider;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
import org.apache.hadoop.security.UserGroupInformation;
@@ -45,7 +46,8 @@ public class Hadoop3OmTransport implements OmTransport {
*/
private static final RpcController NULL_RPC_CONTROLLER = null;
- private final HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider;
+ private final HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider;
+ private HadoopRpcOMFollowerReadFailoverProxyProvider followerReadFailoverProxyProvider;
private final OzoneManagerProtocolPB rpcProxy;
@@ -56,14 +58,34 @@ public Hadoop3OmTransport(ConfigurationSource conf,
OzoneManagerProtocolPB.class,
ProtobufRpcEngine.class);
- this.omFailoverProxyProvider = new HadoopRpcOMFailoverProxyProvider(
+ this.omFailoverProxyProvider = new HadoopRpcOMFailoverProxyProvider<>(
conf, ugi, omServiceId, OzoneManagerProtocolPB.class);
+ boolean followerReadEnabled = conf.getBoolean(
+ OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_ENABLED_KEY,
+ OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_ENABLED_DEFAULT
+ );
+
int maxFailovers = conf.getInt(
OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
- this.rpcProxy = createRetryProxy(omFailoverProxyProvider, maxFailovers);
+ // TODO: In the future, we might support more FollowerReadProxyProvider strategies depending on factors
+ // like latency, applied index, etc.
+ // So instead of enabling using follower read configuration, we can simply let user to configure the
+ // failover proxy provider instead (similar to dfs.client.failover.proxy.provider.)
+ if (followerReadEnabled) {
+ this.followerReadFailoverProxyProvider = new HadoopRpcOMFollowerReadFailoverProxyProvider<>(
+ omServiceId, OzoneManagerProtocolPB.class, omFailoverProxyProvider
+ );
+ this.rpcProxy = createRetryProxy(followerReadFailoverProxyProvider, maxFailovers);
+ } else {
+ // TODO: It should be possible to simply instantiate HadoopRpcOMFollowerReadFailoverProxyProvider
+ // even if the follower read is not enabled. We can try this to ensure that the tests still pass which
+ // suggests that the HadoopRpcOMFollowerReadFailoverProxyProvider is a indeed a superset of
+ // HadoopRpcOMFollowerReadFailoverProxyProvider
+ this.rpcProxy = createRetryProxy(omFailoverProxyProvider, maxFailovers);
+ }
}
@Override
@@ -103,13 +125,24 @@ public Text getDelegationTokenService() {
* is not the leader OM.
*/
private OzoneManagerProtocolPB createRetryProxy(
- HadoopRpcOMFailoverProxyProvider failoverProxyProvider,
+ HadoopRpcOMFailoverProxyProvider failoverProxyProvider,
int maxFailovers) {
-
- OzoneManagerProtocolPB proxy = (OzoneManagerProtocolPB) RetryProxy.create(
+ return (OzoneManagerProtocolPB) RetryProxy.create(
OzoneManagerProtocolPB.class, failoverProxyProvider,
failoverProxyProvider.getRetryPolicy(maxFailovers));
- return proxy;
+ }
+
+ /**
+ * Creates a {@link RetryProxy} encapsulating the
+ * {@link HadoopRpcOMFollowerReadFailoverProxyProvider}.
+ */
+ private OzoneManagerProtocolPB createRetryProxy(
+ HadoopRpcOMFollowerReadFailoverProxyProvider failoverProxyProvider,
+ int maxFailovers) {
+ return (OzoneManagerProtocolPB) RetryProxy.create(
+ OzoneManagerProtocolPB.class, failoverProxyProvider,
+ failoverProxyProvider.getRetryPolicy(maxFailovers)
+ );
}
@VisibleForTesting
@@ -117,8 +150,17 @@ public HadoopRpcOMFailoverProxyProvider getOmFailoverProxyProvider() {
return omFailoverProxyProvider;
}
+ @VisibleForTesting
+ public HadoopRpcOMFollowerReadFailoverProxyProvider getOmFollowerReadFailoverProxyProvider() {
+ return followerReadFailoverProxyProvider;
+ }
+
@Override
public void close() throws IOException {
- omFailoverProxyProvider.close();
+ if (followerReadFailoverProxyProvider != null) {
+ followerReadFailoverProxyProvider.close();
+ } else {
+ omFailoverProxyProvider.close();
+ }
}
}
diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/TestOmUtils.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/TestOmUtils.java
index 9aea06fd7969..b61bf3303168 100644
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/TestOmUtils.java
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/TestOmUtils.java
@@ -41,10 +41,16 @@
import java.nio.file.Path;
import java.util.Set;
import java.util.TreeSet;
+import java.util.UUID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.LoggerFactory;
/**
* Unit tests for {@link OmUtils}.
@@ -326,4 +332,32 @@ void testGetObjectIdFromTxIdValidation() {
// Consistency checks between epoch and txId are covered by
// testAddEpochToTxId() and testGetObjectIdFromTxId().
+
+ /**
+ * Test that all of {@link OzoneManagerProtocolProtos.Type} enum values are
+ * categorized in {@link OmUtils#shouldSendToFollower(OMRequest)}.
+ */
+ @Test
+ public void testShouldSendToFollowerFollowerReadCapturesAllCmdTypeEnums() {
+ GenericTestUtils.LogCapturer logCapturer =
+ GenericTestUtils.LogCapturer.captureLogs(
+ LoggerFactory.getLogger(OmUtils.class));
+ OzoneManagerProtocolProtos.Type[] cmdTypes =
+ OzoneManagerProtocolProtos.Type.values();
+ String clientId = UUID.randomUUID().toString();
+
+ for (OzoneManagerProtocolProtos.Type cmdType : cmdTypes) {
+ OMRequest request = OMRequest.newBuilder()
+ .setCmdType(cmdType)
+ .setClientId(clientId)
+ .build();
+ OmUtils.shouldSendToFollower(request);
+ Assertions.assertFalse(
+ logCapturer.getOutput().contains(
+ "CmdType " + cmdType + " is not categorized to be sent to follower."
+ )
+ );
+ logCapturer.clearOutput();
+ }
+ }
}
diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestHadoopRpcOMFollowerReadFailoverProxyProvider.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestHadoopRpcOMFollowerReadFailoverProxyProvider.java
new file mode 100644
index 000000000000..33f97ba9dd6c
--- /dev/null
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestHadoopRpcOMFollowerReadFailoverProxyProvider.java
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.ha;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_NODES_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.StringJoiner;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.io.retry.RetryInvocationHandler;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.ipc_.RemoteException;
+import org.apache.hadoop.ipc_.RpcNoSuchProtocolException;
+import org.apache.hadoop.ozone.ClientVersion;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.ha.ConfUtils;
+import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
+import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
+import org.apache.hadoop.ozone.om.protocolPB.OMAdminProtocolPB;
+import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateKeyRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetKeyInfoRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.protocol.ClientId;
+import org.junit.jupiter.api.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Tests for {@link HadoopRpcOMFollowerReadFailoverProxyProvider}.
+ */
+public class TestHadoopRpcOMFollowerReadFailoverProxyProvider {
+ private static final long SLOW_RESPONSE_SLEEP_TIME = TimeUnit.SECONDS.toMillis(2);
+ private static final String OM_SERVICE_ID = "om-service-test1";
+ private static final String NODE_ID_BASE_STR = "omNode-";
+ private static final String DUMMY_NODE_ADDR = "0.0.0.0:8080";
+ private OzoneConfiguration conf;
+
+ private HadoopRpcOMFollowerReadFailoverProxyProvider proxyProvider;
+ private OzoneManagerProtocolPB retryProxy;
+ private String[] omNodeIds;
+ private OMAnswer[] omNodeAnswers;
+
+ @Test
+ public void testWithNonClientProxy() throws Exception {
+ setupProxyProvider(2);
+ HadoopRpcOMFollowerReadFailoverProxyProvider adminProxyProvider =
+ new HadoopRpcOMFollowerReadFailoverProxyProvider<>(conf,
+ UserGroupInformation.getCurrentUser(), OM_SERVICE_ID, OMAdminProtocolPB.class);
+ // follower read is only enabled for OzoneManagerProtocolPB and disabled otherwise
+ assertFalse(adminProxyProvider.isFollowerReadEnabled());
+ }
+
+ @Test
+ void testWriteOperationOnLeader() throws Exception {
+ setupProxyProvider(3);
+ omNodeAnswers[2].isLeader = true;
+
+ doWrite();
+
+ assertHandledBy(2);
+ assertTrue(proxyProvider.isFollowerReadEnabled());
+ // Although the write request is forwarded to the leader,
+ // the follower read proxy provider should still point to first OM follower
+ assertEquals(proxyProvider.getCurrentProxy().getNodeId(), omNodeIds[0]);
+ }
+
+ @Test
+ void testWriteOperationOnLeaderNotReady() throws Exception {
+ setupProxyProvider(3);
+ omNodeAnswers[0].isLeader = true;
+ omNodeAnswers[0].isLeaderReady = false;
+
+ new Thread(() -> {
+ try {
+ Thread.sleep(1000);
+ omNodeAnswers[0].isLeaderReady = true;
+ } catch (InterruptedException ignored) {
+ }
+ }).start();
+
+ long start = Time.monotonicNow();
+ doWrite();
+ long elapsed = Time.monotonicNow() - start;
+
+ assertTrue(elapsed > 1000,
+ "Write operation finished earlier than expected");
+
+ assertHandledBy(0);
+ assertTrue(proxyProvider.isFollowerReadEnabled());
+ }
+
+ @Test
+ void testWriteLeaderFailover() throws Exception {
+ setupProxyProvider(3);
+ omNodeAnswers[0].isLeader = true;
+
+ doWrite();
+ assertHandledBy(0);
+ // Read current proxy remain unchanged
+ assertEquals(proxyProvider.getCurrentProxy().getNodeId(), omNodeIds[0]);
+
+
+ // Leader failover from omNode-1 to omNode-2
+ omNodeAnswers[0].isLeader = false;
+ omNodeAnswers[1].isLeader = true;
+ doWrite();
+ assertHandledBy(1);
+ assertEquals(proxyProvider.getCurrentProxy().getNodeId(), omNodeIds[0]);
+
+ // Leader failover back from omNode-2 to omNode-1
+ omNodeAnswers[0].isLeader = true;
+ omNodeAnswers[1].isLeader = false;
+ doWrite();
+ assertHandledBy(0);
+ assertEquals(proxyProvider.getCurrentProxy().getNodeId(), omNodeIds[0]);
+ }
+
+ @Test
+ void testReadOperationOnFollower() throws Exception {
+ setupProxyProvider(3);
+ omNodeAnswers[0].isLeader = false;
+
+ doRead();
+
+ assertHandledBy(0);
+ assertTrue(proxyProvider.isFollowerReadEnabled());
+ }
+
+ @Test
+ void testReadOperationOnLeader() throws Exception {
+ setupProxyProvider(3);
+ omNodeAnswers[0].isLeader = true;
+
+ doRead();
+
+ // Follower read can still read from OM leader
+ assertHandledBy(0);
+ assertTrue(proxyProvider.isFollowerReadEnabled());
+ }
+
+ @Test
+ void testReadOperationOnLeaderNotReady() throws Exception {
+ setupProxyProvider(3);
+ omNodeAnswers[0].isLeader = true;
+ omNodeAnswers[0].isLeaderReady = false;
+
+ new Thread(() -> {
+ try {
+ Thread.sleep(1000);
+ omNodeAnswers[0].isLeaderReady = true;
+ } catch (InterruptedException ignored) {
+ }
+ }).start();
+
+ long start = Time.monotonicNow();
+ doRead();
+ long elapsed = Time.monotonicNow() - start;
+
+ assertTrue(elapsed > 1000,
+ "Read operation finished earlier than expected");
+
+ assertHandledBy(0);
+ assertTrue(proxyProvider.isFollowerReadEnabled());
+ }
+
+ @Test
+ void testReadOperationOnFollowerWhenFollowerReadUnsupported() throws Exception {
+ setupProxyProvider(3);
+ // Disable all follower reads from all OM nodes
+ for (OMAnswer omAnswer : omNodeAnswers) {
+ omAnswer.isFollowerReadSupported = false;
+ }
+ omNodeAnswers[1].isLeader = true;
+
+ doRead();
+ // The read request will be handled by the leader
+ assertHandledBy(1);
+ // Since OMNotLeaderException is thrown during follower read, the
+ // proxy will keep sending reads from the leader from now on
+ assertFalse(proxyProvider.isFollowerReadEnabled());
+
+ // Try to simulate leader change
+ omNodeAnswers[1].isLeader = false;
+ omNodeAnswers[2].isLeader = true;
+
+ doRead();
+ assertHandledBy(2);
+
+ assertFalse(proxyProvider.isFollowerReadEnabled());
+ }
+
+ @Test
+ void testUnreachableFollowers() throws Exception {
+ setupProxyProvider(3);
+ omNodeAnswers[2].isLeader = true;
+ // Mark the first follower as unreachable
+ omNodeAnswers[0].unreachable = true;
+
+ // It will be handled by the second follower since the first follower is
+ // unreachable
+ doRead();
+ assertHandledBy(1);
+
+ // Now make the second follower as unavailable
+ // All followers are unreachable now
+ omNodeAnswers[1].unreachable = true;
+
+ // Confirm that read still succeeds even though followers are not available
+ doRead();
+ // It will be handled by the leader
+ assertHandledBy(2);
+ }
+
+ @Test
+ void testReadOnSlowFollower() throws Exception {
+ setupProxyProvider(3);
+ omNodeAnswers[0].slowNode = true;
+
+ long start = Time.monotonicNow();
+ doRead();
+ long elapsed = Time.monotonicNow() - start;
+ assertHandledBy(0);
+ assertThat(elapsed)
+ .withFailMessage(() -> "Read operation finished earlier than expected")
+ .isGreaterThanOrEqualTo(SLOW_RESPONSE_SLEEP_TIME);
+
+ }
+
+ @Test
+ void testMixedWriteAndRead() throws Exception {
+ setupProxyProvider(3);
+ omNodeAnswers[1].isLeader = true;
+
+ doWrite();
+
+ // Write is handled by the leader
+ assertHandledBy(1);
+
+ doRead();
+
+ // Read is handled by the first follower
+ assertHandledBy(0);
+ }
+
+ @Test
+ void testWriteWithAllOMsUnreachable() throws Exception {
+ setupProxyProvider(3);
+ omNodeAnswers[0].unreachable = true;
+ omNodeAnswers[1].unreachable = true;
+ omNodeAnswers[2].unreachable = true;
+
+ ServiceException exception = assertThrows(ServiceException.class, this::doWrite);
+ assertInstanceOf(IOException.class, exception.getCause());
+ }
+
+ @Test
+ void testReadWithAllOMsUnreachable() throws Exception {
+ setupProxyProvider(3);
+ omNodeAnswers[0].unreachable = true;
+ omNodeAnswers[1].unreachable = true;
+ omNodeAnswers[2].unreachable = true;
+
+ ServiceException exception = assertThrows(ServiceException.class, this::doRead);
+ assertInstanceOf(IOException.class, exception.getCause());
+ }
+
+ @Test
+ void testObjectMethodsOnProxy() throws Exception {
+ setupProxyProvider(2);
+
+ assertNotNull(retryProxy.toString());
+ retryProxy.hashCode();
+ retryProxy.equals(retryProxy);
+ }
+
+ @Test
+ void testObjectMethodsDoNotSelectProxy() throws Exception {
+ setupProxyProvider(2);
+
+ assertNull(proxyProvider.getLastProxy());
+ }
+
+ @Test
+ void testShortArgsArrayDoesNotThrowArrayIndex() throws Exception {
+ setupProxyProvider(2);
+
+ Object combinedProxy = proxyProvider.getProxy().proxy;
+ InvocationHandler handler = Proxy.getInvocationHandler(combinedProxy);
+ Method submitRequest = OzoneManagerProtocolPB.class.getMethod(
+ "submitRequest", RpcController.class, OMRequest.class);
+
+ ServiceException exception = assertThrows(ServiceException.class,
+ () -> handler.invoke(combinedProxy, submitRequest, new Object[] {null}));
+ assertInstanceOf(RpcNoSuchProtocolException.class, exception.getCause());
+ }
+
+ @Test
+ void testNullRequest() throws Exception {
+ setupProxyProvider(2);
+ ServiceException exception = assertThrows(ServiceException.class,
+ () -> retryProxy.submitRequest(null, null));
+ assertInstanceOf(RpcNoSuchProtocolException.class, exception.getCause());
+ }
+
+ private void setupProxyProvider(int omNodeCount) throws Exception {
+ setupProxyProvider(omNodeCount, new OzoneConfiguration());
+ }
+
+ private void setupProxyProvider(int omNodeCount, OzoneConfiguration config) throws Exception {
+ omNodeIds = new String[omNodeCount];
+ omNodeAnswers = new OMAnswer[omNodeCount];
+ StringJoiner allNodeIds = new StringJoiner(",");
+ OzoneManagerProtocolPB[] proxies = new OzoneManagerProtocolPB[omNodeCount];
+ // Setup each OM node with the mocked proxy
+ Map proxyMap = new HashMap<>();
+ for (int i = 0; i < omNodeCount; i++) {
+ String nodeId = NODE_ID_BASE_STR + (i + 1); // 1-th indexed
+ config.set(ConfUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY, OM_SERVICE_ID,
+ nodeId), DUMMY_NODE_ADDR);
+ allNodeIds.add(nodeId);
+ omNodeIds[i] = nodeId;
+ omNodeAnswers[i] = new OMAnswer();
+ proxies[i] = mock(OzoneManagerProtocolPB.class);
+ doAnswer(omNodeAnswers[i].clientAnswer)
+ .when(proxies[i]).submitRequest(any(), any());
+ doAnswer(omNodeAnswers[i].clientAnswer)
+ .when(proxies[i]).submitRequest(any(), any());
+ proxyMap.put(nodeId, proxies[i]);
+ }
+ config.set(ConfUtils.addKeySuffixes(OZONE_OM_NODES_KEY, OM_SERVICE_ID),
+ allNodeIds.toString());
+ config.set(OZONE_OM_SERVICE_IDS_KEY, OM_SERVICE_ID);
+ config.setInt(OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 2 * omNodeCount);
+ config.setLong(
+ OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_KEY, 500);
+
+ // Create a leader-based failover proxy provider using the mocked proxies
+ HadoopRpcOMFailoverProxyProvider underlyingProxyProvider =
+ new HadoopRpcOMFailoverProxyProvider(config, UserGroupInformation.getCurrentUser(),
+ OM_SERVICE_ID, OzoneManagerProtocolPB.class) {
+ @Override
+ protected synchronized ProxyInfo createOMProxyIfNeeded(
+ OMProxyInfo omProxyInfo) {
+ if (omProxyInfo.proxy == null) {
+ omProxyInfo.proxy = proxyMap.get(omProxyInfo.getNodeId());
+ }
+ return omProxyInfo;
+ }
+
+ @Override
+ protected void initOmProxiesFromConfigs(ConfigurationSource config, String omSvcId) throws IOException {
+ Map> omProxies = new HashMap<>();
+
+ List omNodeIDList = new ArrayList<>();
+
+ Collection activeOmNodeIds = OmUtils.getActiveOMNodeIds(config,
+ omSvcId);
+
+ for (String nodeId : OmUtils.emptyAsSingletonNull(activeOmNodeIds)) {
+
+ String rpcAddrKey = ConfUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
+ omSvcId, nodeId);
+ String rpcAddrStr = OmUtils.getOmRpcAddress(config, rpcAddrKey);
+ if (rpcAddrStr == null) {
+ continue;
+ }
+
+ // ProxyInfo.proxy will be set during first time call to server.
+ OMProxyInfo omProxyInfo = new OMProxyInfo<>(omSvcId, nodeId, rpcAddrStr);
+
+ if (omProxyInfo.getAddress() != null) {
+ // For a non-HA OM setup, nodeId might be null. If so, we assign it
+ // the default value
+ if (nodeId == null) {
+ nodeId = OzoneConsts.OM_DEFAULT_NODE_ID;
+ }
+ omProxies.put(nodeId, omProxyInfo);
+ omNodeIDList.add(nodeId);
+ } else {
+ LOG.error("Failed to create OM proxy for {} at address {}",
+ nodeId, rpcAddrStr);
+ }
+ }
+
+ if (omProxies.isEmpty()) {
+ throw new IllegalArgumentException("Could not find any configured " +
+ "addresses for OM. Please configure the system with "
+ + OZONE_OM_ADDRESS_KEY);
+ }
+ // By default, the omNodesInOrder is shuffled to reduce hotspot, but we can sort it here to
+ // make it easier to test
+ Collections.sort(omNodeIDList);
+ initOmProxies(omProxies, omNodeIDList);
+ }
+ };
+
+ // Wrap the leader-based failover proxy provider with follower read proxy provider
+ proxyProvider = new HadoopRpcOMFollowerReadFailoverProxyProvider<>(
+ OM_SERVICE_ID, OzoneManagerProtocolPB.class, underlyingProxyProvider);
+ assertTrue(proxyProvider.isFollowerReadEnabled());
+ // Wrap the follower read proxy provider in retry proxy to allow automatic failover
+ retryProxy = (OzoneManagerProtocolPB) RetryProxy.create(
+ OzoneManagerProtocolPB.class, proxyProvider,
+ proxyProvider.getRetryPolicy(2 * omNodeCount)
+ );
+ // This is currently added to prevent IllegalStateException in
+ // Client#setCallIdAndRetryCount since it seems that callId is set but not unset properly
+ RetryInvocationHandler.SET_CALL_ID_FOR_TEST.set(false);
+ conf = config;
+ }
+
+ private void doRead() throws Exception {
+ doRead(retryProxy);
+ }
+
+ private void doWrite() throws Exception {
+ doWrite(retryProxy);
+ }
+
+ private static void doWrite(OzoneManagerProtocolPB client) throws Exception {
+ CreateKeyRequest.Builder req = CreateKeyRequest.newBuilder();
+ KeyArgs keyArgs = KeyArgs.newBuilder()
+ .setVolumeName("volume")
+ .setBucketName("bucket")
+ .setKeyName("key")
+ .build();
+ req.setKeyArgs(keyArgs);
+
+ OMRequest omRequest = OMRequest.newBuilder()
+ .setVersion(ClientVersion.CURRENT_VERSION)
+ .setClientId(ClientId.randomId().toString())
+ .setCmdType(Type.CreateKey)
+ .setCreateKeyRequest(req)
+ .build();
+
+ client.submitRequest(null, omRequest);
+ }
+
+ private static void doRead(OzoneManagerProtocolPB client) throws Exception {
+ KeyArgs keyArgs = KeyArgs.newBuilder()
+ .setVolumeName("volume")
+ .setBucketName("bucket")
+ .setKeyName("key")
+ .build();
+ GetKeyInfoRequest.Builder req = GetKeyInfoRequest.newBuilder()
+ .setKeyArgs(keyArgs);
+
+ OMRequest omRequest = OMRequest.newBuilder()
+ .setVersion(ClientVersion.CURRENT_VERSION)
+ .setClientId(ClientId.randomId().toString())
+ .setCmdType(Type.GetKeyInfo)
+ .setGetKeyInfoRequest(req)
+ .build();
+
+ client.submitRequest(null, omRequest);
+ }
+
+ private void assertHandledBy(int omNodeIdx) {
+ OMProxyInfo lastProxy =
+ (OMProxyInfo) proxyProvider.getLastProxy();
+ assertEquals(omNodeIds[omNodeIdx], lastProxy.getNodeId());
+ }
+
+ private static class OMAnswer {
+
+ private volatile boolean unreachable = false;
+ private volatile boolean slowNode = false;
+
+ private volatile boolean isLeader = false;
+ private volatile boolean isLeaderReady = true;
+ private volatile boolean isFollowerReadSupported = true;
+
+ private OMProtocolAnswer clientAnswer = new OMProtocolAnswer();
+
+ private class OMProtocolAnswer implements Answer {
+ @Override
+ public OMResponse answer(InvocationOnMock invocationOnMock) throws Throwable {
+ if (unreachable) {
+ throw new IOException("Unavailable");
+ }
+
+ // sleep to simulate slow rpc responses.
+ if (slowNode) {
+ Thread.sleep(SLOW_RESPONSE_SLEEP_TIME);
+ }
+ OMRequest omRequest = invocationOnMock.getArgument(1);
+ switch (omRequest.getCmdType()) {
+ case CreateKey:
+ if (!isLeader) {
+ throw new ServiceException(
+ new RemoteException(
+ OMNotLeaderException.class.getCanonicalName(),
+ "Write can only be done on leader"
+ )
+ );
+ }
+ if (isLeader && !isLeaderReady) {
+ throw new ServiceException(
+ new RemoteException(
+ OMLeaderNotReadyException.class.getCanonicalName(),
+ "Leader is not ready yet"
+ )
+ );
+ }
+ break;
+ case GetKeyInfo:
+ if (!isLeader && !isFollowerReadSupported) {
+ throw new ServiceException(
+ new RemoteException(
+ OMNotLeaderException.class.getCanonicalName(),
+ "OM follower read is not supported"
+ )
+ );
+ }
+ if (isLeader && !isLeaderReady) {
+ throw new ServiceException(
+ new RemoteException(
+ OMLeaderNotReadyException.class.getCanonicalName(),
+ "Leader is not ready yet"
+ )
+ );
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported cmdType");
+ }
+ return null;
+ }
+ }
+ }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/OmFailoverProxyUtil.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/OmFailoverProxyUtil.java
index d494d82c885d..731acb29028d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/OmFailoverProxyUtil.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/OmFailoverProxyUtil.java
@@ -18,8 +18,8 @@
package org.apache.hadoop.ozone.om;
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
-import org.apache.hadoop.ozone.client.rpc.RpcClient;
import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
+import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFollowerReadFailoverProxyProvider;
import org.apache.hadoop.ozone.om.protocolPB.Hadoop3OmTransport;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
@@ -39,10 +39,21 @@ public static HadoopRpcOMFailoverProxyProvider getFailoverProxyProvider(
OzoneManagerProtocolClientSideTranslatorPB ozoneManagerClient =
(OzoneManagerProtocolClientSideTranslatorPB)
- ((RpcClient) clientProtocol).getOzoneManagerClient();
+ clientProtocol.getOzoneManagerClient();
Hadoop3OmTransport transport =
(Hadoop3OmTransport) ozoneManagerClient.getTransport();
return transport.getOmFailoverProxyProvider();
}
+
+ public static HadoopRpcOMFollowerReadFailoverProxyProvider getFollowerReadFailoverProxyProvider(
+ ClientProtocol clientProtocol) {
+ OzoneManagerProtocolClientSideTranslatorPB ozoneManagerClient =
+ (OzoneManagerProtocolClientSideTranslatorPB)
+ clientProtocol.getOzoneManagerClient();
+
+ Hadoop3OmTransport transport =
+ (Hadoop3OmTransport) ozoneManagerClient.getTransport();
+ return transport.getOmFollowerReadFailoverProxyProvider();
+ }
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerRead.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerRead.java
new file mode 100644
index 000000000000..965efd9dca1e
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerRead.java
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_RETRY_INTERVAL_KEY;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_ENABLED_KEY;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ipc_.RemoteException;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.BucketArgs;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneKey;
+import org.apache.hadoop.ozone.client.OzoneKeyDetails;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.VolumeArgs;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.client.rpc.RpcClient;
+import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServerConfig;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+
+/**
+ * Base class for Ozone Manager HA tests.
+ */
+public abstract class TestOzoneManagerHAFollowerRead {
+
+ private static MiniOzoneHAClusterImpl cluster = null;
+ private static ObjectStore objectStore;
+ private static OzoneConfiguration conf;
+ private static String omServiceId;
+ private static int numOfOMs = 3;
+ private static final int LOG_PURGE_GAP = 50;
+ /* Reduce max number of retries to speed up unit test. */
+ private static final int OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS = 5;
+ private static final int IPC_CLIENT_CONNECT_MAX_RETRIES = 4;
+ private static final long SNAPSHOT_THRESHOLD = 50;
+ private static final Duration RETRY_CACHE_DURATION = Duration.ofSeconds(30);
+ private static OzoneClient client;
+
+ public MiniOzoneHAClusterImpl getCluster() {
+ return cluster;
+ }
+
+ public ObjectStore getObjectStore() {
+ return objectStore;
+ }
+
+ public static OzoneClient getClient() {
+ return client;
+ }
+
+ public OzoneConfiguration getConf() {
+ return conf;
+ }
+
+ public String getOmServiceId() {
+ return omServiceId;
+ }
+
+ public static int getLogPurgeGap() {
+ return LOG_PURGE_GAP;
+ }
+
+ public static long getSnapshotThreshold() {
+ return SNAPSHOT_THRESHOLD;
+ }
+
+ public static int getNumOfOMs() {
+ return numOfOMs;
+ }
+
+ public static int getOzoneClientFailoverMaxAttempts() {
+ return OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS;
+ }
+
+ public static Duration getRetryCacheDuration() {
+ return RETRY_CACHE_DURATION;
+ }
+
+ @BeforeAll
+ public static void init() throws Exception {
+ conf = new OzoneConfiguration();
+ omServiceId = "om-service-test1";
+ conf.setBoolean(OZONE_ACL_ENABLED, true);
+ conf.set(OzoneConfigKeys.OZONE_ADMINISTRATORS,
+ OZONE_ADMINISTRATORS_WILDCARD);
+ conf.setInt(OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
+ OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS);
+ conf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
+ IPC_CLIENT_CONNECT_MAX_RETRIES);
+ /* Reduce IPC retry interval to speed up unit test. */
+ conf.setInt(IPC_CLIENT_CONNECT_RETRY_INTERVAL_KEY, 200);
+ conf.setInt(OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP, LOG_PURGE_GAP);
+ conf.setLong(
+ OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY,
+ SNAPSHOT_THRESHOLD);
+ // Enable filesystem snapshot feature for the test regardless of the default
+ conf.setBoolean(OMConfigKeys.OZONE_FILESYSTEM_SNAPSHOT_ENABLED_KEY, true);
+
+ // Some subclasses check RocksDB directly as part of their tests. These
+ // depend on OBS layout.
+ conf.set(OZONE_DEFAULT_BUCKET_LAYOUT,
+ OMConfigKeys.OZONE_BUCKET_LAYOUT_OBJECT_STORE);
+
+ OzoneManagerRatisServerConfig omHAConfig =
+ conf.getObject(OzoneManagerRatisServerConfig.class);
+
+ omHAConfig.setRetryCacheTimeout(RETRY_CACHE_DURATION);
+
+ // Enable the OM follower read
+ omHAConfig.setReadOption("LINEARIZABLE");
+ omHAConfig.setReadLeaderLeaseEnabled(true);
+
+ conf.setFromObject(omHAConfig);
+
+ // config for key deleting service.
+ conf.set(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, "10s");
+ conf.set(OZONE_KEY_DELETING_LIMIT_PER_TASK, "2");
+
+ MiniOzoneHAClusterImpl.Builder clusterBuilder = MiniOzoneCluster.newHABuilder(conf)
+ .setOMServiceId(omServiceId)
+ .setNumOfOzoneManagers(numOfOMs);
+
+ cluster = clusterBuilder.build();
+ cluster.waitForClusterToBeReady();
+
+ OzoneConfiguration clientConf = OzoneConfiguration.of(conf);
+ clientConf.setBoolean(OZONE_CLIENT_FOLLOWER_READ_ENABLED_KEY, true);
+ client = OzoneClientFactory.getRpcClient(omServiceId, clientConf);
+ objectStore = client.getObjectStore();
+ }
+
+ @AfterAll
+ public static void shutdown() {
+ IOUtils.closeQuietly(client);
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ /**
+ * Create a key in the bucket.
+ *
+ * @return the key name.
+ */
+ public static String createKey(OzoneBucket ozoneBucket) throws IOException {
+ String keyName = "key" + RandomStringUtils.secure().nextNumeric(5);
+ createKey(ozoneBucket, keyName);
+ return keyName;
+ }
+
+ public static void createKey(OzoneBucket ozoneBucket, String keyName) throws IOException {
+ String data = "data" + RandomStringUtils.secure().nextNumeric(5);
+ OzoneOutputStream ozoneOutputStream = ozoneBucket.createKey(keyName, data.length(), ReplicationType.RATIS,
+ ReplicationFactor.ONE, new HashMap<>());
+ ozoneOutputStream.write(data.getBytes(UTF_8), 0, data.length());
+ ozoneOutputStream.close();
+ }
+
+ public static String createPrefixName() {
+ return "prefix" + RandomStringUtils.secure().nextNumeric(5) + OZONE_URI_DELIMITER;
+ }
+
+ public static void createPrefix(OzoneObj prefixObj) throws IOException {
+ assertTrue(objectStore.setAcl(prefixObj, Collections.emptyList()));
+ }
+
+ protected OzoneBucket setupBucket() throws Exception {
+ String userName = "user" + RandomStringUtils.secure().nextNumeric(5);
+ String adminName = "admin" + RandomStringUtils.secure().nextNumeric(5);
+ String volumeName = "volume" + UUID.randomUUID();
+
+ VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
+ .setOwner(userName)
+ .setAdmin(adminName)
+ .build();
+
+ objectStore.createVolume(volumeName, createVolumeArgs);
+ OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName);
+
+ assertEquals(volumeName, retVolumeinfo.getName());
+ assertEquals(userName, retVolumeinfo.getOwner());
+ assertEquals(adminName, retVolumeinfo.getAdmin());
+
+ String bucketName = UUID.randomUUID().toString();
+ retVolumeinfo.createBucket(bucketName);
+
+ OzoneBucket ozoneBucket = retVolumeinfo.getBucket(bucketName);
+
+ assertEquals(bucketName, ozoneBucket.getName());
+ assertEquals(volumeName, ozoneBucket.getVolumeName());
+
+ return ozoneBucket;
+ }
+
+ protected OzoneBucket linkBucket(OzoneBucket srcBuk) throws Exception {
+ String userName = "user" + RandomStringUtils.secure().nextNumeric(5);
+ String adminName = "admin" + RandomStringUtils.secure().nextNumeric(5);
+ String linkedVolName = "volume-link-" + RandomStringUtils.secure().nextNumeric(5);
+
+ VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
+ .setOwner(userName)
+ .setAdmin(adminName)
+ .build();
+
+ BucketArgs createBucketArgs = new BucketArgs.Builder()
+ .setSourceVolume(srcBuk.getVolumeName())
+ .setSourceBucket(srcBuk.getName())
+ .build();
+
+ objectStore.createVolume(linkedVolName, createVolumeArgs);
+ OzoneVolume linkedVolumeInfo = objectStore.getVolume(linkedVolName);
+
+ assertEquals(linkedVolName, linkedVolumeInfo.getName());
+ assertEquals(userName, linkedVolumeInfo.getOwner());
+ assertEquals(adminName, linkedVolumeInfo.getAdmin());
+
+ String linkedBucketName = UUID.randomUUID().toString();
+ linkedVolumeInfo.createBucket(linkedBucketName, createBucketArgs);
+
+ OzoneBucket linkedBucket = linkedVolumeInfo.getBucket(linkedBucketName);
+
+ assertEquals(linkedBucketName, linkedBucket.getName());
+ assertEquals(linkedVolName, linkedBucket.getVolumeName());
+ assertTrue(linkedBucket.isLink());
+
+ return linkedBucket;
+ }
+
+ /**
+ * Stop the current leader OM.
+ */
+ protected void stopLeaderOM() {
+ //Stop the leader OM.
+ HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider =
+ OmFailoverProxyUtil.getFailoverProxyProvider(
+ (RpcClient) objectStore.getClientProxy());
+
+ // The omFailoverProxyProvider will point to the current leader OM node.
+ String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
+
+ // Stop one of the ozone manager, to see when the OM leader changes
+ // multipart upload is happening successfully or not.
+ cluster.stopOzoneManager(leaderOMNodeId);
+ }
+
+ /**
+ * Create a volume and test its attribute.
+ */
+ protected void createVolumeTest(boolean checkSuccess) throws Exception {
+ String userName = "user" + RandomStringUtils.secure().nextNumeric(5);
+ String adminName = "admin" + RandomStringUtils.secure().nextNumeric(5);
+ String volumeName = "volume" + RandomStringUtils.secure().nextNumeric(5);
+
+ VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
+ .setOwner(userName)
+ .setAdmin(adminName)
+ .build();
+
+ try {
+ objectStore.createVolume(volumeName, createVolumeArgs);
+
+ OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName);
+
+ if (checkSuccess) {
+ assertEquals(volumeName, retVolumeinfo.getName());
+ assertEquals(userName, retVolumeinfo.getOwner());
+ assertEquals(adminName, retVolumeinfo.getAdmin());
+ } else {
+ // Verify that the request failed
+ fail("There is no quorum. Request should have failed");
+ }
+ } catch (IOException e) {
+ if (!checkSuccess) {
+ // If the last OM to be tried by the RetryProxy is down, we would get
+ // ConnectException. Otherwise, we would get a RemoteException from the
+ // last running OM as it would fail to get a quorum.
+ if (e instanceof RemoteException) {
+ assertThat(e).hasMessageContaining("is not the leader");
+ } else if (e instanceof ConnectException) {
+ assertThat(e).hasMessageContaining("Connection refused");
+ } else {
+ assertThat(e).hasMessageContaining("Could not determine or connect to OM Leader");
+ }
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ /**
+ * This method createFile and verifies the file is successfully created or
+ * not.
+ *
+ * @param ozoneBucket
+ * @param keyName
+ * @param data
+ * @param recursive
+ * @param overwrite
+ * @throws Exception
+ */
+ protected void testCreateFile(OzoneBucket ozoneBucket, String keyName,
+ String data, boolean recursive,
+ boolean overwrite)
+ throws Exception {
+
+ OzoneOutputStream ozoneOutputStream = ozoneBucket.createFile(keyName,
+ data.length(), ReplicationType.RATIS, ReplicationFactor.ONE,
+ overwrite, recursive);
+
+ ozoneOutputStream.write(data.getBytes(UTF_8), 0, data.length());
+ ozoneOutputStream.close();
+
+ OzoneKeyDetails ozoneKeyDetails = ozoneBucket.getKey(keyName);
+
+ assertEquals(keyName, ozoneKeyDetails.getName());
+ assertEquals(ozoneBucket.getName(), ozoneKeyDetails.getBucketName());
+ assertEquals(ozoneBucket.getVolumeName(),
+ ozoneKeyDetails.getVolumeName());
+ assertEquals(data.length(), ozoneKeyDetails.getDataSize());
+ assertTrue(ozoneKeyDetails.isFile());
+
+ try (OzoneInputStream ozoneInputStream = ozoneBucket.readKey(keyName)) {
+ byte[] fileContent = new byte[data.getBytes(UTF_8).length];
+ IOUtils.readFully(ozoneInputStream, fileContent);
+ assertEquals(data, new String(fileContent, UTF_8));
+ }
+
+ Iterator extends OzoneKey> iterator = ozoneBucket.listKeys("/");
+ while (iterator.hasNext()) {
+ OzoneKey ozoneKey = iterator.next();
+ if (!ozoneKey.getName().endsWith(OM_KEY_PREFIX)) {
+ assertTrue(ozoneKey.isFile());
+ } else {
+ assertFalse(ozoneKey.isFile());
+ }
+ }
+ }
+
+ protected void createKeyTest(boolean checkSuccess) throws Exception {
+ String userName = "user" + RandomStringUtils.secure().nextNumeric(5);
+ String adminName = "admin" + RandomStringUtils.secure().nextNumeric(5);
+ String volumeName = "volume" + RandomStringUtils.secure().nextNumeric(5);
+
+ VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
+ .setOwner(userName)
+ .setAdmin(adminName)
+ .build();
+
+ try {
+ getObjectStore().createVolume(volumeName, createVolumeArgs);
+
+ OzoneVolume retVolumeinfo = getObjectStore().getVolume(volumeName);
+
+ assertEquals(volumeName, retVolumeinfo.getName());
+ assertEquals(userName, retVolumeinfo.getOwner());
+ assertEquals(adminName, retVolumeinfo.getAdmin());
+
+ String bucketName = UUID.randomUUID().toString();
+ String keyName = UUID.randomUUID().toString();
+ retVolumeinfo.createBucket(bucketName);
+
+ OzoneBucket ozoneBucket = retVolumeinfo.getBucket(bucketName);
+
+ assertEquals(bucketName, ozoneBucket.getName());
+ assertEquals(volumeName, ozoneBucket.getVolumeName());
+
+ String value = "random data";
+ OzoneOutputStream ozoneOutputStream = ozoneBucket.createKey(keyName,
+ value.length(), ReplicationType.RATIS,
+ ReplicationFactor.ONE, new HashMap<>());
+ ozoneOutputStream.write(value.getBytes(UTF_8), 0, value.length());
+ ozoneOutputStream.close();
+
+ try (OzoneInputStream ozoneInputStream = ozoneBucket.readKey(keyName)) {
+ byte[] fileContent = new byte[value.getBytes(UTF_8).length];
+ IOUtils.readFully(ozoneInputStream, fileContent);
+ assertEquals(value, new String(fileContent, UTF_8));
+ }
+
+ } catch (IOException e) {
+ if (!checkSuccess) {
+ // If the last OM to be tried by the RetryProxy is down, we would get
+ // ConnectException. Otherwise, we would get a RemoteException from the
+ // last running OM as it would fail to get a quorum.
+ if (e instanceof RemoteException) {
+ assertThat(e).hasMessageContaining("is not the leader");
+ } else if (e instanceof ConnectException) {
+ assertThat(e).hasMessageContaining("Connection refused");
+ } else {
+ assertThat(e).hasMessageContaining("Could not determine or connect to OM Leader");
+ }
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ protected void waitForLeaderToBeReady()
+ throws InterruptedException, TimeoutException {
+ // Wait for Leader Election timeout
+ cluster.waitForLeaderOM();
+ }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java
new file mode 100644
index 000000000000..c9df20d958a6
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import static java.util.UUID.randomUUID;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_ENABLED_KEY;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.DIRECTORY_NOT_FOUND;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_ALREADY_EXISTS;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_A_FILE;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.PARTIAL_DELETE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.google.protobuf.ServiceException;
+import java.net.InetSocketAddress;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.StorageType;
+import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.hadoop.io.retry.FailoverProxyProvider.ProxyInfo;
+import org.apache.hadoop.ozone.ClientVersion;
+import org.apache.hadoop.ozone.OzoneTestUtils;
+import org.apache.hadoop.ozone.client.BucketArgs;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneKey;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.VolumeArgs;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
+import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
+import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFollowerReadFailoverProxyProvider;
+import org.apache.hadoop.ozone.om.ha.OMProxyInfo;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateVolumeRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo;
+import org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Ozone Manager HA follower read tests where all OMs are running throughout all tests.
+ * @see TestOzoneManagerHAFollowerReadWithAllRunning
+ */
+public class TestOzoneManagerHAFollowerReadWithAllRunning extends TestOzoneManagerHAFollowerRead {
+
+ @Test
+ void testOMFollowerReadProxyProviderInitialization() {
+ OzoneClient rpcClient = getClient();
+
+ HadoopRpcOMFollowerReadFailoverProxyProvider followerReadFailoverProxyProvider =
+ OmFailoverProxyUtil.getFollowerReadFailoverProxyProvider(
+ rpcClient.getObjectStore().getClientProxy()
+ );
+
+ List omProxies =
+ followerReadFailoverProxyProvider.getOMProxies();
+
+ assertEquals(getNumOfOMs(), omProxies.size());
+
+ for (int i = 0; i < getNumOfOMs(); i++) {
+ OzoneManager om = getCluster().getOzoneManager(i);
+ InetSocketAddress omRpcServerAddr = om.getOmRpcServerAddr();
+ boolean omClientProxyExists = false;
+ for (ProxyInfo proxyInfo : omProxies) {
+ OMProxyInfo omProxyInfo = (OMProxyInfo) proxyInfo;
+ if (omProxyInfo.getAddress().equals(omRpcServerAddr)) {
+ omClientProxyExists = true;
+ break;
+ }
+ }
+ assertTrue(omClientProxyExists,
+ () -> "No Client Proxy for node " + om.getOMNodeId());
+ }
+ }
+
+ @Test
+ void testFollowerReadTargetsFollower() throws Exception {
+ ObjectStore objectStore = getObjectStore();
+ HadoopRpcOMFollowerReadFailoverProxyProvider followerReadFailoverProxyProvider =
+ OmFailoverProxyUtil.getFollowerReadFailoverProxyProvider(objectStore.getClientProxy());
+
+ String leaderOMNodeId = getCluster().getOMLeader().getOMNodeId();
+ String followerOMNodeId = null;
+ for (OzoneManager om : getCluster().getOzoneManagersList()) {
+ if (!om.getOMNodeId().equals(leaderOMNodeId)) {
+ followerOMNodeId = om.getOMNodeId();
+ break;
+ }
+ }
+ assertNotNull(followerOMNodeId);
+
+ followerReadFailoverProxyProvider.changeInitialProxyForTest(followerOMNodeId);
+ objectStore.getClientProxy().listVolumes(null, null, 10);
+
+ OMProxyInfo lastProxy =
+ (OMProxyInfo) followerReadFailoverProxyProvider.getLastProxy();
+ assertNotNull(lastProxy);
+ assertEquals(followerOMNodeId, lastProxy.getNodeId());
+ }
+
+ @Test
+ public void testOMProxyProviderFailoverToCurrentLeader() throws Exception {
+ ObjectStore objectStore = getObjectStore();
+ HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider =
+ OmFailoverProxyUtil
+ .getFailoverProxyProvider(objectStore.getClientProxy());
+ HadoopRpcOMFollowerReadFailoverProxyProvider followerReadFailoverProxyProvider =
+ OmFailoverProxyUtil
+ .getFollowerReadFailoverProxyProvider(objectStore.getClientProxy());
+ String initialFollowerReadNodeId = followerReadFailoverProxyProvider.getCurrentProxy().getNodeId();
+
+ // Run couple of createVolume tests to discover the current Leader OM
+ createVolumeTest(true);
+ createVolumeTest(true);
+
+ // The oMFailoverProxyProvider will point to the current leader OM node.
+ String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
+
+ // Perform a manual failover of the proxy provider to move the
+ // currentProxyIndex to a node other than the leader OM.
+ omFailoverProxyProvider.selectNextOmProxy();
+ omFailoverProxyProvider.performFailover(null);
+
+ String newProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
+ assertNotEquals(leaderOMNodeId, newProxyNodeId);
+
+ // Once another request is sent to this new proxy node, the leader
+ // information must be returned via the response and a failover must
+ // happen to the leader proxy node.
+ // This will also do some read operations where this might read from the follower.
+ createVolumeTest(true);
+ Thread.sleep(2000);
+
+ String newLeaderOMNodeId =
+ omFailoverProxyProvider.getCurrentProxyOMNodeId();
+
+ // The old and new Leader OM NodeId must match since there was no new
+ // election in the Ratis ring.
+ assertEquals(leaderOMNodeId, newLeaderOMNodeId);
+
+ // The follower read proxy should remain unchanged since the follower is not throwing exceptions
+ // The performFailover on the leader proxy should not affect the follower read proxy provider
+ String currentFollowerReadNodeId = followerReadFailoverProxyProvider.getCurrentProxy().getNodeId();
+ assertEquals(initialFollowerReadNodeId, currentFollowerReadNodeId);
+ }
+
+ /**
+ * Choose a follower to send the request, the returned exception should
+ * include the suggested leader node.
+ */
+ @Test
+ public void testFailoverWithSuggestedLeader() throws Exception {
+ HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider =
+ OmFailoverProxyUtil
+ .getFailoverProxyProvider(getObjectStore().getClientProxy());
+
+ // Make sure All OMs are ready.
+ createVolumeTest(true);
+
+ String leaderOMNodeId = null;
+ OzoneManager followerOM = null;
+ for (OzoneManager om: getCluster().getOzoneManagersList()) {
+ if (om.isLeaderReady()) {
+ leaderOMNodeId = om.getOMNodeId();
+ } else if (followerOM == null) {
+ followerOM = om;
+ }
+ }
+ assertNotNull(followerOM);
+ assertNotNull(leaderOMNodeId);
+ String leaderOMAddress = ((OMProxyInfo)
+ omFailoverProxyProvider.getOMProxyMap().get(leaderOMNodeId))
+ .getAddress().getAddress().toString();
+ assertSame(OzoneManagerRatisServer.RaftServerStatus.NOT_LEADER,
+ followerOM.getOmRatisServer().getLeaderStatus());
+
+ CreateVolumeRequest.Builder req =
+ CreateVolumeRequest.newBuilder();
+ VolumeInfo volumeInfo = VolumeInfo.newBuilder()
+ .setVolume("testvolume")
+ .setAdminName("admin")
+ .setOwnerName("admin")
+ .build();
+ req.setVolumeInfo(volumeInfo);
+
+ OzoneManagerProtocolProtos.OMRequest writeRequest =
+ OzoneManagerProtocolProtos.OMRequest.newBuilder()
+ .setCmdType(Type.CreateVolume)
+ .setCreateVolumeRequest(req)
+ .setVersion(ClientVersion.CURRENT_VERSION)
+ .setClientId(randomUUID().toString())
+ .build();
+
+ OzoneManagerProtocolServerSideTranslatorPB omServerProtocol =
+ followerOM.getOmServerProtocol();
+ ServiceException ex = assertThrows(ServiceException.class,
+ () -> omServerProtocol.submitRequest(null, writeRequest));
+ assertThat(ex).hasCauseInstanceOf(OMNotLeaderException.class)
+ .hasMessageEndingWith("Suggested leader is OM:" + leaderOMNodeId + "[" + leaderOMAddress + "].");
+ }
+
+ /**
+ * Test strong read-after-write consistency across clients. This means that
+ * after a single client has finished writing, another client should be able
+ * to immediately see the changes.
+ */
+ @Test
+ void testLinearizableReadConsistency() throws Exception {
+ // Setup another client
+ OzoneConfiguration clientConf = OzoneConfiguration.of(getConf());
+ clientConf.setBoolean(OZONE_CLIENT_FOLLOWER_READ_ENABLED_KEY, true);
+ OzoneClient anotherClient = null;
+ try {
+ anotherClient = OzoneClientFactory.getRpcClient(getOmServiceId(), clientConf);
+ ObjectStore anotherObjectStore = anotherClient.getObjectStore();
+
+ // Ensure that the proxy provider of the two clients are not shared
+ assertNotSame(
+ OmFailoverProxyUtil.getFailoverProxyProvider(getObjectStore().getClientProxy()),
+ OmFailoverProxyUtil.getFailoverProxyProvider(anotherObjectStore.getClientProxy()));
+ HadoopRpcOMFollowerReadFailoverProxyProvider otherClientFollowerReadProxyProvider =
+ OmFailoverProxyUtil.getFollowerReadFailoverProxyProvider(anotherObjectStore.getClientProxy());
+ assertNotSame(
+ OmFailoverProxyUtil.getFollowerReadFailoverProxyProvider(getObjectStore().getClientProxy()),
+ otherClientFollowerReadProxyProvider);
+ String initialProxyOmNodeId = otherClientFollowerReadProxyProvider.getCurrentProxy().getNodeId();
+
+ // Setup the bucket and create a key with the default client
+ OzoneBucket ozoneBucket = setupBucket();
+ String key = createKey(ozoneBucket);
+
+ // Immediately read using another client, this might or might
+ // not be sent to the leader. Regardless, the other client should be
+ // able to see the read immediately.
+ OzoneKey keyReadFromAnotherClient = anotherObjectStore.getClientProxy().headObject(
+ ozoneBucket.getVolumeName(), ozoneBucket.getName(), key);
+ assertEquals(key, keyReadFromAnotherClient.getName());
+
+ // Create a more keys
+ for (int i = 0; i < 100; i++) {
+ createKey(ozoneBucket);
+ }
+
+ List ozoneKeys = anotherObjectStore.getClientProxy().listKeys(
+ ozoneBucket.getVolumeName(), ozoneBucket.getName(),
+ null, null, 1000);
+ assertEquals(101, ozoneKeys.size());
+ // Since the OM node is normal, it should not failover
+ assertEquals(initialProxyOmNodeId, otherClientFollowerReadProxyProvider.getCurrentProxy().getNodeId());
+ } finally {
+ IOUtils.closeQuietly(anotherClient);
+ }
+ }
+
+ @Test
+ void testFileOperationsWithRecursive() throws Exception {
+ OzoneBucket ozoneBucket = setupBucket();
+
+ String data = "random data";
+
+ // one level key name
+ testCreateFile(ozoneBucket, randomUUID().toString(), data, true, false);
+
+ // multi level key name
+ String keyName = "dir1/dir2/dir3/file1";
+ testCreateFile(ozoneBucket, keyName, data, true, false);
+
+ String newData = "random data random data";
+
+ // multi level key name with overwrite set.
+ testCreateFile(ozoneBucket, keyName, newData, true, true);
+
+ OMException ex = assertThrows(OMException.class,
+ () -> testCreateFile(ozoneBucket, keyName, "any", true, false));
+ assertEquals(FILE_ALREADY_EXISTS, ex.getResult());
+
+ // Try now with a file name which is same as a directory.
+ String dir = "folder/folder2";
+ ozoneBucket.createDirectory(dir);
+ ex = assertThrows(OMException.class,
+ () -> testCreateFile(ozoneBucket, dir, "any", true, false));
+ assertEquals(NOT_A_FILE, ex.getResult());
+ }
+
+ @Test
+ void testKeysDelete() throws Exception {
+ OzoneBucket ozoneBucket = setupBucket();
+ String data = "random data";
+ String keyName1 = "dir/file1";
+ String keyName2 = "dir/file2";
+ String keyName3 = "dir/file3";
+ String keyName4 = "dir/file4";
+ List keyList1 = new ArrayList<>();
+ keyList1.add(keyName2);
+ keyList1.add(keyName3);
+
+ testCreateFile(ozoneBucket, keyName1, data, true, false);
+ testCreateFile(ozoneBucket, keyName2, data, true, false);
+ testCreateFile(ozoneBucket, keyName3, data, true, false);
+ testCreateFile(ozoneBucket, keyName4, data, true, false);
+
+ // Delete keyName1 use deleteKey api.
+ ozoneBucket.deleteKey(keyName1);
+
+ // Delete keyName2 and keyName3 in keyList1 using the deleteKeys api.
+ ozoneBucket.deleteKeys(keyList1);
+
+ // In keyList2 keyName3 was previously deleted and KeyName4 exists .
+ List keyList2 = new ArrayList<>();
+ keyList2.add(keyName3);
+ keyList2.add(keyName4);
+
+ // Because keyName3 has been deleted, there should be a KEY_NOT_FOUND
+ // exception. In this case, we test for deletion failure.
+ OMException ex = assertThrows(OMException.class,
+ () -> ozoneBucket.deleteKeys(keyList2));
+ // The expected exception PARTIAL_DELETE, as if not able to delete, we
+ // return error codee PARTIAL_DElETE.
+ assertEquals(PARTIAL_DELETE, ex.getResult());
+ }
+
+ @Test
+ void testFileOperationsWithNonRecursive() throws Exception {
+ OzoneBucket ozoneBucket = setupBucket();
+
+ String data = "random data";
+
+ // one level key name
+ testCreateFile(ozoneBucket, randomUUID().toString(), data, false, false);
+
+ // multi level key name
+ String keyName = "dir1/dir2/dir3/file1";
+
+ // Should fail, as this is non-recursive and no parent directories exist
+ try {
+ testCreateFile(ozoneBucket, keyName, data, false, false);
+ } catch (OMException ex) {
+ assertEquals(DIRECTORY_NOT_FOUND, ex.getResult());
+ }
+
+ // create directory, now this should pass.
+ ozoneBucket.createDirectory("dir1/dir2/dir3");
+ testCreateFile(ozoneBucket, keyName, data, false, false);
+ data = "random data random data";
+
+ // multi level key name with overwrite set.
+ testCreateFile(ozoneBucket, keyName, data, false, true);
+
+ OMException ex = assertThrows(OMException.class,
+ () -> testCreateFile(ozoneBucket, keyName, "any", false, false));
+ assertEquals(FILE_ALREADY_EXISTS, ex.getResult());
+
+ // Try now with a file which already exists under the path
+ ozoneBucket.createDirectory("folder1/folder2/folder3/folder4");
+
+ testCreateFile(ozoneBucket, "folder1/folder2/folder3/folder4/file1", data,
+ false, false);
+
+ testCreateFile(ozoneBucket, "folder1/folder2/folder3/file1", data, false,
+ false);
+
+ // Try now with a file under path already. This should fail.
+ String dir = "folder/folder2";
+ ozoneBucket.createDirectory(dir);
+ ex = assertThrows(OMException.class,
+ () -> testCreateFile(ozoneBucket, dir, "any", false, false)
+ );
+ assertEquals(NOT_A_FILE, ex.getResult());
+ }
+
+ private OzoneVolume createAndCheckVolume(String volumeName)
+ throws Exception {
+ String userName = "user" + RandomStringUtils.secure().nextNumeric(5);
+ String adminName = "admin" + RandomStringUtils.secure().nextNumeric(5);
+ VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
+ .setOwner(userName)
+ .setAdmin(adminName)
+ .build();
+
+ ObjectStore objectStore = getObjectStore();
+ objectStore.createVolume(volumeName, createVolumeArgs);
+
+ OzoneVolume retVolume = objectStore.getVolume(volumeName);
+
+ assertEquals(volumeName, retVolume.getName());
+ assertEquals(userName, retVolume.getOwner());
+ assertEquals(adminName, retVolume.getAdmin());
+
+ return retVolume;
+ }
+
+ @Test
+ public void testAllVolumeOperations() throws Exception {
+ String volumeName = "volume" + RandomStringUtils.secure().nextNumeric(5);
+
+ createAndCheckVolume(volumeName);
+
+ ObjectStore objectStore = getObjectStore();
+ objectStore.deleteVolume(volumeName);
+
+ OzoneTestUtils.expectOmException(OMException.ResultCodes.VOLUME_NOT_FOUND,
+ () -> objectStore.getVolume(volumeName));
+
+ OzoneTestUtils.expectOmException(OMException.ResultCodes.VOLUME_NOT_FOUND,
+ () -> objectStore.deleteVolume(volumeName));
+ }
+
+ @Test
+ public void testAllBucketOperations() throws Exception {
+ String volumeName = "volume" + RandomStringUtils.secure().nextNumeric(5);
+ String bucketName = "volume" + RandomStringUtils.secure().nextNumeric(5);
+
+ OzoneVolume retVolume = createAndCheckVolume(volumeName);
+
+ BucketArgs bucketArgs =
+ BucketArgs.newBuilder().setStorageType(StorageType.DISK)
+ .setVersioning(true).build();
+
+
+ retVolume.createBucket(bucketName, bucketArgs);
+
+
+ OzoneBucket ozoneBucket = retVolume.getBucket(bucketName);
+
+ assertEquals(volumeName, ozoneBucket.getVolumeName());
+ assertEquals(bucketName, ozoneBucket.getName());
+ assertTrue(ozoneBucket.getVersioning());
+ assertEquals(StorageType.DISK, ozoneBucket.getStorageType());
+ assertFalse(ozoneBucket.getCreationTime().isAfter(Instant.now()));
+
+
+ // Change versioning to false
+ ozoneBucket.setVersioning(false);
+
+ ozoneBucket = retVolume.getBucket(bucketName);
+ assertFalse(ozoneBucket.getVersioning());
+
+ retVolume.deleteBucket(bucketName);
+
+ OzoneTestUtils.expectOmException(OMException.ResultCodes.BUCKET_NOT_FOUND,
+ () -> retVolume.deleteBucket(bucketName));
+ }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithStoppedNodes.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithStoppedNodes.java
new file mode 100644
index 000000000000..2e2683400eac
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithStoppedNodes.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.ozone.MiniOzoneHAClusterImpl.NODE_FAILURE_TIMEOUT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_DEFAULT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.net.ConnectException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdfs.LogVerificationAppender;
+import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
+import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFollowerReadFailoverProxyProvider;
+import org.apache.hadoop.ozone.om.ha.OMProxyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
+import org.apache.log4j.Logger;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+
+/**
+ * Ozone Manager HA follower read tests that stop/restart one or more OM nodes.
+ * @see TestOzoneManagerHAFollowerReadWithAllRunning
+ */
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+public class TestOzoneManagerHAFollowerReadWithStoppedNodes extends TestOzoneManagerHAFollowerRead {
+
+ /**
+ * After restarting OMs we need to wait
+ * for a leader to be elected and ready.
+ */
+ @BeforeEach
+ void setup() throws Exception {
+ waitForLeaderToBeReady();
+ }
+
+ /**
+ * Restart all OMs after each test.
+ */
+ @AfterEach
+ void resetCluster() throws Exception {
+ MiniOzoneHAClusterImpl cluster = getCluster();
+ if (cluster != null) {
+ cluster.restartOzoneManager();
+ }
+ }
+
+ /**
+ * Test client request succeeds when one OM node is down.
+ */
+ @Test
+ void oneOMDown() throws Exception {
+ changeFollowerReadInitialProxy(1);
+
+ getCluster().stopOzoneManager(1);
+ Thread.sleep(NODE_FAILURE_TIMEOUT * 4);
+
+ createVolumeTest(true);
+ createKeyTest(true);
+ }
+
+ /**
+ * Test client request fails when 2 OMs are down.
+ */
+ @Test
+ void twoOMDown() throws Exception {
+ changeFollowerReadInitialProxy(1);
+
+ getCluster().stopOzoneManager(1);
+ getCluster().stopOzoneManager(2);
+ Thread.sleep(NODE_FAILURE_TIMEOUT * 4);
+
+ createVolumeTest(false);
+ createKeyTest(false);
+ }
+
+ @Test
+ void testMultipartUpload() throws Exception {
+
+ // Happy scenario when all OM's are up.
+ OzoneBucket ozoneBucket = setupBucket();
+
+ String keyName = UUID.randomUUID().toString();
+ String uploadID = initiateMultipartUpload(ozoneBucket, keyName);
+
+ createMultipartKeyAndReadKey(ozoneBucket, keyName, uploadID);
+
+ testMultipartUploadWithOneOmNodeDown();
+ }
+
+ private void testMultipartUploadWithOneOmNodeDown() throws Exception {
+ OzoneBucket ozoneBucket = setupBucket();
+
+ String keyName = UUID.randomUUID().toString();
+ String uploadID = initiateMultipartUpload(ozoneBucket, keyName);
+
+ // After initiate multipartupload, shutdown leader OM.
+ // Stop leader OM, to see when the OM leader changes
+ // multipart upload is happening successfully or not.
+
+ HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider =
+ OmFailoverProxyUtil
+ .getFailoverProxyProvider(getObjectStore().getClientProxy());
+ HadoopRpcOMFollowerReadFailoverProxyProvider followerReadFailoverProxyProvider =
+ OmFailoverProxyUtil
+ .getFollowerReadFailoverProxyProvider(getObjectStore().getClientProxy());
+
+ // The omFailoverProxyProvider will point to the current leader OM node.
+ String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
+
+ // Also change the initial follower read proxy to the current leader OM node
+ changeFollowerReadInitialProxy(leaderOMNodeId);
+
+ // Stop one of the ozone manager, to see when the OM leader changes
+ // multipart upload is happening successfully or not.
+ getCluster().stopOzoneManager(leaderOMNodeId);
+ Thread.sleep(NODE_FAILURE_TIMEOUT * 4);
+
+ createMultipartKeyAndReadKey(ozoneBucket, keyName, uploadID);
+
+ String newLeaderOMNodeId =
+ omFailoverProxyProvider.getCurrentProxyOMNodeId();
+
+ assertNotEquals(leaderOMNodeId, newLeaderOMNodeId);
+ assertNotEquals(leaderOMNodeId, followerReadFailoverProxyProvider.getCurrentProxy().getNodeId());
+ }
+
+ private String initiateMultipartUpload(OzoneBucket ozoneBucket,
+ String keyName) throws Exception {
+
+ OmMultipartInfo omMultipartInfo =
+ ozoneBucket.initiateMultipartUpload(keyName,
+ ReplicationType.RATIS,
+ ReplicationFactor.ONE);
+
+ String uploadID = omMultipartInfo.getUploadID();
+ assertNotNull(uploadID);
+ return uploadID;
+ }
+
+ private void createMultipartKeyAndReadKey(OzoneBucket ozoneBucket,
+ String keyName, String uploadID) throws Exception {
+
+ String value = "random data";
+ OzoneOutputStream ozoneOutputStream = ozoneBucket.createMultipartKey(
+ keyName, value.length(), 1, uploadID);
+ ozoneOutputStream.write(value.getBytes(UTF_8), 0, value.length());
+ ozoneOutputStream.getMetadata().put(OzoneConsts.ETAG, DigestUtils.md5Hex(value));
+ ozoneOutputStream.close();
+
+
+ Map partsMap = new HashMap<>();
+ partsMap.put(1, ozoneOutputStream.getCommitUploadPartInfo().getETag());
+ OmMultipartUploadCompleteInfo omMultipartUploadCompleteInfo =
+ ozoneBucket.completeMultipartUpload(keyName, uploadID, partsMap);
+
+ assertNotNull(omMultipartUploadCompleteInfo);
+ assertNotNull(omMultipartUploadCompleteInfo.getHash());
+
+
+ try (OzoneInputStream ozoneInputStream = ozoneBucket.readKey(keyName)) {
+ byte[] fileContent = new byte[value.getBytes(UTF_8).length];
+ IOUtils.readFully(ozoneInputStream, fileContent);
+ assertEquals(value, new String(fileContent, UTF_8));
+ }
+ }
+
+ @Test
+ void testLeaderOmProxyProviderFailoverOnConnectionFailure() throws Exception {
+ ObjectStore objectStore = getObjectStore();
+ HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider =
+ OmFailoverProxyUtil
+ .getFailoverProxyProvider(objectStore.getClientProxy());
+ String firstProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
+
+ createVolumeTest(true);
+
+ // On stopping the current OM Proxy, the next connection attempt should
+ // failover to a another OM proxy.
+ getCluster().stopOzoneManager(firstProxyNodeId);
+ Thread.sleep(OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_DEFAULT * 4);
+
+ // Next request to the proxy provider should result in a failover
+ createVolumeTest(true);
+ Thread.sleep(OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_DEFAULT);
+
+ // Get the new OM Proxy NodeId
+ String newProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
+
+ // Verify that a failover occurred. the new proxy nodeId should be
+ // different from the old proxy nodeId.
+ assertNotEquals(firstProxyNodeId, newProxyNodeId);
+ }
+
+ @Test
+ void testFollowerReadOmProxyProviderFailoverOnConnectionFailure() throws Exception {
+ ObjectStore objectStore = getObjectStore();
+ HadoopRpcOMFollowerReadFailoverProxyProvider followerReadFailoverProxyProvider =
+ OmFailoverProxyUtil
+ .getFollowerReadFailoverProxyProvider(objectStore.getClientProxy());
+ String firstProxyNodeId = followerReadFailoverProxyProvider.getCurrentProxy().getNodeId();
+
+ objectStore.getClientProxy().listVolumes(null, null, 1000);
+
+ // On stopping the current OM Proxy, the next connection attempt should
+ // failover to another OM proxy.
+ getCluster().stopOzoneManager(firstProxyNodeId);
+
+ // Next request to the proxy provider should result in a failover
+ objectStore.getClientProxy().listVolumes(null, null, 1000);
+
+ // Get the new OM Proxy NodeId
+ String newProxyNodeId = followerReadFailoverProxyProvider.getCurrentProxy().getNodeId();
+
+ // Verify that a failover occurred. the new proxy nodeId should be
+ // different from the old proxy nodeId.
+ assertNotEquals(firstProxyNodeId, newProxyNodeId);
+ assertTrue(followerReadFailoverProxyProvider.isFollowerReadEnabled());
+ }
+
+ @Test
+ void testFollowerReadSkipsStoppedFollower() throws Exception {
+ ObjectStore objectStore = getObjectStore();
+ HadoopRpcOMFollowerReadFailoverProxyProvider followerReadFailoverProxyProvider =
+ OmFailoverProxyUtil
+ .getFollowerReadFailoverProxyProvider(objectStore.getClientProxy());
+
+ String leaderOMNodeId = getCluster().getOMLeader().getOMNodeId();
+ List followerOmNodeIds = new ArrayList<>();
+ for (OzoneManager om : getCluster().getOzoneManagersList()) {
+ if (!om.getOMNodeId().equals(leaderOMNodeId)) {
+ followerOmNodeIds.add(om.getOMNodeId());
+ }
+ }
+ assertFalse(followerOmNodeIds.isEmpty());
+
+ String stoppedFollowerNodeId = followerOmNodeIds.get(0);
+ getCluster().stopOzoneManager(stoppedFollowerNodeId);
+ Thread.sleep(NODE_FAILURE_TIMEOUT * 4);
+
+ followerReadFailoverProxyProvider.changeInitialProxyForTest(stoppedFollowerNodeId);
+ objectStore.getClientProxy().listVolumes(null, null, 10);
+
+ OMProxyInfo lastProxy =
+ (OMProxyInfo) followerReadFailoverProxyProvider.getLastProxy();
+ assertNotNull(lastProxy);
+ assertNotEquals(stoppedFollowerNodeId, lastProxy.getNodeId());
+ }
+
+ @Test
+ @Order(Integer.MAX_VALUE - 1)
+ void testIncrementalWaitTimeWithSameNodeFailover() throws Exception {
+ long waitBetweenRetries = getConf().getLong(
+ OzoneConfigKeys.OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_KEY,
+ OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_DEFAULT);
+ HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider =
+ OmFailoverProxyUtil
+ .getFailoverProxyProvider(getObjectStore().getClientProxy());
+
+ // The omFailoverProxyProvider will point to the current leader OM node.
+ String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
+
+ getCluster().stopOzoneManager(leaderOMNodeId);
+ Thread.sleep(NODE_FAILURE_TIMEOUT * 4);
+ createKeyTest(true); // failover should happen to new node
+
+ long numTimesTriedToSameNode = omFailoverProxyProvider.getWaitTime()
+ / waitBetweenRetries;
+ omFailoverProxyProvider.setNextOmProxy(omFailoverProxyProvider.
+ getCurrentProxyOMNodeId());
+ assertEquals((numTimesTriedToSameNode + 1) * waitBetweenRetries,
+ omFailoverProxyProvider.getWaitTime());
+ }
+
+ @Test
+ void testOMRetryProxy() {
+ int maxFailoverAttempts = getOzoneClientFailoverMaxAttempts();
+ // Stop all the OMs.
+ for (int i = 0; i < getNumOfOMs(); i++) {
+ getCluster().stopOzoneManager(i);
+ }
+
+ final LogVerificationAppender appender = new LogVerificationAppender();
+ final Logger logger = Logger.getRootLogger();
+ logger.addAppender(appender);
+
+ // After making N (set maxRetries value) connection attempts to OMs,
+ // the RpcClient should give up.
+ assertThrows(ConnectException.class, () -> createVolumeTest(true));
+ assertEquals(1,
+ appender.countLinesWithMessage("Failed to connect to OMs:"));
+ assertEquals(maxFailoverAttempts,
+ appender.countLinesWithMessage("Trying to failover"));
+ assertEquals(1, appender.countLinesWithMessage("Attempted " +
+ maxFailoverAttempts + " failovers."));
+ }
+
+ private void changeFollowerReadInitialProxy(int omIndex) {
+ // Change the initial proxy to the OM to be stopped to test follower read failover
+ HadoopRpcOMFollowerReadFailoverProxyProvider followerReadFailoverProxyProvider =
+ OmFailoverProxyUtil.getFollowerReadFailoverProxyProvider(getObjectStore().getClientProxy());
+ followerReadFailoverProxyProvider.changeInitialProxyForTest(getCluster().getOzoneManager(omIndex).getOMNodeId());
+ }
+
+ private void changeFollowerReadInitialProxy(String omNodeId) {
+ HadoopRpcOMFollowerReadFailoverProxyProvider followerReadFailoverProxyProvider =
+ OmFailoverProxyUtil.getFollowerReadFailoverProxyProvider(getObjectStore().getClientProxy());
+ followerReadFailoverProxyProvider.changeInitialProxyForTest(omNodeId);
+ }
+
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java
index 8c55eb49caa2..7313a103e51a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java
@@ -20,6 +20,7 @@
import static java.util.UUID.randomUUID;
import static org.apache.hadoop.ozone.OzoneAcl.AclScope.ACCESS;
import static org.apache.hadoop.ozone.OzoneAcl.AclScope.DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_ENABLED_KEY;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.DIRECTORY_NOT_FOUND;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_ALREADY_EXISTS;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_A_FILE;
@@ -51,7 +52,9 @@
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.StorageType;
+import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.io.retry.FailoverProxyProvider.ProxyInfo;
import org.apache.hadoop.ozone.ClientVersion;
import org.apache.hadoop.ozone.OzoneAcl;
@@ -60,13 +63,16 @@
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.VolumeArgs;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
+import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFollowerReadFailoverProxyProvider;
import org.apache.hadoop.ozone.om.ha.OMProxyInfo;
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
import org.apache.hadoop.ozone.om.request.volume.OMVolumeCreateRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
@@ -383,7 +389,7 @@ public void testFailoverWithSuggestedLeader() throws Exception {
assertSame(followerOM.getOmRatisServer().getLeaderStatus(),
OzoneManagerRatisServer.RaftServerStatus.NOT_LEADER);
- OzoneManagerProtocolProtos.OMRequest writeRequest =
+ OzoneManagerProtocolProtos.OMRequest readRequest =
OzoneManagerProtocolProtos.OMRequest.newBuilder()
.setCmdType(OzoneManagerProtocolProtos.Type.ListVolume)
.setVersion(ClientVersion.CURRENT_VERSION)
@@ -393,7 +399,7 @@ public void testFailoverWithSuggestedLeader() throws Exception {
OzoneManagerProtocolServerSideTranslatorPB omServerProtocol =
followerOM.getOmServerProtocol();
ServiceException ex = assertThrows(ServiceException.class,
- () -> omServerProtocol.submitRequest(null, writeRequest));
+ () -> omServerProtocol.submitRequest(null, readRequest));
assertThat(ex).hasCauseInstanceOf(OMNotLeaderException.class)
.hasMessageEndingWith("Suggested leader is OM:" + leaderOMNodeId + "[" + leaderOMAddress + "].");
}
@@ -1160,4 +1166,65 @@ void testOMRatisSnapshot() throws Exception {
"snapshot indices");
}
+
+ @Test
+ void testOMFollowerReadWithClusterDisabled() throws Exception {
+ OzoneConfiguration clientConf = OzoneConfiguration.of(getConf());
+ clientConf.setBoolean(OZONE_CLIENT_FOLLOWER_READ_ENABLED_KEY, true);
+
+ String userName = "user" + RandomStringUtils.randomNumeric(5);
+ String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+ String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+
+ VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
+ .setOwner(userName)
+ .setAdmin(adminName)
+ .build();
+ OzoneClient ozoneClient = null;
+ try {
+ ozoneClient = OzoneClientFactory.getRpcClient(clientConf);
+
+ HadoopRpcOMFailoverProxyProvider leaderFailoverProxyProvider =
+ OmFailoverProxyUtil
+ .getFailoverProxyProvider(ozoneClient.getProxy());
+ HadoopRpcOMFollowerReadFailoverProxyProvider followerReadFailoverProxyProvider =
+ OmFailoverProxyUtil.getFollowerReadFailoverProxyProvider(
+ ozoneClient.getProxy()
+ );
+ assertNotNull(followerReadFailoverProxyProvider);
+ assertTrue(followerReadFailoverProxyProvider.isFollowerReadEnabled());
+
+ ObjectStore objectStore = ozoneClient.getObjectStore();
+
+ // Trigger write so that the leader failover proxy provider points to the leader
+ objectStore.createVolume(volumeName, createVolumeArgs);
+
+ // Get the leader OM node ID
+ String leaderOMNodeId = leaderFailoverProxyProvider.getCurrentProxyOMNodeId();
+
+ // Pick a follower and tigger read so that read failover proxy provider fall back to the leader-only read
+ // on encountering OMNotLeaderException
+ String followerOMNodeId = null;
+ for (OzoneManager om : getCluster().getOzoneManagersList()) {
+ if (!om.getOMNodeId().equals(leaderOMNodeId)) {
+ followerOMNodeId = om.getOMNodeId();
+ break;
+ }
+ }
+ assertNotNull(followerOMNodeId);
+ followerReadFailoverProxyProvider.changeInitialProxyForTest(followerOMNodeId);
+ objectStore.getVolume(volumeName);
+
+ // Client follower read is disabled since it detected that the cluster does not
+ // support follower read
+ assertFalse(followerReadFailoverProxyProvider.isFollowerReadEnabled());
+ OMProxyInfo lastProxy =
+ (OMProxyInfo) followerReadFailoverProxyProvider.getLastProxy();
+ // The last read will be done on the leader
+ assertEquals(leaderFailoverProxyProvider.getCurrentProxyOMNodeId(), lastProxy.getNodeId());
+ } finally {
+ IOUtils.closeQuietly(ozoneClient);
+ }
+
+ }
}
diff --git a/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java b/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
index 61a25abda264..0d59efbc2c90 100644
--- a/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
+++ b/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
@@ -30,6 +30,7 @@
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
+import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFollowerReadFailoverProxyProvider;
import org.apache.hadoop.ozone.om.protocolPB.OmTransport;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
@@ -45,7 +46,8 @@ public class Hadoop27RpcTransport implements OmTransport {
private final OzoneManagerProtocolPB rpcProxy;
- private final HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider;
+ private final HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider;
+ private HadoopRpcOMFollowerReadFailoverProxyProvider followerReadFailoverProxyProvider;
public Hadoop27RpcTransport(ConfigurationSource conf,
UserGroupInformation ugi, String omServiceId) throws IOException {
@@ -54,14 +56,30 @@ public Hadoop27RpcTransport(ConfigurationSource conf,
OzoneManagerProtocolPB.class,
ProtobufRpcEngine.class);
- this.omFailoverProxyProvider = new HadoopRpcOMFailoverProxyProvider(
+ this.omFailoverProxyProvider = new HadoopRpcOMFailoverProxyProvider<>(
conf, ugi, omServiceId, OzoneManagerProtocolPB.class);
+ boolean followerReadEnabled = conf.getBoolean(
+ OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_ENABLED_KEY,
+ OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_ENABLED_DEFAULT
+ );
+
int maxFailovers = conf.getInt(
OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
- rpcProxy = createRetryProxy(omFailoverProxyProvider, maxFailovers);
+ if (followerReadEnabled) {
+ this.followerReadFailoverProxyProvider = new HadoopRpcOMFollowerReadFailoverProxyProvider<>(
+ omServiceId, OzoneManagerProtocolPB.class, omFailoverProxyProvider
+ );
+ this.rpcProxy = createRetryProxy(followerReadFailoverProxyProvider, maxFailovers);
+ } else {
+ // TODO: It should be possible to simply instantiate HadoopRpcOMFollowerReadFailoverProxyProvider
+ // even if the follower read is not enabled. We can try this to ensure that the tests still pass which
+ // suggests that the HadoopRpcOMFollowerReadFailoverProxyProvider is a indeed a superset of
+ // HadoopRpcOMFollowerReadFailoverProxyProvider
+ this.rpcProxy = createRetryProxy(omFailoverProxyProvider, maxFailovers);
+ }
}
@@ -110,9 +128,26 @@ private OzoneManagerProtocolPB createRetryProxy(
return proxy;
}
+ /**
+ * Creates a {@link RetryProxy} encapsulating the
+ * {@link HadoopRpcOMFollowerReadFailoverProxyProvider}.
+ */
+ private OzoneManagerProtocolPB createRetryProxy(
+ HadoopRpcOMFollowerReadFailoverProxyProvider failoverProxyProvider,
+ int maxFailovers) {
+ return (OzoneManagerProtocolPB) RetryProxy.create(
+ OzoneManagerProtocolPB.class, failoverProxyProvider,
+ failoverProxyProvider.getRetryPolicy(maxFailovers)
+ );
+ }
+
@Override
public void close() throws IOException {
- omFailoverProxyProvider.close();
+ if (followerReadFailoverProxyProvider != null) {
+ followerReadFailoverProxyProvider.close();
+ } else {
+ omFailoverProxyProvider.close();
+ }
}
}