diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 26d9e49bafc..e2a17dd1d5c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -133,7 +133,7 @@ public class SCMNodeManager implements NodeManager {
   private final boolean useHostname;
   private final Map<String, Set<DatanodeID>> dnsToDnIdMap = new ConcurrentHashMap<>();
   private final int numPipelinesPerMetadataVolume;
-  private final int heavyNodeCriteria;
+  private final int datanodePipelineLimit;
   private final HDDSLayoutVersionManager scmLayoutVersionManager;
   private final EventPublisher scmNodeEventPublisher;
   private final SCMContext scmContext;
@@ -195,8 +195,9 @@ public SCMNodeManager(
     this.numPipelinesPerMetadataVolume =
         conf.getInt(ScmConfigKeys.OZONE_SCM_PIPELINE_PER_METADATA_VOLUME,
             ScmConfigKeys.OZONE_SCM_PIPELINE_PER_METADATA_VOLUME_DEFAULT);
-    String dnLimit = conf.get(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT);
-    this.heavyNodeCriteria = dnLimit == null ? 0 : Integer.parseInt(dnLimit);
+    this.datanodePipelineLimit = conf.getInt(
+        ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT,
+        ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT);
     this.numContainerPerVolume = conf.getInt(
         ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
         ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT_DEFAULT);
@@ -1602,8 +1603,8 @@ public int totalHealthyVolumeCount() {
   @Override
   public int pipelineLimit(DatanodeDetails dn) {
     try {
-      if (heavyNodeCriteria > 0) {
-        return heavyNodeCriteria;
+      if (datanodePipelineLimit > 0) {
+        return datanodePipelineLimit;
       } else if (nodeStateManager.getNode(dn).getHealthyVolumeCount() > 0) {
         return numPipelinesPerMetadataVolume *
             nodeStateManager.getNode(dn).getMetaDataVolumeCount();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
index 696d6ecc336..366a33dccf2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -54,7 +54,7 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
       LoggerFactory.getLogger(PipelinePlacementPolicy.class);
   private final NodeManager nodeManager;
   private final PipelineStateManager stateManager;
-  private final int heavyNodeCriteria;
+  private final int datanodePipelineLimit;
   private static final int REQUIRED_RACKS = 2;
 
   public static final String MULTIPLE_RACK_PIPELINE_MSG =
@@ -76,8 +76,9 @@ public PipelinePlacementPolicy(final NodeManager nodeManager,
     super(nodeManager, conf);
     this.nodeManager = nodeManager;
     this.stateManager = stateManager;
-    String dnLimit = conf.get(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT);
-    this.heavyNodeCriteria = dnLimit == null ? 0 : Integer.parseInt(dnLimit);
+    this.datanodePipelineLimit = conf.getInt(
+        ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT,
+        ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT);
   }
 
   public static int currentRatisThreePipelineCount(
@@ -182,7 +183,7 @@ List<DatanodeDetails> filterViableNodes(
     if (healthyList.size() < nodesRequired) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Unable to find enough nodes that meet the criteria that" +
-            " cannot engage in more than" + heavyNodeCriteria +
+            " cannot engage in more than " + datanodePipelineLimit +
            " pipelines. Nodes required: " + nodesRequired + " Excluded: " +
            excludedNodesSize + " Found:" + healthyList.size() +
            " healthy nodes count in NodeManager: " +
@@ -191,7 +192,7 @@ List<DatanodeDetails> filterViableNodes(
       msg = String.format("Pipeline creation failed because nodes are engaged" +
          " in other pipelines and every node can only be engaged in" +
          " max %d pipelines. Required %d. Found %d. Excluded: %d.",
-          heavyNodeCriteria, nodesRequired, healthyList.size(),
+          datanodePipelineLimit, nodesRequired, healthyList.size(),
          excludedNodesSize);
       throw new SCMException(msg,
           SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index b35a1f28148..800a7ebfd83 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -60,7 +60,7 @@ public class RatisPipelineProvider
   private final EventPublisher eventPublisher;
   private final PlacementPolicy placementPolicy;
   private final int pipelineNumberLimit;
-  private final int maxPipelinePerDatanode;
+  private final int datanodePipelineLimit;
   private final LeaderChoosePolicy leaderChoosePolicy;
   private final SCMContext scmContext;
   private final long containerSizeBytes;
@@ -80,9 +80,9 @@ public RatisPipelineProvider(NodeManager nodeManager,
     this.pipelineNumberLimit = conf.getInt(
         ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT,
         ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT_DEFAULT);
-    String dnLimit = conf.get(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT);
-    this.maxPipelinePerDatanode = dnLimit == null ? 0 :
-        Integer.parseInt(dnLimit);
+    this.datanodePipelineLimit = conf.getInt(
+        ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT,
+        ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT);
     this.containerSizeBytes = (long) conf.getStorageSize(
         ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
         ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT,
@@ -110,10 +110,10 @@ private boolean exceedPipelineNumberLimit(RatisReplicationConfig replicationConf
     int closedPipelines = pipelineStateManager.getPipelines(replicationConfig,
         PipelineState.CLOSED).size();
     int openPipelines = totalActivePipelines - closedPipelines;
     // Check per-datanode pipeline limit
-    if (maxPipelinePerDatanode > 0) {
+    if (datanodePipelineLimit > 0) {
       int healthyNodeCount = getNodeManager()
           .getNodeCount(NodeStatus.inServiceHealthy());
-      int allowedOpenPipelines = (maxPipelinePerDatanode * healthyNodeCount)
+      int allowedOpenPipelines = (datanodePipelineLimit * healthyNodeCount)
           / replicationConfig.getRequiredNodes();
       return openPipelines >= allowedOpenPipelines;
     }
@@ -145,8 +145,8 @@ public synchronized Pipeline create(RatisReplicationConfig replicationConfig,
       List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes)
       throws IOException {
     if (exceedPipelineNumberLimit(replicationConfig)) {
-      String limitInfo = (maxPipelinePerDatanode > 0)
-          ? String.format("per datanode: %d", maxPipelinePerDatanode)
+      String limitInfo = (datanodePipelineLimit > 0)
+          ? String.format("per datanode: %d", datanodePipelineLimit)
           : String.format(": %d", pipelineNumberLimit);
 
       throw new SCMException(
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index 84b548a4c04..928e38295f5 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -2059,6 +2059,28 @@ public void testScmRegisterNodeWithUpdatedIpAndHostname()
     }
   }
 
+  /**
+   * Test that pipelineLimit() uses the default value when the config is not set.
+   */
+  @Test
+  public void testUsesDefaultPipelineLimitWhenUnset()
+      throws IOException, AuthenticationException {
+
+    // Create a node manager from a config that leaves the limit unset.
+    OzoneConfiguration conf = getConf();
+    conf.unset(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT);
+
+    try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+
+      // Register a datanode that reports healthy volumes.
+      DatanodeDetails dn = registerWithCapacity(nodeManager);
+
+      // pipelineLimit() should fall back to the default value.
+      int limit = nodeManager.pipelineLimit(dn);
+      assertEquals(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT, limit);
+    }
+  }
+
   private static Stream<Arguments> nodeStateTransitions() {
     return Stream.of(
         // start decommissioning or entering maintenance
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
index b9b5ac88455..7094a39c79e 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
@@ -722,6 +722,60 @@ public void testCurrentRatisThreePipelineCount()
     assertEquals(pipelineCount, 2);
   }
 
+  @Test
+  public void testPipelinePlacementPolicyDefaultLimitFiltersNodeAtLimit()
+      throws IOException, TimeoutException {
+
+    // 1) Create a policy from a config that leaves the limit unset.
+    OzoneConfiguration localConf = new OzoneConfiguration(conf);
+    localConf.unset(OZONE_DATANODE_PIPELINE_LIMIT);
+
+    MockNodeManager localNodeManager = new MockNodeManager(cluster,
+        getNodesWithRackAwareness(), false, PIPELINE_PLACEMENT_MAX_NODES_COUNT);
+
+    // Ensure the NodeManager also uses the default limit (2) when the key is unset.
+    localNodeManager.setNumPipelinePerDatanode(
+        ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT);
+
+    PipelineStateManager localStateManager = PipelineStateManagerImpl.newBuilder()
+        .setPipelineStore(SCMDBDefinition.PIPELINES.getTable(dbStore))
+        .setRatisServer(scmhaManager.getRatisServer())
+        .setNodeManager(localNodeManager)
+        .setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
+        .build();
+
+    PipelinePlacementPolicy localPolicy = new PipelinePlacementPolicy(
+        localNodeManager, localStateManager, localConf);
+
+    List<DatanodeDetails> healthy =
+        localNodeManager.getNodes(NodeStatus.inServiceHealthy());
+    DatanodeDetails target = healthy.get(0);
+
+    // 2) Engage the target node in exactly 2 pipelines (the default limit).
+    List<DatanodeDetails> p1Dns = new ArrayList<>();
+    p1Dns.add(target);
+    p1Dns.add(healthy.get(1));
+    p1Dns.add(healthy.get(2));
+    createPipelineWithReplicationConfig(p1Dns, RATIS, THREE);
+
+    List<DatanodeDetails> p2Dns = new ArrayList<>();
+    p2Dns.add(target);
+    p2Dns.add(healthy.get(3));
+    p2Dns.add(healthy.get(4));
+    createPipelineWithReplicationConfig(p2Dns, RATIS, THREE);
+
+    assertEquals(2, PipelinePlacementPolicy.currentRatisThreePipelineCount(
+        localNodeManager, localStateManager, target));
+
+    // 3) The node at the limit must be filtered out when choosing nodes for a new pipeline.
+    int nodesRequired = HddsProtos.ReplicationFactor.THREE.getNumber();
+    List<DatanodeDetails> chosen = localPolicy.chooseDatanodes(
+        new ArrayList<>(), new ArrayList<>(), nodesRequired, 0, 0);
+
+    assertEquals(nodesRequired, chosen.size());
+    assertThat(chosen).doesNotContain(target);
+  }
+
   private void createPipelineWithReplicationConfig(List<DatanodeDetails> dnList,
       HddsProtos.ReplicationType replicationType,
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
index cfeba61c320..ca9d1f5a6c3 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@ -365,6 +365,56 @@ public void testCreatePipelinesWhenNotEnoughSpace(@TempDir File tempDir) throws
     }
   }
 
+  @Test
+  public void testCreatePipelineWithDefaultLimit() throws Exception {
+    // Create a conf without setting OZONE_DATANODE_PIPELINE_LIMIT.
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+
+    dbStore = DBStoreBuilder.createDBStore(conf, SCMDBDefinition.get());
+
+    // MockNodeManager(true, 10) typically gives 8 healthy nodes in this test suite.
+    nodeManager = new MockNodeManager(true, nodeCount);
+    // Give the mock a large per-datanode quota so creation does not fail early on it.
+    nodeManager.setNumPipelinePerDatanode(100);
+
+    SCMHAManager scmhaManager = SCMHAManagerStub.getInstance(true);
+    stateManager = PipelineStateManagerImpl.newBuilder()
+        .setPipelineStore(SCMDBDefinition.PIPELINES.getTable(dbStore))
+        .setRatisServer(scmhaManager.getRatisServer())
+        .setNodeManager(nodeManager)
+        .setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
+        .build();
+
+    provider = new MockRatisPipelineProvider(nodeManager, stateManager, conf);
+
+    int healthyCount = nodeManager.getNodes(NodeStatus.inServiceHealthy()).size();
+    int defaultLimit = ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT;
+    assertEquals(2, defaultLimit);
+
+    // Maximum pipelines before the per-datanode default limit is exceeded.
+    int maxPipelines = (healthyCount * defaultLimit)
+        / ReplicationFactor.THREE.getNumber();
+
+    // Create pipelines up to maxPipelines.
+    for (int i = 0; i < maxPipelines; i++) {
+      Pipeline p = provider.create(
+          RatisReplicationConfig.getInstance(ReplicationFactor.THREE),
+          new ArrayList<>(), new ArrayList<>());
+      stateManager.addPipeline(p.getProtobufMessage(ClientVersion.CURRENT_VERSION));
+    }
+
+    // The next pipeline creation should fail, citing the default per-datanode limit.
+    SCMException ex = assertThrows(SCMException.class, () ->
+        provider.create(RatisReplicationConfig.getInstance(ReplicationFactor.THREE),
+            new ArrayList<>(), new ArrayList<>())
+    );
+
+    assertThat(ex.getMessage())
+        .contains("limit per datanode: " + defaultLimit)
+        .contains("replicationConfig: RATIS/THREE");
+  }
+
   @ParameterizedTest
   @CsvSource({ "1, 3", "2, 6"})
   public void testCreatePipelineThrowErrorWithDataNodeLimit(int limit, int pipelineCount) throws Exception {