diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index ad7073e3857fc..05fb0a6f44c78 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -317,7 +317,15 @@ public final class ScmConfigKeys {
// the max number of pipelines can a single datanode be engaged in.
public static final String OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT =
"ozone.scm.datanode.max.pipeline.engagement";
- public static final int OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT = 5;
+ // Setting to zero by default means this limit doesn't take effect.
+ public static final int OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT = 0;
+
+ // Upper limit for how many pipelines can be created.
+ // Only for test purposes now.
+ public static final String OZONE_SCM_PIPELINE_NUMBER_LIMIT =
+ "ozone.scm.datanode.pipeline.number.limit";
+ // Setting to zero by default means this limit doesn't take effect.
+ public static final int OZONE_SCM_PIPELINE_NUMBER_LIMIT_DEFAULT = 0;
public static final String
OZONE_SCM_KEY_VALUE_CONTAINER_DELETION_CHOOSING_POLICY =
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index a03804773d6ad..1fdd8c088b2ec 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -838,10 +838,17 @@
-    <name>ozone.scm.datanode.max.pipeline.engagement</name>
-    <value>5</value>
+    <name>ozone.scm.datanode.max.pipeline.engagement</name>
+    <value>0</value>
+    <tag>OZONE, SCM, PIPELINE</tag>
+    <description>Max number of pipelines per datanode can be engaged in.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.datanode.pipeline.number.limit</name>
+    <value>0</value>
     <tag>OZONE, SCM, PIPELINE</tag>
-    <description>Max number of pipelines per datanode can be engaged in.
+    <description>Upper limit for how many pipelines can be created in SCM.
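
For illustration, a minimal sketch (with assumed values, not part of this patch) of how the two limits defined above can be tuned programmatically before starting SCM or a test cluster; both keys default to 0, which disables the corresponding check:

    // Assumed example values; any positive value enables the limit.
    OzoneConfiguration conf = new OzoneConfiguration();
    // At most 2 pipelines per datanode (ozone.scm.datanode.max.pipeline.engagement).
    conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
    // At most 10 pipelines cluster-wide (ozone.scm.datanode.pipeline.number.limit).
    conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_NUMBER_LIMIT, 10);
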
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index a0a722277adfa..12302276a17cf 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -188,6 +188,10 @@ public AllocatedBlock allocateBlock(final long size, ReplicationType type,
// TODO: #CLUTIL Remove creation logic when all replication types and
// factors are handled by pipeline creator
pipeline = pipelineManager.createPipeline(type, factor);
+ } catch (SCMException se) {
+ LOG.warn("Pipeline creation failed for type:{} factor:{}. " +
+ "Datanodes may be used up.", type, factor, se);
+ break;
} catch (IOException e) {
LOG.warn("Pipeline creation failed for type:{} factor:{}. Retrying " +
"get pipelines call once.", type, factor, e);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java
index adaeb87fc9c0f..74431f9b05e87 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java
@@ -43,10 +43,10 @@ private ContainerPlacementPolicyFactory() {
}
- public static PlacementPolicy getPolicy(Configuration conf,
- final NodeManager nodeManager, NetworkTopology clusterMap,
- final boolean fallback, SCMContainerPlacementMetrics metrics)
- throws SCMException{
+ public static PlacementPolicy getPolicy(
+ Configuration conf, final NodeManager nodeManager,
+ NetworkTopology clusterMap, final boolean fallback,
+ SCMContainerPlacementMetrics metrics) throws SCMException{
final Class<? extends PlacementPolicy> placementClass = conf
.getClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
OZONE_SCM_CONTAINER_PLACEMENT_IMPL_DEFAULT,
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
index 714188dbf78eb..496b9e7f341e1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
@@ -71,6 +71,10 @@ public synchronized void addPipeline(Pipeline pipeline) {
UUID dnId = details.getUuid();
dn2ObjectMap.computeIfAbsent(dnId, k -> ConcurrentHashMap.newKeySet())
.add(pipeline.getId());
+ dn2ObjectMap.computeIfPresent(dnId, (k, v) -> {
+ v.add(pipeline.getId());
+ return v;
+ });
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
index 687356648c3cb..6952f74ef29eb 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
@@ -96,6 +96,7 @@ private void createPipelines() {
if (scheduler.isClosed()) {
break;
}
+
pipelineManager.createPipeline(type, factor);
} catch (IOException ioe) {
break;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
index 8d497fa1b03c3..8d040f1df3f8b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
@@ -57,7 +57,7 @@ public void onMessage(PipelineActionsFromDatanode report,
pipelineID = PipelineID.
getFromProtobuf(action.getClosePipeline().getPipelineID());
Pipeline pipeline = pipelineManager.getPipeline(pipelineID);
- LOG.error("Received pipeline action {} for {} from datanode {}. " +
+ LOG.info("Received pipeline action {} for {} from datanode {}. " +
"Reason : {}", action.getAction(), pipeline,
report.getDatanodeDetails(),
action.getClosePipeline().getDetailedReason());
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
index 1983ed606dfff..e41675dace724 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -52,6 +52,7 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
static final Logger LOG =
LoggerFactory.getLogger(PipelinePlacementPolicy.class);
private final NodeManager nodeManager;
+ private final PipelineStateManager stateManager;
private final Configuration conf;
private final int heavyNodeCriteria;
@@ -59,15 +60,17 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
* Constructs a pipeline placement with considering network topology,
* load balancing and rack awareness.
*
- * @param nodeManager Node Manager
+ * @param nodeManager NodeManager
+ * @param stateManager PipelineStateManager
* @param conf Configuration
*/
- public PipelinePlacementPolicy(
- final NodeManager nodeManager, final Configuration conf) {
+ public PipelinePlacementPolicy(final NodeManager nodeManager,
+ final PipelineStateManager stateManager, final Configuration conf) {
super(nodeManager, conf);
this.nodeManager = nodeManager;
this.conf = conf;
- heavyNodeCriteria = conf.getInt(
+ this.stateManager = stateManager;
+ this.heavyNodeCriteria = conf.getInt(
ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT,
ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT);
}
@@ -76,11 +79,34 @@ public PipelinePlacementPolicy(
* Returns true if this node meets the criteria.
*
* @param datanodeDetails DatanodeDetails
+ * @param nodesRequired nodes required count
* @return true if we have enough space.
*/
@VisibleForTesting
- boolean meetCriteria(DatanodeDetails datanodeDetails, long heavyNodeLimit) {
- return (nodeManager.getPipelinesCount(datanodeDetails) <= heavyNodeLimit);
+ boolean meetCriteria(DatanodeDetails datanodeDetails, int nodesRequired) {
+ if (heavyNodeCriteria == 0) {
+ // no limit applied.
+ return true;
+ }
+ // Datanodes from pipelines in some states can also be considered available
+ // for pipeline allocation. Thus the number of these pipelines shall be
+ // deducted from the total heaviness calculation.
+ int pipelineNumDeductable = (int)stateManager.getPipelines(
+ HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.valueOf(nodesRequired),
+ Pipeline.PipelineState.CLOSED)
+ .stream().filter(
+ p -> nodeManager.getPipelines(datanodeDetails).contains(p.getId()))
+ .count();
+ boolean meet = (nodeManager.getPipelinesCount(datanodeDetails)
+ - pipelineNumDeductable) < heavyNodeCriteria;
+ if (!meet) {
+ LOG.info("Pipeline Placement: can't place more pipelines on heavy " +
+ "datanode: " + datanodeDetails.getUuid().toString() + " Heaviness: " +
+ nodeManager.getPipelinesCount(datanodeDetails) + " limit: " +
+ heavyNodeCriteria);
+ }
+ return meet;
}
/**
@@ -102,18 +128,19 @@ List<DatanodeDetails> filterViableNodes(
if (excludedNodes != null) {
healthyNodes.removeAll(excludedNodes);
}
+ int initialHealthyNodesCount = healthyNodes.size();
String msg;
- if (healthyNodes.size() == 0) {
+ if (initialHealthyNodesCount == 0) {
msg = "No healthy node found to allocate pipeline.";
LOG.error(msg);
throw new SCMException(msg, SCMException.ResultCodes
.FAILED_TO_FIND_HEALTHY_NODES);
}
- if (healthyNodes.size() < nodesRequired) {
+ if (initialHealthyNodesCount < nodesRequired) {
msg = String.format("Not enough healthy nodes to allocate pipeline. %d "
+ " datanodes required. Found %d",
- nodesRequired, healthyNodes.size());
+ nodesRequired, initialHealthyNodesCount);
LOG.error(msg);
throw new SCMException(msg,
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
@@ -121,14 +148,17 @@ List filterViableNodes(
// filter nodes that meet the size and pipeline engagement criteria.
// Pipeline placement doesn't take node space left into account.
- List<DatanodeDetails> healthyList = healthyNodes.stream().filter(d ->
- meetCriteria(d, heavyNodeCriteria)).collect(Collectors.toList());
+ List<DatanodeDetails> healthyList = healthyNodes.stream()
+ .filter(d -> meetCriteria(d, nodesRequired)).limit(nodesRequired)
+ .collect(Collectors.toList());
if (healthyList.size() < nodesRequired) {
msg = String.format("Unable to find enough nodes that meet " +
"the criteria that cannot engage in more than %d pipelines." +
- " Nodes required: %d Found: %d",
- heavyNodeCriteria, nodesRequired, healthyList.size());
+ " Nodes required: %d Found: %d, healthy nodes count in " +
+ "NodeManager: %d.",
+ heavyNodeCriteria, nodesRequired, healthyList.size(),
+ initialHealthyNodesCount);
LOG.error(msg);
throw new SCMException(msg,
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
@@ -155,12 +185,10 @@ public List<DatanodeDetails> chooseDatanodes(
List<DatanodeDetails> healthyNodes =
filterViableNodes(excludedNodes, nodesRequired);
- // Randomly picks nodes when all nodes are equal.
+ // Randomly picks nodes when all nodes are equal or factor is ONE.
// This happens when network topology is absent or
// all nodes are on the same rack.
if (checkAllNodesAreEqual(nodeManager.getClusterNetworkTopologyMap())) {
- LOG.info("All nodes are considered equal. Now randomly pick nodes. " +
- "Required nodes: {}", nodesRequired);
return super.getResultSet(nodesRequired, healthyNodes);
} else {
// Since topology and rack awareness are available, picks nodes
@@ -188,8 +216,8 @@ public List<DatanodeDetails> getResultSet(
// First choose an anchor nodes randomly
DatanodeDetails anchor = chooseNode(healthyNodes);
if (anchor == null) {
- LOG.error("Unable to find the first healthy nodes that " +
- "meet the criteria. Required nodes: {}, Found nodes: {}",
+ LOG.error("Pipeline Placement: Unable to find the first healthy nodes " +
+ "that meet the criteria. Required nodes: {}, Found nodes: {}",
nodesRequired, results.size());
throw new SCMException("Unable to find required number of nodes.",
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
@@ -204,8 +232,8 @@ public List<DatanodeDetails> getResultSet(
healthyNodes, exclude,
nodeManager.getClusterNetworkTopologyMap(), anchor);
if (nodeOnDifferentRack == null) {
- LOG.error("Unable to find nodes on different racks that " +
- "meet the criteria. Required nodes: {}, Found nodes: {}",
+ LOG.error("Pipeline Placement: Unable to find nodes on different racks " +
+ " that meet the criteria. Required nodes: {}, Found nodes: {}",
nodesRequired, results.size());
throw new SCMException("Unable to find required number of nodes.",
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
@@ -228,9 +256,9 @@ public List<DatanodeDetails> getResultSet(
}
if (results.size() < nodesRequired) {
- LOG.error("Unable to find the required number of healthy nodes that " +
- "meet the criteria. Required nodes: {}, Found nodes: {}",
- nodesRequired, results.size());
+ LOG.error("Pipeline Placement: Unable to find the required number of " +
+ "healthy nodes that meet the criteria. Required nodes: {}, " +
+ "Found nodes: {}", nodesRequired, results.size());
throw new SCMException("Unable to find required number of nodes.",
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
}
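
To make the new heaviness check in meetCriteria concrete, here is a worked example with assumed numbers (not taken from this patch): pipelines in CLOSED state for the requested factor are deducted from the node's engagement count before comparing against the per-datanode limit.

    int heavyNodeCriteria = 2;      // ozone.scm.datanode.max.pipeline.engagement
    int pipelinesOnNode = 3;        // nodeManager.getPipelinesCount(datanodeDetails)
    int closedDeductible = 2;       // CLOSED factor-THREE Ratis pipelines on this node
    boolean meets = (pipelinesOnNode - closedDeductible) < heavyNodeCriteria;   // true
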
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
index 443378cd18359..8e0f32de15994 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
@@ -30,6 +30,7 @@
import java.io.IOException;
import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -52,8 +53,8 @@ class PipelineStateMap {
PipelineStateMap() {
// TODO: Use TreeMap for range operations?
- pipelineMap = new HashMap<>();
- pipeline2container = new HashMap<>();
+ pipelineMap = new ConcurrentHashMap<>();
+ pipeline2container = new ConcurrentHashMap<>();
query2OpenPipelines = new HashMap<>();
initializeQueryMap();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index 9409728d6b485..f6b80edcf96e8 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -20,13 +20,12 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
-import org.apache.hadoop.hdds.scm.PlacementPolicy;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
import org.apache.hadoop.io.MultipleIOException;
@@ -44,13 +43,7 @@
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
@@ -69,6 +62,7 @@ public class RatisPipelineProvider implements PipelineProvider {
private final NodeManager nodeManager;
private final PipelineStateManager stateManager;
private final Configuration conf;
+ private final PipelinePlacementPolicy placementPolicy;
// Set parallelism at 3, as now in Ratis we create 1 and 3 node pipelines.
private final int parallelismForPool = 3;
@@ -92,65 +86,53 @@ public class RatisPipelineProvider implements PipelineProvider {
this.stateManager = stateManager;
this.conf = conf;
this.tlsConfig = tlsConfig;
+ this.placementPolicy =
+ new PipelinePlacementPolicy(nodeManager, stateManager, conf);
}
-
- /**
- * Create pluggable container placement policy implementation instance.
- *
- * @param nodeManager - SCM node manager.
- * @param conf - configuration.
- * @return SCM container placement policy implementation instance.
- */
- @SuppressWarnings("unchecked")
- // TODO: should we rename PlacementPolicy to PipelinePlacementPolicy?
- private static PlacementPolicy createContainerPlacementPolicy(
- final NodeManager nodeManager, final Configuration conf) {
- Class<? extends PlacementPolicy> implClass =
- (Class<? extends PlacementPolicy>) conf.getClass(
- ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
- SCMContainerPlacementRandom.class);
-
- try {
- Constructor<? extends PlacementPolicy> ctor =
- implClass.getDeclaredConstructor(NodeManager.class,
- Configuration.class);
- return ctor.newInstance(nodeManager, conf);
- } catch (RuntimeException e) {
- throw e;
- } catch (InvocationTargetException e) {
- throw new RuntimeException(implClass.getName()
- + " could not be constructed.", e.getCause());
- } catch (Exception e) {
-// LOG.error("Unhandled exception occurred, Placement policy will not " +
-// "be functional.");
- throw new IllegalArgumentException("Unable to load " +
- "PlacementPolicy", e);
- }
- }
-
- @Override
- public Pipeline create(ReplicationFactor factor) throws IOException {
- // Get set of datanodes already used for ratis pipeline
+ private List<DatanodeDetails> pickNodesNeverUsed(ReplicationFactor factor)
+ throws SCMException {
Set<DatanodeDetails> dnsUsed = new HashSet<>();
- stateManager.getPipelines(ReplicationType.RATIS, factor).stream().filter(
- p -> p.getPipelineState().equals(PipelineState.OPEN) ||
- p.getPipelineState().equals(PipelineState.DORMANT) ||
- p.getPipelineState().equals(PipelineState.ALLOCATED))
+ stateManager.getPipelines(ReplicationType.RATIS, factor)
+ .stream().filter(
+ p -> p.getPipelineState().equals(PipelineState.OPEN) ||
+ p.getPipelineState().equals(PipelineState.DORMANT) ||
+ p.getPipelineState().equals(PipelineState.ALLOCATED))
.forEach(p -> dnsUsed.addAll(p.getNodes()));
// Get list of healthy nodes
- List<DatanodeDetails> dns =
- nodeManager.getNodes(NodeState.HEALTHY)
- .parallelStream()
- .filter(dn -> !dnsUsed.contains(dn))
- .limit(factor.getNumber())
- .collect(Collectors.toList());
+ List<DatanodeDetails> dns = nodeManager
+ .getNodes(HddsProtos.NodeState.HEALTHY)
+ .parallelStream()
+ .filter(dn -> !dnsUsed.contains(dn))
+ .limit(factor.getNumber())
+ .collect(Collectors.toList());
if (dns.size() < factor.getNumber()) {
String e = String
- .format("Cannot create pipeline of factor %d using %d nodes.",
- factor.getNumber(), dns.size());
- throw new InsufficientDatanodesException(e);
+ .format("Cannot create pipeline of factor %d using %d nodes." +
+ " Used %d nodes. Healthy nodes %d", factor.getNumber(),
+ dns.size(), dnsUsed.size(),
+ nodeManager.getNodes(HddsProtos.NodeState.HEALTHY).size());
+ throw new SCMException(e,
+ SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+ }
+ return dns;
+ }
+
+ @Override
+ public Pipeline create(ReplicationFactor factor) throws IOException {
+ List<DatanodeDetails> dns;
+
+ switch(factor) {
+ case ONE:
+ dns = pickNodesNeverUsed(ReplicationFactor.ONE);
+ break;
+ case THREE:
+ dns = placementPolicy.chooseDatanodes(null,
+ null, factor.getNumber(), 0);
+ break;
+ default:
+ throw new IllegalStateException("Unknown factor: " + factor.name());
}
Pipeline pipeline = Pipeline.newBuilder()
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
index 777a0b05aabde..21c4fbf4d3987 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
@@ -64,8 +64,8 @@ static void destroyPipeline(Pipeline pipeline, Configuration ozoneConf,
try {
destroyPipeline(dn, pipeline.getId(), ozoneConf, grpcTlsConfig);
} catch (IOException e) {
- LOG.warn("Pipeline destroy failed for pipeline={} dn={}",
- pipeline.getId(), dn);
+ LOG.warn("Pipeline destroy failed for pipeline={} dn={} exception={}",
+ pipeline.getId(), dn, e.getMessage());
}
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 0964f6d4db297..80c934f8f6ea3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -28,6 +28,7 @@
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.server.ServerUtils;
import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -54,10 +55,6 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import static org.apache.hadoop.hdds.scm
- .ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
-import static org.apache.hadoop.hdds.scm
- .ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB;
import static org.apache.hadoop.ozone.OzoneConsts.SCM_PIPELINE_DB;
/**
@@ -84,6 +81,8 @@ public class SCMPipelineManager implements PipelineManager {
// Pipeline Manager MXBean
private ObjectName pmInfoBean;
private GrpcTlsConfig grpcTlsConfig;
+ private int pipelineNumberLimit;
+ private int heavyNodeCriteria;
public SCMPipelineManager(Configuration conf, NodeManager nodeManager,
EventPublisher eventPublisher, GrpcTlsConfig grpcTlsConfig)
@@ -97,8 +96,8 @@ public SCMPipelineManager(Configuration conf, NodeManager nodeManager,
scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1);
this.backgroundPipelineCreator =
new BackgroundPipelineCreator(this, scheduler, conf);
- int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
- OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
+ int cacheSize = conf.getInt(ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB,
+ ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
final File metaDir = ServerUtils.getScmDbDir(conf);
final File pipelineDBPath = new File(metaDir, SCM_PIPELINE_DB);
this.pipelineStore =
@@ -115,6 +114,12 @@ public SCMPipelineManager(Configuration conf, NodeManager nodeManager,
"SCMPipelineManagerInfo", this);
initializePipelineState();
this.grpcTlsConfig = grpcTlsConfig;
+ this.pipelineNumberLimit = conf.getInt(
+ ScmConfigKeys.OZONE_SCM_PIPELINE_NUMBER_LIMIT,
+ ScmConfigKeys.OZONE_SCM_PIPELINE_NUMBER_LIMIT_DEFAULT);
+ this.heavyNodeCriteria = conf.getInt(
+ ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT,
+ ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT);
}
public PipelineStateManager getStateManager() {
@@ -147,10 +152,33 @@ private void initializePipelineState() throws IOException {
}
}
+ private boolean exceedPipelineNumberLimit(ReplicationFactor factor) {
+ if (heavyNodeCriteria > 0 && factor == ReplicationFactor.THREE) {
+ return (stateManager.getPipelines(ReplicationType.RATIS, factor).size() -
+ stateManager.getPipelines(ReplicationType.RATIS, factor,
+ Pipeline.PipelineState.CLOSED).size()) >= heavyNodeCriteria *
+ nodeManager.getNodeCount(HddsProtos.NodeState.HEALTHY);
+ }
+
+ if (pipelineNumberLimit > 0) {
+ return (stateManager.getPipelines(ReplicationType.RATIS).size() -
+ stateManager.getPipelines(ReplicationType.RATIS,
+ Pipeline.PipelineState.CLOSED).size()) >= pipelineNumberLimit;
+ }
+
+ return false;
+ }
+
@Override
public synchronized Pipeline createPipeline(
ReplicationType type, ReplicationFactor factor) throws IOException {
lock.writeLock().lock();
+ if (type == ReplicationType.RATIS && exceedPipelineNumberLimit(factor)) {
+ lock.writeLock().unlock();
+ throw new SCMException("Pipeline number meets the limit: " +
+ pipelineNumberLimit,
+ SCMException.ResultCodes.FAILED_TO_FIND_HEALTHY_NODES);
+ }
try {
Pipeline pipeline = pipelineFactory.create(type, factor);
pipelineStore.put(pipeline.getId().getProtobuf().toByteArray(),
@@ -160,10 +188,9 @@ public synchronized Pipeline createPipeline(
metrics.incNumPipelineCreated();
metrics.createPerPipelineMetrics(pipeline);
return pipeline;
- } catch (InsufficientDatanodesException idEx) {
- throw idEx;
} catch (IOException ex) {
metrics.incNumPipelineCreationFailed();
+ LOG.error("Pipeline creation failed.", ex);
throw ex;
} finally {
lock.writeLock().unlock();
@@ -172,7 +199,7 @@ public synchronized Pipeline createPipeline(
@Override
public Pipeline createPipeline(ReplicationType type, ReplicationFactor factor,
- List<DatanodeDetails> nodes) {
+ List<DatanodeDetails> nodes) {
// This will mostly be used to create dummy pipeline for SimplePipelines.
// We don't update the metrics for SimplePipelines.
lock.writeLock().lock();
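
A worked example of the exceedPipelineNumberLimit check above, with assumed numbers: for factor THREE with a per-datanode engagement limit configured, the cap is heavyNodeCriteria times the healthy datanode count, and only non-CLOSED Ratis pipelines count toward it.

    int heavyNodeCriteria = 2;                       // per-datanode engagement limit
    int healthyNodes = 5;
    int cap = heavyNodeCriteria * healthyNodes;      // 10
    int nonClosedFactorThree = 12 - 2;               // total minus CLOSED pipelines
    boolean exceeded = nonClosedFactorThree >= cap;  // true -> createPipeline throws SCMException
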
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
index d0f7f6ef3be11..1b23036946eb2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
@@ -123,6 +123,14 @@ void incNumPipelineCreated() {
numPipelineCreated.incr();
}
+ /**
+ * Get the number of pipelines created.
+ * @return number of pipelines created
+ */
+ long getNumPipelineCreated() {
+ return numPipelineCreated.value();
+ }
+
/**
* Increments number of failed pipeline creation count.
*/
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
index 7a00d760fa4d1..b3aac5ec804d9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
@@ -20,7 +20,6 @@
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
@@ -56,7 +55,7 @@ public class HealthyPipelineSafeModeRule
private final PipelineManager pipelineManager;
private final int healthyPipelineThresholdCount;
private int currentHealthyPipelineCount = 0;
- private final Set<DatanodeDetails> processedDatanodeDetails =
+ private final Set<PipelineID> processedPipelineIDs =
new HashSet<>();
HealthyPipelineSafeModeRule(String ruleName, EventQueue eventQueue,
@@ -116,46 +115,46 @@ protected void process(PipelineReportFromDatanode
// processed report event, we should not consider this pipeline report
// from datanode again during threshold calculation.
Preconditions.checkNotNull(pipelineReportFromDatanode);
- DatanodeDetails dnDetails = pipelineReportFromDatanode.getDatanodeDetails();
- if (!processedDatanodeDetails.contains(
- pipelineReportFromDatanode.getDatanodeDetails())) {
-
- Pipeline pipeline;
- PipelineReportsProto pipelineReport =
- pipelineReportFromDatanode.getReport();
-
- for (PipelineReport report : pipelineReport.getPipelineReportList()) {
- PipelineID pipelineID = PipelineID
- .getFromProtobuf(report.getPipelineID());
- try {
- pipeline = pipelineManager.getPipeline(pipelineID);
- } catch (PipelineNotFoundException e) {
- continue;
- }
-
- if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
- pipeline.getPipelineState() == Pipeline.PipelineState.OPEN) {
- // If the pipeline is open state mean, all 3 datanodes are reported
- // for this pipeline.
- currentHealthyPipelineCount++;
- getSafeModeMetrics().incCurrentHealthyPipelinesCount();
- }
+
+ Pipeline pipeline;
+ PipelineReportsProto pipelineReport =
+ pipelineReportFromDatanode.getReport();
+
+ for (PipelineReport report : pipelineReport.getPipelineReportList()) {
+ PipelineID pipelineID = PipelineID
+ .getFromProtobuf(report.getPipelineID());
+ if (processedPipelineIDs.contains(pipelineID)) {
+ continue;
+ }
+
+ try {
+ pipeline = pipelineManager.getPipeline(pipelineID);
+ } catch (PipelineNotFoundException e) {
+ continue;
}
- if (scmInSafeMode()) {
- SCMSafeModeManager.getLogger().info(
- "SCM in safe mode. Healthy pipelines reported count is {}, " +
- "required healthy pipeline reported count is {}",
- currentHealthyPipelineCount, healthyPipelineThresholdCount);
+
+ if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
+ pipeline.getPipelineState() == Pipeline.PipelineState.OPEN) {
+ // If the pipeline is in open state, it means all 3 datanodes have
+ // reported for this pipeline.
+ currentHealthyPipelineCount++;
+ getSafeModeMetrics().incCurrentHealthyPipelinesCount();
}
- processedDatanodeDetails.add(dnDetails);
+ processedPipelineIDs.add(pipelineID);
}
+ if (scmInSafeMode()) {
+ SCMSafeModeManager.getLogger().info(
+ "SCM in safe mode. Healthy pipelines reported count is {}, " +
+ "required healthy pipeline reported count is {}",
+ currentHealthyPipelineCount, healthyPipelineThresholdCount);
+ }
}
@Override
protected void cleanup() {
- processedDatanodeDetails.clear();
+ processedPipelineIDs.clear();
}
@VisibleForTesting
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
index 7657b54373f3e..20cc3cf358309 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
@@ -64,6 +64,8 @@
import org.junit.Test;
import org.mockito.Mockito;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+
/**
* Test DeadNodeHandler.
*/
@@ -84,6 +86,7 @@ public void setup() throws IOException, AuthenticationException {
storageDir = GenericTestUtils.getTempPath(
TestDeadNodeHandler.class.getSimpleName() + UUID.randomUUID());
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, storageDir);
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 0);
eventQueue = new EventQueue();
scm = HddsTestUtils.getScm(conf);
nodeManager = (SCMNodeManager) scm.getScmNodeManager();
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
index 2e0d0b179c646..1e340393c4768 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
@@ -34,11 +34,14 @@
import java.util.*;
import java.util.stream.Collectors;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+
/**
* Test for PipelinePlacementPolicy.
*/
public class TestPipelinePlacementPolicy {
private MockNodeManager nodeManager;
+ private OzoneConfiguration conf;
private PipelinePlacementPolicy placementPolicy;
private static final int PIPELINE_PLACEMENT_MAX_NODES_COUNT = 10;
@@ -46,8 +49,10 @@ public class TestPipelinePlacementPolicy {
public void init() throws Exception {
nodeManager = new MockNodeManager(true,
PIPELINE_PLACEMENT_MAX_NODES_COUNT);
- placementPolicy =
- new PipelinePlacementPolicy(nodeManager, new OzoneConfiguration());
+ conf = new OzoneConfiguration();
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 5);
+ placementPolicy = new PipelinePlacementPolicy(
+ nodeManager, new PipelineStateManager(conf), conf);
}
@Test
@@ -123,7 +128,7 @@ private List<DatanodeDetails> overWriteLocationInNodes(
public void testHeavyNodeShouldBeExcluded() throws SCMException{
List<DatanodeDetails> healthyNodes =
nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
- int nodesRequired = healthyNodes.size()/2;
+ int nodesRequired = HddsProtos.ReplicationFactor.THREE.getNumber();
// only minority of healthy NODES are heavily engaged in pipelines.
int minorityHeavy = healthyNodes.size()/2 - 1;
List<DatanodeDetails> pickedNodes1 = placementPolicy.chooseDatanodes(
@@ -179,7 +184,9 @@ private void insertHeavyNodesIntoNodeManager(
}
int considerHeavyCount =
- ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT + 1;
+ conf.getInt(
+ ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT,
+ ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT) + 1;
Node2PipelineMap mockMap = new Node2PipelineMap();
for (DatanodeDetails node : nodes) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
index c583559fd3a58..9bccb1acebbf0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
@@ -169,7 +169,7 @@ public void testPipelineCloseWithPipelineAction() throws Exception {
new PipelineActionHandler(pipelineManager, conf);
pipelineActionHandler
.onMessage(pipelineActionsFromDatanode, new EventQueue());
- Thread.sleep((int) (pipelineDestroyTimeoutInMillis * 1.2));
+ Thread.sleep((int) (pipelineDestroyTimeoutInMillis * 10));
OzoneContainer ozoneContainer =
cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
.getContainer();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
index 6ace90cb248ee..d0afbbebedc25 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
@@ -20,6 +20,7 @@
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -34,6 +35,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
/**
@@ -48,9 +50,12 @@ public class TestRatisPipelineCreateAndDestroy {
public void init(int numDatanodes) throws Exception {
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
GenericTestUtils.getRandomizedTempPath());
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
+
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(numDatanodes)
- .setHbInterval(1000)
+ .setPipelineNumber(numDatanodes + numDatanodes/3)
+ .setHbInterval(2000)
.setHbProcessorInterval(1000)
.build();
cluster.waitForClusterToBeReady();
@@ -103,7 +108,7 @@ public void testPipelineCreationOnNodeRestart() throws Exception {
} catch (IOException ioe) {
// As now all datanodes are shutdown, they move to stale state, there
// will be no sufficient datanodes to create the pipeline.
- Assert.assertTrue(ioe instanceof InsufficientDatanodesException);
+ Assert.assertTrue(ioe instanceof SCMException);
}
// make sure pipelines is destroyed
@@ -116,9 +121,13 @@ public void testPipelineCreationOnNodeRestart() throws Exception {
for (Pipeline pipeline : pipelines) {
pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
}
- // make sure pipelines is created after node start
- pipelineManager.triggerPipelineCreation();
- waitForPipelines(1);
+
+ if (cluster.getStorageContainerManager()
+ .getScmNodeManager().getNodeCount(HddsProtos.NodeState.HEALTHY) > 0) {
+ // make sure pipelines is created after node start
+ pipelineManager.triggerPipelineCreation();
+ waitForPipelines(1);
+ }
}
private void waitForPipelines(int numPipelines)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
index 00144e4e654c5..7526575820ea8 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hdds.scm.pipeline;
-import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -33,6 +32,8 @@
import java.util.ArrayList;
import java.util.List;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+
/**
* Test for RatisPipelineProvider.
*/
@@ -46,14 +47,17 @@ public class TestRatisPipelineProvider {
public void init() throws Exception {
nodeManager = new MockNodeManager(true, 10);
stateManager = new PipelineStateManager(new OzoneConfiguration());
+ OzoneConfiguration conf = new OzoneConfiguration();
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 1);
provider = new MockRatisPipelineProvider(nodeManager,
- stateManager, new OzoneConfiguration());
+ stateManager, conf);
}
private void createPipelineAndAssertions(
HddsProtos.ReplicationFactor factor) throws IOException {
Pipeline pipeline = provider.create(factor);
stateManager.addPipeline(pipeline);
+ nodeManager.addPipeline(pipeline);
Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
Assert.assertEquals(pipeline.getFactor(), factor);
Assert.assertEquals(pipeline.getPipelineState(),
@@ -61,10 +65,7 @@ private void createPipelineAndAssertions(
Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
Pipeline pipeline1 = provider.create(factor);
stateManager.addPipeline(pipeline1);
- // New pipeline should not overlap with the previous created pipeline
- Assert.assertTrue(
- CollectionUtils.intersection(pipeline.getNodes(), pipeline1.getNodes())
- .isEmpty());
+ nodeManager.addPipeline(pipeline1);
Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.RATIS);
Assert.assertEquals(pipeline1.getFactor(), factor);
Assert.assertEquals(pipeline1.getPipelineState(),
@@ -77,6 +78,7 @@ public void testCreatePipelineWithFactor() throws IOException {
HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
Pipeline pipeline = provider.create(factor);
stateManager.addPipeline(pipeline);
+ nodeManager.addPipeline(pipeline);
Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
Assert.assertEquals(pipeline.getFactor(), factor);
Assert.assertEquals(pipeline.getPipelineState(),
@@ -86,11 +88,7 @@ public void testCreatePipelineWithFactor() throws IOException {
factor = HddsProtos.ReplicationFactor.ONE;
Pipeline pipeline1 = provider.create(factor);
stateManager.addPipeline(pipeline1);
- // New pipeline should overlap with the previous created pipeline,
- // and one datanode should overlap between the two types.
- Assert.assertEquals(
- CollectionUtils.intersection(pipeline.getNodes(),
- pipeline1.getNodes()).size(), 1);
+ nodeManager.addPipeline(pipeline1);
Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.RATIS);
Assert.assertEquals(pipeline1.getFactor(), factor);
Assert.assertEquals(pipeline1.getPipelineState(),
@@ -139,45 +137,37 @@ public void testCreatePipelineWithNodes() {
@Test
public void testCreatePipelinesDnExclude() throws IOException {
- // We have 10 DNs in MockNodeManager.
- // Use up first 3 DNs for an open pipeline.
- List<DatanodeDetails> openPiplineDns = nodeManager.getAllNodes()
- .subList(0, 3);
+ List<DatanodeDetails> allHealthyNodes =
+ nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
+ int totalHealthyNodesCount = allHealthyNodes.size();
+
HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
- Pipeline openPipeline = Pipeline.newBuilder()
- .setType(HddsProtos.ReplicationType.RATIS)
- .setFactor(factor)
- .setNodes(openPiplineDns)
- .setState(Pipeline.PipelineState.OPEN)
- .setId(PipelineID.randomId())
- .build();
-
- stateManager.addPipeline(openPipeline);
-
- // Use up next 3 DNs also for an open pipeline.
- List<DatanodeDetails> moreOpenPiplineDns = nodeManager.getAllNodes()
- .subList(3, 6);
- Pipeline anotherOpenPipeline = Pipeline.newBuilder()
- .setType(HddsProtos.ReplicationType.RATIS)
- .setFactor(factor)
- .setNodes(moreOpenPiplineDns)
- .setState(Pipeline.PipelineState.OPEN)
- .setId(PipelineID.randomId())
- .build();
- stateManager.addPipeline(anotherOpenPipeline);
-
- // Use up next 3 DNs also for a closed pipeline.
- List<DatanodeDetails> closedPiplineDns = nodeManager.getAllNodes()
- .subList(6, 9);
- Pipeline anotherClosedPipeline = Pipeline.newBuilder()
- .setType(HddsProtos.ReplicationType.RATIS)
- .setFactor(factor)
- .setNodes(closedPiplineDns)
- .setState(Pipeline.PipelineState.CLOSED)
- .setId(PipelineID.randomId())
- .build();
- stateManager.addPipeline(anotherClosedPipeline);
+ List<DatanodeDetails> closePipelineDns = new ArrayList<>();
+ for (int i = 0; i < totalHealthyNodesCount/3; i++) {
+ List<DatanodeDetails> pipelineDns = allHealthyNodes
+ .subList(3 * i, 3 * (i + 1));
+
+ Pipeline.PipelineState state;
+ if (i % 2 == 0) {
+ state = Pipeline.PipelineState.OPEN;
+ } else {
+ state = Pipeline.PipelineState.CLOSED;
+ closePipelineDns.addAll(pipelineDns);
+ }
+
+ Pipeline openPipeline = Pipeline.newBuilder()
+ .setType(HddsProtos.ReplicationType.RATIS)
+ .setFactor(factor)
+ .setNodes(pipelineDns)
+ .setState(state)
+ .setId(PipelineID.randomId())
+ .build();
+
+
+ stateManager.addPipeline(openPipeline);
+ nodeManager.addPipeline(openPipeline);
+ }
Pipeline pipeline = provider.create(factor);
Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
@@ -187,15 +177,9 @@ public void testCreatePipelinesDnExclude() throws IOException {
Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
List<DatanodeDetails> pipelineNodes = pipeline.getNodes();
- // Pipline nodes cannot be from open pipelines.
- Assert.assertTrue(
- pipelineNodes.parallelStream().filter(dn ->
- (openPiplineDns.contains(dn) || moreOpenPiplineDns.contains(dn)))
- .count() == 0);
-
// Since we have only 10 DNs, at least 1 pipeline node should have been
// from the closed pipeline DN list.
Assert.assertTrue(pipelineNodes.parallelStream().filter(
- closedPiplineDns::contains).count() > 0);
+ closePipelineDns::contains).count() > 0);
}
}
\ No newline at end of file
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
index 2a486b1224ed1..9d5996011bebc 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.pipeline;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
@@ -28,6 +29,7 @@
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
@@ -58,6 +60,7 @@ public class TestSCMPipelineManager {
@Before
public void setUp() throws Exception {
conf = new OzoneConfiguration();
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 1);
testDir = GenericTestUtils
.getTestDir(TestSCMPipelineManager.class.getSimpleName());
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
@@ -253,10 +256,8 @@ public void testPipelineCreationFailedMetric() throws Exception {
pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
Assert.fail();
- } catch (InsufficientDatanodesException idEx) {
- Assert.assertEquals(
- "Cannot create pipeline of factor 3 using 1 nodes.",
- idEx.getMessage());
+ } catch (SCMException idEx) {
+ // pipeline creation failed this time.
}
metrics = getMetrics(
@@ -266,7 +267,7 @@ public void testPipelineCreationFailedMetric() throws Exception {
numPipelineCreateFailed = getLongCounter(
"NumPipelineCreationFailed", metrics);
- Assert.assertTrue(numPipelineCreateFailed == 0);
+ Assert.assertTrue(numPipelineCreateFailed == 1);
}
@Test
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
index 459a67ae882a8..1af2f74ee453f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
@@ -57,8 +57,11 @@ public class TestSCMRestart {
@BeforeClass
public static void init() throws Exception {
conf = new OzoneConfiguration();
+ int numOfNodes = 4;
cluster = MiniOzoneCluster.newBuilder(conf)
- .setNumDatanodes(4)
+ .setNumDatanodes(numOfNodes)
+ // allow only one FACTOR THREE pipeline.
+ .setPipelineNumber(numOfNodes + 1)
.setHbInterval(1000)
.setHbProcessorInterval(1000)
.build();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java
index 7cfd555a509eb..1caa302c1540e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java
@@ -38,6 +38,7 @@
import java.util.List;
import java.util.concurrent.TimeoutException;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
import static org.junit.Assert.fail;
/**
@@ -62,8 +63,11 @@ public void setup(int numDatanodes) throws Exception {
true);
conf.set(HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT, "10s");
conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL, "10s");
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 1000);
+
clusterBuilder = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(numDatanodes)
+ .setPipelineNumber(numDatanodes + numDatanodes/3)
.setHbInterval(1000)
.setHbProcessorInterval(1000);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index 0aba9689ffbeb..4fe1701f010a0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -266,6 +266,7 @@ abstract class Builder {
protected int numOfDatanodes = 1;
protected boolean startDataNodes = true;
protected CertificateClient certClient;
+ protected int pipelineNumber = 3;
protected Builder(OzoneConfiguration conf) {
this.conf = conf;
@@ -352,6 +353,16 @@ public Builder setNumDatanodes(int val) {
return this;
}
+ /**
+ * Sets the total number of pipelines to create.
+ * @param val number of pipelines
+ * @return MiniOzoneCluster.Builder
+ */
+ public Builder setPipelineNumber(int val) {
+ pipelineNumber = val;
+ return this;
+ }
+
/**
* Sets the number of HeartBeat Interval of Datanodes, the value should be
* in MilliSeconds.
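
A hedged usage sketch (assumed values) of the new builder knob added above, capping how many pipelines a test cluster may create:

    MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf)
        .setNumDatanodes(7)
        .setPipelineNumber(10)   // omit to fall back to the default handling
        .build();
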
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index ac76482bd908a..39b2582bee9d3 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -494,6 +494,9 @@ void initializeConfiguration() throws IOException {
streamBufferMaxSize.get(), streamBufferSizeUnit.get());
conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, blockSize.get(),
streamBufferSizeUnit.get());
+ // MiniOzoneCluster should have global pipeline upper limit.
+ conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_NUMBER_LIMIT,
+ pipelineNumber == 3 ? 2 * numOfDatanodes : pipelineNumber);
configureTrace();
}
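
A worked example (assumed numbers) of the fallback above: when a test never calls setPipelineNumber, the builder default of 3 is treated as "not overridden" and the cluster-wide limit becomes twice the datanode count.

    int numOfDatanodes = 5;
    int pipelineNumber = 3;   // builder default, i.e. setPipelineNumber() not called
    int limit = pipelineNumber == 3 ? 2 * numOfDatanodes : pipelineNumber;   // 10
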
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java
index cf570d28f7c14..ea648c95d97e2 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java
@@ -81,6 +81,7 @@ private void startCluster(OzoneConfiguration conf) throws Exception {
conf.setQuietMode(false);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(7)
+ .setPipelineNumber(10)
.setBlockSize(blockSize)
.setChunkSize(chunkSize)
.setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
index 8649837a0cd0e..908849749b339 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
@@ -21,6 +21,7 @@
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
import org.apache.hadoop.hdds.scm.XceiverClientRatis;
@@ -89,8 +90,10 @@ public void init() throws Exception {
conf.setQuietMode(false);
conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
StorageUnit.MB);
+ conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 3);
+
cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(7)
- .setBlockSize(blockSize).setChunkSize(chunkSize)
+ .setPipelineNumber(10).setBlockSize(blockSize).setChunkSize(chunkSize)
.setStreamBufferFlushSize(flushSize)
.setStreamBufferMaxSize(maxFlushSize)
.setStreamBufferSizeUnit(StorageUnit.BYTES).build();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
index ea51900971887..ff9fad492c1ab 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
@@ -96,6 +96,7 @@ public static void init() throws Exception {
StorageUnit.MB);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(7)
+ .setPipelineNumber(10)
.setBlockSize(blockSize)
.setChunkSize(chunkSize)
.setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java
index 0886d26fe64b9..865e0b52a7a0e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java
@@ -54,8 +54,7 @@
import java.util.function.Predicate;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.*;
/**
* Tests delete key operation with a slow follower in the datanode
@@ -99,10 +98,12 @@ public static void init() throws Exception {
1000, TimeUnit.SECONDS);
conf.setLong("hdds.scm.replication.thread.interval",
containerReportInterval);
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
conf.setQuietMode(false);
cluster =
- MiniOzoneCluster.newBuilder(conf).setNumDatanodes(4).setHbInterval(200)
+ MiniOzoneCluster.newBuilder(conf).setNumDatanodes(4)
+ .setPipelineNumber(6).setHbInterval(200)
.build();
cluster.waitForClusterToBeReady();
cluster.getStorageContainerManager().getReplicationManager().start();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
index 19a1707973153..37b8a5ff40b5c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
@@ -52,8 +52,7 @@
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.*;
/**
* Tests the containerStateMachine failure handling.
@@ -82,7 +81,7 @@ public static void init() throws Exception {
baseDir.mkdirs();
conf.setBoolean(HDDS_BLOCK_TOKEN_ENABLED, true);
- // conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
+ // conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200,
TimeUnit.MILLISECONDS);
conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 200,
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java
index 30c2624fbf55f..00556a8a00637 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java
@@ -80,6 +80,7 @@ public class TestDeleteWithSlowFollower {
private static String bucketName;
private static String path;
private static XceiverClientManager xceiverClientManager;
+ private static final int FACTOR_THREE_PIPELINE_COUNT = 1;
/**
* Create a MiniDFSCluster for testing.
@@ -111,10 +112,12 @@ public static void init() throws Exception {
1000, TimeUnit.SECONDS);
conf.setTimeDuration(OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
1, TimeUnit.SECONDS);
-
conf.setQuietMode(false);
- cluster =
- MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).setHbInterval(100)
+ int numOfDatanodes = 3;
+ cluster = MiniOzoneCluster.newBuilder(conf)
+ .setNumDatanodes(numOfDatanodes)
+ .setPipelineNumber(numOfDatanodes + FACTOR_THREE_PIPELINE_COUNT)
+ .setHbInterval(100)
.build();
cluster.waitForClusterToBeReady();
//the easiest way to create an open container is creating a key
@@ -176,7 +179,7 @@ public void testDeleteKeyWithSlowFollower() throws Exception {
cluster.getStorageContainerManager().getPipelineManager()
.getPipelines(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
- Assert.assertTrue(pipelineList.size() == 1);
+ Assert.assertEquals(FACTOR_THREE_PIPELINE_COUNT, pipelineList.size());
Pipeline pipeline = pipelineList.get(0);
for (HddsDatanodeService dn : cluster.getHddsDatanodes()) {
if (ContainerTestHelper.isRatisFollower(dn, pipeline)) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
index edb796b8799d4..03683237e691d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -97,6 +98,7 @@ private void init() throws Exception {
1, TimeUnit.SECONDS);
conf.setBoolean(
OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY, true);
+ conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
conf.setQuietMode(false);
conf.setClass(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
@@ -105,7 +107,7 @@ private void init() throws Exception {
Collections.singleton(HddsUtils.getHostName(conf))).get(0),
"/rack1");
cluster = MiniOzoneCluster.newBuilder(conf)
- .setNumDatanodes(10).build();
+ .setNumDatanodes(10).setPipelineNumber(15).build();
cluster.waitForClusterToBeReady();
//the easiest way to create an open container is creating a key
client = OzoneClientFactory.getClient(conf);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestHybridPipelineOnDatanode.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestHybridPipelineOnDatanode.java
index 47a716e85ca23..84649e3458fe3 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestHybridPipelineOnDatanode.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestHybridPipelineOnDatanode.java
@@ -67,7 +67,8 @@ public class TestHybridPipelineOnDatanode {
@BeforeClass
public static void init() throws Exception {
conf = new OzoneConfiguration();
- cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).build();
+ cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3)
+ .setPipelineNumber(5).build();
cluster.waitForClusterToBeReady();
//the easiest way to create an open container is creating a key
client = OzoneClientFactory.getClient(conf);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
index fa8a289ea810a..666264cb162da 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
@@ -81,6 +81,7 @@ public static void init() throws Exception {
StorageUnit.MB);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(3)
+ .setPipelineNumber(5)
.setBlockSize(blockSize)
.setChunkSize(chunkSize)
.setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java
index 96662471a3eda..6dbae6ae9d9d5 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java
@@ -47,8 +47,9 @@
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
/**
* Tests MultiBlock Writes with Dn failures by Ozone Client.
@@ -87,10 +88,13 @@ private void startCluster(int datanodes) throws Exception {
conf.setTimeDuration(
OzoneConfigKeys.DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
1, TimeUnit.SECONDS);
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
conf.setQuietMode(false);
cluster = MiniOzoneCluster.newBuilder(conf)
- .setNumDatanodes(datanodes).build();
+ .setNumDatanodes(datanodes)
+ .setPipelineNumber(0)
+ .build();
cluster.waitForClusterToBeReady();
//the easiest way to create an open container is creating a key
client = OzoneClientFactory.getClient(conf);
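Note (illustrative, not part of the patch): setPipelineNumber(0) above mirrors the new defaults, where zero means the limit does not take effect. A hypothetical sketch of how a builder could map the requested value onto the new key; the real wiring lives in the MiniOzoneCluster builder changes elsewhere in this patch, and applyPipelineLimit/pipelineNumLimit are made-up names used only for illustration:

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;

// Hypothetical helper showing one way a test-cluster builder could apply the
// pipeline-number limit. Zero keeps the default of "no limit".
final class PipelineLimitWiring {
  private PipelineLimitWiring() {
  }

  static void applyPipelineLimit(OzoneConfiguration conf,
      int pipelineNumLimit) {
    if (pipelineNumLimit > 0) {
      // Only set the key for a positive limit; the default of 0 means the
      // limit is not enforced at all.
      conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_NUMBER_LIMIT,
          pipelineNumLimit);
    }
  }
}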
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
index 5f6d494a24b19..9e7e3c07bb5bb 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
@@ -91,6 +91,7 @@ public void init() throws Exception {
conf.setQuietMode(false);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(7)
+ .setPipelineNumber(10)
.setBlockSize(blockSize)
.setChunkSize(chunkSize)
.setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
index d91f739332ada..4710adaca48bd 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
@@ -164,6 +164,7 @@ public abstract class TestOzoneRpcClientAbstract {
static void startCluster(OzoneConfiguration conf) throws Exception {
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(3)
+ .setPipelineNumber(10)
.setScmId(scmId)
.build();
cluster.waitForClusterToBeReady();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
index 9b59349119451..fa89f5b7f5b30 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
@@ -53,8 +53,9 @@
import java.util.concurrent.TimeoutException;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
/**
* This class verifies the watchForCommit Handling by xceiverClient.
@@ -92,10 +93,12 @@ private void startCluster(OzoneConfiguration conf) throws Exception {
conf.setTimeDuration(
OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY,
1, TimeUnit.SECONDS);
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 5);
conf.setQuietMode(false);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(7)
+ .setPipelineNumber(10)
.setBlockSize(blockSize)
.setChunkSize(chunkSize)
.setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
index b676e1c967664..763f6395718e7 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
@@ -52,6 +52,8 @@
import java.util.List;
import java.util.concurrent.TimeoutException;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+
/**
* Test container closing.
*/
@@ -73,8 +75,11 @@ public class TestCloseContainerByPipeline {
public static void init() throws Exception {
conf = new OzoneConfiguration();
conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, "1");
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
+
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(10)
+ .setPipelineNumber(15)
.build();
cluster.waitForClusterToBeReady();
//the easiest way to create an open container is creating a key
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java
index 536d807aedb8a..191589a38cbb4 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java
@@ -82,6 +82,7 @@ public void setup() throws Exception {
"/rack1");
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(4)
+ .setPipelineNumber(10)
.build();
cluster.waitForClusterToBeReady();
metrics = getMetrics(SCMContainerPlacementMetrics.class.getSimpleName());
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestQueryNode.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestQueryNode.java
index c9b8c89e04dac..618212a987382 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestQueryNode.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestQueryNode.java
@@ -16,6 +16,7 @@
*/
package org.apache.hadoop.ozone.scm.node;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -77,9 +78,11 @@ public void setUp() throws Exception {
conf.setTimeDuration(HDDS_NODE_REPORT_INTERVAL, 1, SECONDS);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS);
conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS);
+ conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 3);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(numOfDatanodes)
+ .setPipelineNumber(numOfDatanodes + numOfDatanodes/2)
.build();
cluster.waitForClusterToBeReady();
scmClient = new ContainerOperationClient(cluster
diff --git a/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java b/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java
index ab351913d7a01..bd7173c4afab5 100644
--- a/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java
+++ b/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java
@@ -99,6 +99,7 @@ public void init() throws Exception {
conf.setTimeDuration(
OMConfigKeys.OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
LEADER_ELECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+ conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 3);
OMStorage omStore = new OMStorage(conf);
omStore.setClusterId(clusterId);
@@ -108,6 +109,8 @@ public void init() throws Exception {
// Start the cluster
cluster = MiniOzoneCluster.newHABuilder(conf)
+ .setNumDatanodes(7)
+ .setPipelineNumber(10)
.setClusterId(clusterId)
.setScmId(scmId)
.setOMServiceId(omServiceId)
diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
index fdcb822bf4d43..eb19fe77de080 100644
--- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
+++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
@@ -42,7 +42,7 @@ public abstract class TestDataValidate {
static void startCluster(OzoneConfiguration conf) throws Exception {
conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
cluster = MiniOzoneCluster.newBuilder(conf)
- .setNumDatanodes(5).build();
+ .setNumDatanodes(5).setPipelineNumber(8).build();
cluster.waitForClusterToBeReady();
}
diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java
index 13ecab60226db..cc922f240e54a 100644
--- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java
+++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java
@@ -53,6 +53,7 @@ public static void init() throws Exception {
.setHbProcessorInterval(1000)
.setHbInterval(1000)
.setNumDatanodes(3)
+ .setPipelineNumber(8)
.build();
cluster.waitForClusterToBeReady();
}
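Note (illustrative, not part of the patch): a follow-up assertion that tests like the ones above could add to confirm the limit took effect. It reuses the PipelineManager accessors already exercised by TestDeleteWithSlowFollower and assumes, per the ozone.scm.datanode.pipeline.number.limit description, that the value passed to setPipelineNumber() caps how many pipelines SCM creates; assertWithinPipelineLimit is a made-up helper name:

import java.util.List;

import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.junit.Assert;

// Hypothetical helper: verifies the open factor-THREE pipeline count never
// exceeds the limit handed to MiniOzoneCluster.Builder#setPipelineNumber.
final class PipelineLimitAssertions {
  private PipelineLimitAssertions() {
  }

  static void assertWithinPipelineLimit(MiniOzoneCluster cluster, int limit) {
    List<Pipeline> pipelines = cluster.getStorageContainerManager()
        .getPipelineManager()
        .getPipelines(HddsProtos.ReplicationType.RATIS,
            HddsProtos.ReplicationFactor.THREE);
    Assert.assertTrue(
        "factor-THREE pipeline count exceeds the configured limit",
        pipelines.size() <= limit);
  }
}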