Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public class SCMNodeManager implements NodeManager {
private final boolean useHostname;
private final Map<String, Set<DatanodeID>> dnsToDnIdMap = new ConcurrentHashMap<>();
private final int numPipelinesPerMetadataVolume;
private final int heavyNodeCriteria;
private final int datanodePipelineLimit;
private final HDDSLayoutVersionManager scmLayoutVersionManager;
private final EventPublisher scmNodeEventPublisher;
private final SCMContext scmContext;
Expand Down Expand Up @@ -195,8 +195,9 @@ public SCMNodeManager(
this.numPipelinesPerMetadataVolume =
conf.getInt(ScmConfigKeys.OZONE_SCM_PIPELINE_PER_METADATA_VOLUME,
ScmConfigKeys.OZONE_SCM_PIPELINE_PER_METADATA_VOLUME_DEFAULT);
String dnLimit = conf.get(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT);
this.heavyNodeCriteria = dnLimit == null ? 0 : Integer.parseInt(dnLimit);
this.datanodePipelineLimit = conf.getInt(
ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT,
ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT);
this.numContainerPerVolume = conf.getInt(
ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT_DEFAULT);
Expand Down Expand Up @@ -1602,8 +1603,8 @@ public int totalHealthyVolumeCount() {
@Override
public int pipelineLimit(DatanodeDetails dn) {
try {
if (heavyNodeCriteria > 0) {
return heavyNodeCriteria;
if (datanodePipelineLimit > 0) {
return datanodePipelineLimit;
} else if (nodeStateManager.getNode(dn).getHealthyVolumeCount() > 0) {
return numPipelinesPerMetadataVolume *
nodeStateManager.getNode(dn).getMetaDataVolumeCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
LoggerFactory.getLogger(PipelinePlacementPolicy.class);
private final NodeManager nodeManager;
private final PipelineStateManager stateManager;
private final int heavyNodeCriteria;
private final int datanodePipelineLimit;
private static final int REQUIRED_RACKS = 2;

public static final String MULTIPLE_RACK_PIPELINE_MSG =
Expand All @@ -76,8 +76,9 @@ public PipelinePlacementPolicy(final NodeManager nodeManager,
super(nodeManager, conf);
this.nodeManager = nodeManager;
this.stateManager = stateManager;
String dnLimit = conf.get(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT);
this.heavyNodeCriteria = dnLimit == null ? 0 : Integer.parseInt(dnLimit);
this.datanodePipelineLimit = conf.getInt(
ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT,
ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT);
}

public static int currentRatisThreePipelineCount(
Expand Down Expand Up @@ -182,7 +183,7 @@ List<DatanodeDetails> filterViableNodes(
if (healthyList.size() < nodesRequired) {
if (LOG.isDebugEnabled()) {
LOG.debug("Unable to find enough nodes that meet the criteria that" +
" cannot engage in more than" + heavyNodeCriteria +
" cannot engage in more than" + datanodePipelineLimit +
" pipelines. Nodes required: " + nodesRequired + " Excluded: " +
excludedNodesSize + " Found:" +
healthyList.size() + " healthy nodes count in NodeManager: " +
Expand All @@ -191,7 +192,7 @@ List<DatanodeDetails> filterViableNodes(
msg = String.format("Pipeline creation failed because nodes are engaged" +
" in other pipelines and every node can only be engaged in" +
" max %d pipelines. Required %d. Found %d. Excluded: %d.",
heavyNodeCriteria, nodesRequired, healthyList.size(),
datanodePipelineLimit, nodesRequired, healthyList.size(),
excludedNodesSize);
throw new SCMException(msg,
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public class RatisPipelineProvider
private final EventPublisher eventPublisher;
private final PlacementPolicy placementPolicy;
private final int pipelineNumberLimit;
private final int maxPipelinePerDatanode;
private final int datanodePipelineLimit;
private final LeaderChoosePolicy leaderChoosePolicy;
private final SCMContext scmContext;
private final long containerSizeBytes;
Expand All @@ -80,9 +80,9 @@ public RatisPipelineProvider(NodeManager nodeManager,
this.pipelineNumberLimit = conf.getInt(
ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT,
ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT_DEFAULT);
String dnLimit = conf.get(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT);
this.maxPipelinePerDatanode = dnLimit == null ? 0 :
Integer.parseInt(dnLimit);
this.datanodePipelineLimit = conf.getInt(
ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT,
ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT);
this.containerSizeBytes = (long) conf.getStorageSize(
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT,
Expand Down Expand Up @@ -110,10 +110,10 @@ private boolean exceedPipelineNumberLimit(RatisReplicationConfig replicationConf
int closedPipelines = pipelineStateManager.getPipelines(replicationConfig, PipelineState.CLOSED).size();
int openPipelines = totalActivePipelines - closedPipelines;
// Check per-datanode pipeline limit
if (maxPipelinePerDatanode > 0) {
if (datanodePipelineLimit > 0) {
int healthyNodeCount = getNodeManager()
.getNodeCount(NodeStatus.inServiceHealthy());
int allowedOpenPipelines = (maxPipelinePerDatanode * healthyNodeCount)
int allowedOpenPipelines = (datanodePipelineLimit * healthyNodeCount)
/ replicationConfig.getRequiredNodes();
return openPipelines >= allowedOpenPipelines;
}
Expand Down Expand Up @@ -145,8 +145,8 @@ public synchronized Pipeline create(RatisReplicationConfig replicationConfig,
List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes)
throws IOException {
if (exceedPipelineNumberLimit(replicationConfig)) {
String limitInfo = (maxPipelinePerDatanode > 0)
? String.format("per datanode: %d", maxPipelinePerDatanode)
String limitInfo = (datanodePipelineLimit > 0)
? String.format("per datanode: %d", datanodePipelineLimit)
: String.format(": %d", pipelineNumberLimit);

throw new SCMException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2059,6 +2059,28 @@ public void testScmRegisterNodeWithUpdatedIpAndHostname()
}
}

/**
 * Verifies that {@code pipelineLimit()} reports
 * {@code OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT} when the limit key has been
 * removed from the configuration handed to the node manager.
 */
@Test
public void testUsesDefaultPipelineLimitWhenUnset()
    throws IOException, AuthenticationException {
  // Take the configuration returned by getConf() and drop the explicit limit.
  OzoneConfiguration unsetConf = getConf();
  unsetConf.unset(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT);

  try (SCMNodeManager scmNodeManager = createNodeManager(unsetConf)) {
    // A registered datanode is needed as the argument of pipelineLimit().
    DatanodeDetails registered = registerWithCapacity(scmNodeManager);

    // With no configured value the node manager must answer with the default.
    assertEquals(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT,
        scmNodeManager.pipelineLimit(registered));
  }
}

private static Stream<Arguments> nodeStateTransitions() {
return Stream.of(
// start decommissioning or entering maintenance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,60 @@ public void testCurrentRatisThreePipelineCount()
assertEquals(pipelineCount, 2);
}

/**
 * Checks that a placement policy built from a configuration without an explicit
 * per-datanode pipeline limit leaves out a datanode that already belongs to two
 * RATIS/THREE pipelines (two being the value this test treats as the default).
 */
@Test
public void testPipelinePlacementPolicyDefaultLimitFiltersNodeAtLimit()
    throws IOException, TimeoutException {
  // Copy the shared configuration and remove the explicit limit so the policy
  // has to rely on the default.
  OzoneConfiguration unsetConf = new OzoneConfiguration(conf);
  unsetConf.unset(OZONE_DATANODE_PIPELINE_LIMIT);

  MockNodeManager mockNodes = new MockNodeManager(cluster,
      getNodesWithRackAwareness(), false, PIPELINE_PLACEMENT_MAX_NODES_COUNT);
  // Align the mock's per-datanode quota with the default limit.
  mockNodes.setNumPipelinePerDatanode(
      ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT);

  PipelineStateManager pipelineStates = PipelineStateManagerImpl.newBuilder()
      .setPipelineStore(SCMDBDefinition.PIPELINES.getTable(dbStore))
      .setRatisServer(scmhaManager.getRatisServer())
      .setNodeManager(mockNodes)
      .setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
      .build();

  PipelinePlacementPolicy policyUnderTest =
      new PipelinePlacementPolicy(mockNodes, pipelineStates, unsetConf);

  List<DatanodeDetails> healthyNodes =
      mockNodes.getNodes(NodeStatus.inServiceHealthy());
  DatanodeDetails saturated = healthyNodes.get(0);

  // Put the chosen node into two pipelines, each completed by two other
  // healthy nodes: (1, 2) for the first and (3, 4) for the second.
  int[][] peerIndexes = {{1, 2}, {3, 4}};
  for (int[] peers : peerIndexes) {
    List<DatanodeDetails> members = new ArrayList<>();
    members.add(saturated);
    for (int peer : peers) {
      members.add(healthyNodes.get(peer));
    }
    createPipelineWithReplicationConfig(members, RATIS, THREE);
  }

  int pipelinesOnSaturated = PipelinePlacementPolicy.currentRatisThreePipelineCount(
      mockNodes, pipelineStates, saturated);
  assertEquals(2, pipelinesOnSaturated);

  // A fresh selection must still find enough nodes, none of them the
  // saturated one.
  int required = HddsProtos.ReplicationFactor.THREE.getNumber();
  List<DatanodeDetails> picked = policyUnderTest.chooseDatanodes(
      new ArrayList<>(), new ArrayList<>(), required, 0, 0);

  assertEquals(required, picked.size());
  assertThat(picked).doesNotContain(saturated);
}

private void createPipelineWithReplicationConfig(List<DatanodeDetails> dnList,
HddsProtos.ReplicationType
replicationType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,56 @@ public void testCreatePipelinesWhenNotEnoughSpace(@TempDir File tempDir) throws
}
}

/**
 * Creates RATIS/THREE pipelines with a configuration that never sets
 * {@code OZONE_DATANODE_PIPELINE_LIMIT} and expects creation to be rejected,
 * with a message naming the default per-datanode limit, once
 * {@code healthyNodes * defaultLimit / 3} pipelines exist.
 */
@Test
public void testCreatePipelineWithDefaultLimit() throws Exception {
  // Fresh configuration: only the metadata directory is set, the limit is not.
  OzoneConfiguration conf = new OzoneConfiguration();
  conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());

  dbStore = DBStoreBuilder.createDBStore(conf, SCMDBDefinition.get());

  nodeManager = new MockNodeManager(true, nodeCount);
  // Large mock quota so the mock itself never becomes the limiting factor.
  nodeManager.setNumPipelinePerDatanode(100);

  SCMHAManager haManager = SCMHAManagerStub.getInstance(true);
  stateManager = PipelineStateManagerImpl.newBuilder()
      .setPipelineStore(SCMDBDefinition.PIPELINES.getTable(dbStore))
      .setRatisServer(haManager.getRatisServer())
      .setNodeManager(nodeManager)
      .setSCMDBTransactionBuffer(haManager.getDBTransactionBuffer())
      .build();

  provider = new MockRatisPipelineProvider(nodeManager, stateManager, conf);

  int healthyNodes = nodeManager.getNodes(NodeStatus.inServiceHealthy()).size();
  int defaultLimit = ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT;
  assertEquals(2, defaultLimit);

  // Number of pipelines that fit before the per-datanode default is exceeded.
  int capacity = (healthyNodes * defaultLimit) / ReplicationFactor.THREE.getNumber();

  RatisReplicationConfig ratisThree =
      RatisReplicationConfig.getInstance(ReplicationFactor.THREE);

  // Fill the cluster up to that capacity.
  for (int created = 0; created < capacity; created++) {
    Pipeline pipeline =
        provider.create(ratisThree, new ArrayList<>(), new ArrayList<>());
    stateManager.addPipeline(
        pipeline.getProtobufMessage(ClientVersion.CURRENT_VERSION));
  }

  // One more request must be refused and the message must cite the default.
  SCMException rejected = assertThrows(SCMException.class,
      () -> provider.create(ratisThree, new ArrayList<>(), new ArrayList<>()));

  assertThat(rejected.getMessage())
      .contains("limit per datanode: " + defaultLimit)
      .contains("replicationConfig: RATIS/THREE");
}

@ParameterizedTest
@CsvSource({ "1, 3", "2, 6"})
public void testCreatePipelineThrowErrorWithDataNodeLimit(int limit, int pipelineCount) throws Exception {
Expand Down