Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
Expand All @@ -34,6 +35,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashSet;
import java.util.Set;

/**
* Class defining Chill mode exit criteria for Pipelines.
*
Expand All @@ -45,12 +49,14 @@ public class HealthyPipelineChillModeRule
implements ChillModeExitRule<PipelineReportFromDatanode>,
EventHandler<PipelineReportFromDatanode> {

private static final Logger LOG =
public static final Logger LOG =
LoggerFactory.getLogger(HealthyPipelineChillModeRule.class);
private final PipelineManager pipelineManager;
private final SCMChillModeManager chillModeManager;
private final int healthyPipelineThresholdCount;
private int currentHealthyPipelineCount = 0;
private final Set<DatanodeDetails> processedDatanodeDetails =
new HashSet<>();

HealthyPipelineChillModeRule(PipelineManager pipelineManager,
SCMChillModeManager manager, Configuration configuration) {
Expand All @@ -71,7 +77,7 @@ public class HealthyPipelineChillModeRule
// On a fresh installed cluster, there will be zero pipelines in the SCM
// pipeline DB.
healthyPipelineThresholdCount =
(int) Math.ceil((healthyPipelinesPercent / 100) * pipelineCount);
(int) Math.ceil(healthyPipelinesPercent * pipelineCount);

LOG.info(" Total pipeline count is {}, healthy pipeline " +
"threshold count is {}", pipelineCount, healthyPipelineThresholdCount);
Expand Down Expand Up @@ -101,7 +107,8 @@ public void process(PipelineReportFromDatanode pipelineReportFromDatanode) {
continue;
}

if (pipeline.getPipelineState() == Pipeline.PipelineState.OPEN) {
if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
pipeline.getPipelineState() == Pipeline.PipelineState.OPEN) {
// If the pipeline is open state mean, all 3 datanodes are reported
// for this pipeline.
currentHealthyPipelineCount++;
Expand All @@ -125,14 +132,26 @@ public void onMessage(PipelineReportFromDatanode pipelineReportFromDatanode,
return;
}

// Process pipeline report from datanode
process(pipelineReportFromDatanode);

if (chillModeManager.getInChillMode()) {
SCMChillModeManager.getLogger().info(
"SCM in chill mode. Healthy pipelines reported count is {}, " +
"required healthy pipeline reported count is {}",
currentHealthyPipelineCount, healthyPipelineThresholdCount);
// When SCM is in chill mode for long time, already registered
// datanode can send pipeline report again, then pipeline handler fires
// processed report event, we should not consider this pipeline report
// from datanode again during threshold calculation.
DatanodeDetails dnDetails = pipelineReportFromDatanode.getDatanodeDetails();
if (!processedDatanodeDetails.contains(
pipelineReportFromDatanode.getDatanodeDetails())) {

// Process pipeline report from datanode
process(pipelineReportFromDatanode);

if (chillModeManager.getInChillMode()) {
SCMChillModeManager.getLogger().info(
"SCM in chill mode. Healthy pipelines reported count is {}, " +
"required healthy pipeline reported count is {}",
currentHealthyPipelineCount, healthyPipelineThresholdCount);
}

processedDatanodeDetails.add(dnDetails);
}

if (validate()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.ArrayList;
Expand Down Expand Up @@ -153,6 +154,82 @@ public void testHealthyPipelineChillModeRuleWithPipelines() throws Exception {
}


@Test
public void testHealthyPipelineChillModeRuleWithMixedPipelines()
throws Exception {

String storageDir = GenericTestUtils.getTempPath(
TestHealthyPipelineChillModeRule.class.getName() + UUID.randomUUID());

try {
EventQueue eventQueue = new EventQueue();
List<ContainerInfo> containers = new ArrayList<>();
containers.addAll(HddsTestUtils.getContainerInfo(1));

OzoneConfiguration config = new OzoneConfiguration();

// In Mock Node Manager, first 8 nodes are healthy, next 2 nodes are
// stale and last one is dead, and this repeats. So for a 12 node, 9
// healthy, 2 stale and one dead.
MockNodeManager nodeManager = new MockNodeManager(true, 12);
config.set(HddsConfigKeys.OZONE_METADATA_DIRS, storageDir);
// enable pipeline check
config.setBoolean(
HddsConfigKeys.HDDS_SCM_CHILLMODE_PIPELINE_AVAILABILITY_CHECK, true);


PipelineManager pipelineManager = new SCMPipelineManager(config,
nodeManager, eventQueue);

// Create 3 pipelines
Pipeline pipeline1 =
pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.ONE);
Pipeline pipeline2 =
pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
Pipeline pipeline3 =
pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);


SCMChillModeManager scmChillModeManager = new SCMChillModeManager(
config, containers, pipelineManager, eventQueue);

HealthyPipelineChillModeRule healthyPipelineChillModeRule =
scmChillModeManager.getHealthyPipelineChillModeRule();


// No datanodes have sent pipelinereport from datanode
Assert.assertFalse(healthyPipelineChillModeRule.validate());


GenericTestUtils.LogCapturer logCapturer =
GenericTestUtils.LogCapturer.captureLogs(LoggerFactory.getLogger(
SCMChillModeManager.class));

// fire event with pipeline report with ratis type and factor 1
// pipeline, validate() should return false
firePipelineEvent(pipeline1, eventQueue);

GenericTestUtils.waitFor(() -> logCapturer.getOutput().contains(
"reported count is 0"),
1000, 5000);
Assert.assertFalse(healthyPipelineChillModeRule.validate());

firePipelineEvent(pipeline2, eventQueue);
firePipelineEvent(pipeline3, eventQueue);

GenericTestUtils.waitFor(() -> healthyPipelineChillModeRule.validate(),
1000, 5000);

} finally {
FileUtil.fullyDelete(new File(storageDir));
}

}


private void firePipelineEvent(Pipeline pipeline, EventQueue eventQueue) {
PipelineReportsProto.Builder reportBuilder = PipelineReportsProto
.newBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,14 +237,23 @@ public void testChillModePipelineExitRule() throws Exception {
String storageDir = GenericTestUtils.getTempPath(
TestSCMChillModeManager.class.getName() + UUID.randomUUID());
try{
MockNodeManager nodeManager = new MockNodeManager(true, 1);
MockNodeManager nodeManager = new MockNodeManager(true, 3);
config.set(HddsConfigKeys.OZONE_METADATA_DIRS, storageDir);
// enable pipeline check
config.setBoolean(
HddsConfigKeys.HDDS_SCM_CHILLMODE_PIPELINE_AVAILABILITY_CHECK, true);

PipelineManager pipelineManager = new SCMPipelineManager(config,
nodeManager, queue);

Pipeline pipeline = pipelineManager.createPipeline(
HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
PipelineReportsProto.Builder reportBuilder = PipelineReportsProto
.newBuilder();
reportBuilder.addPipelineReport(PipelineReport.newBuilder()
.setPipelineID(pipeline.getId().getProtobuf()));

scmChillModeManager = new SCMChillModeManager(
config, containers, pipelineManager, queue);
queue.addHandler(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
Expand All @@ -254,17 +263,10 @@ public void testChillModePipelineExitRule() throws Exception {
HddsTestUtils.createNodeRegistrationContainerReport(containers));
assertTrue(scmChillModeManager.getInChillMode());

// simulation a pipeline report to trigger the rule check
Pipeline pipeline = pipelineManager.createPipeline(
HddsProtos.ReplicationType.STAND_ALONE,
HddsProtos.ReplicationFactor.ONE);
PipelineReportsProto.Builder reportBuilder = PipelineReportsProto
.newBuilder();
reportBuilder.addPipelineReport(PipelineReport.newBuilder()
.setPipelineID(pipeline.getId().getProtobuf()));

queue.fireEvent(SCMEvents.PIPELINE_REPORT, new PipelineReportFromDatanode(
pipeline.getNodes().get(0), reportBuilder.build()));
// Trigger the processed pipeline report event
queue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
new PipelineReportFromDatanode(pipeline.getNodes().get(0),
reportBuilder.build()));

GenericTestUtils.waitFor(() -> {
return !scmChillModeManager.getInChillMode();
Expand Down