diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java index 007104f7563b9..c2657718058d6 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java @@ -89,6 +89,11 @@ public final class HddsConfigKeys { public static final double HDDS_SCM_CHILLMODE_HEALTHY_PIPELINE_THRESHOLD_PCT_DEFAULT = 0.10; + public static final String HDDS_SCM_CHILLMODE_ONE_NODE_REPORTED_PIPELINE_PCT = + "hdds.scm.chillmode.atleast.one.node.reported.pipeline.pct"; + public static final double + HDDS_SCM_CHILLMODE_ONE_NODE_REPORTED_PIPELINE_PCT_DEFAULT = 0.90; + public static final String HDDS_LOCK_MAX_CONCURRENCY = "hdds.lock.max.concurrency"; public static final int HDDS_LOCK_MAX_CONCURRENCY_DEFAULT = 100; diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index b114daa2999b3..faf2f8900446c 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -1325,6 +1325,16 @@ + + hdds.scm.chillmode.atleast.one.node.reported.pipeline.pct + 0.90 + HDDS,SCM,OPERATION + + Percentage of pipelines, where at least one datanode is reported in the + pipeline. + + + hdds.container.action.max.limit 20 diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/OneReplicaPipelineChillModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/OneReplicaPipelineChillModeRule.java new file mode 100644 index 0000000000000..20b35b8b112ac --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/OneReplicaPipelineChillModeRule.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.chillmode; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.HddsConfigKeys; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.PipelineReport; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; +import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; +import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher. + PipelineReportFromDatanode; +import org.apache.hadoop.hdds.server.events.EventHandler; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.Set; + +/** + * This rule covers whether we have atleast one datanode is reported for each + * pipeline. This rule is for all open containers, we have at least one + * replica available for read when we exit chill mode. + */ +public class OneReplicaPipelineChillModeRule implements + ChillModeExitRule, + EventHandler { + + private static final Logger LOG = + LoggerFactory.getLogger(OneReplicaPipelineChillModeRule.class); + + private int thresholdCount; + private Set reportedPipelineIDSet = new HashSet<>(); + private final PipelineManager pipelineManager; + private final SCMChillModeManager chillModeManager; + + public OneReplicaPipelineChillModeRule(PipelineManager pipelineManager, + SCMChillModeManager chillModeManager, + Configuration configuration) { + this.chillModeManager = chillModeManager; + this.pipelineManager = pipelineManager; + + double percent = + configuration.getDouble( + HddsConfigKeys.HDDS_SCM_CHILLMODE_ONE_NODE_REPORTED_PIPELINE_PCT, + HddsConfigKeys. + HDDS_SCM_CHILLMODE_ONE_NODE_REPORTED_PIPELINE_PCT_DEFAULT); + + int totalPipelineCount = + pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE).size(); + + thresholdCount = (int) Math.ceil(percent * totalPipelineCount); + + LOG.info(" Total pipeline count is {}, pipeline's with atleast one " + + "datanode reported threshold count is {}", totalPipelineCount, + thresholdCount); + + } + @Override + public boolean validate() { + if (reportedPipelineIDSet.size() >= thresholdCount) { + return true; + } + return false; + } + + @Override + public void process(PipelineReportFromDatanode pipelineReportFromDatanode) { + Pipeline pipeline; + Preconditions.checkNotNull(pipelineReportFromDatanode); + PipelineReportsProto pipelineReport = + pipelineReportFromDatanode.getReport(); + + for (PipelineReport report : pipelineReport.getPipelineReportList()) { + PipelineID pipelineID = PipelineID + .getFromProtobuf(report.getPipelineID()); + try { + pipeline = pipelineManager.getPipeline(pipelineID); + } catch (PipelineNotFoundException e) { + continue; + } + + if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE && + !reportedPipelineIDSet.contains(pipelineID)) { + reportedPipelineIDSet.add(pipelineID); + } + } + } + + @Override + public void cleanup() { + reportedPipelineIDSet.clear(); + } + + @Override + public void onMessage(PipelineReportFromDatanode pipelineReportFromDatanode, + EventPublisher publisher) { + + if (validate()) { + chillModeManager.validateChillModeExitRules(publisher); + return; + } + + // Process pipeline report from datanode + process(pipelineReportFromDatanode); + + if (chillModeManager.getInChillMode()) { + SCMChillModeManager.getLogger().info( + "SCM in chill mode. Pipelines with atleast one datanode reported " + + "count is {}, required atleast one datanode reported per " + + "pipeline count is {}", + reportedPipelineIDSet.size(), thresholdCount); + } + + if (validate()) { + chillModeManager.validateChillModeExitRules(publisher); + } + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/SCMChillModeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/SCMChillModeManager.java index ba79af729c647..1d97a579c8be2 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/SCMChillModeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/SCMChillModeManager.java @@ -62,6 +62,8 @@ public class SCMChillModeManager implements private static final String DN_EXIT_RULE = "DataNodeChillModeRule"; private static final String HEALTHY_PIPELINE_EXIT_RULE = "HealthyPipelineChillModeRule"; + private static final String ATLEAST_ONE_DATANODE_REPORTED_PIPELINE_EXIT_RULE = + "AtleastOneDatanodeReportedRule"; private final EventQueue eventPublisher; private final PipelineManager pipelineManager; @@ -86,8 +88,14 @@ public SCMChillModeManager(Configuration conf, && pipelineManager != null) { HealthyPipelineChillModeRule rule = new HealthyPipelineChillModeRule( pipelineManager, this, config); + OneReplicaPipelineChillModeRule oneReplicaPipelineChillModeRule = + new OneReplicaPipelineChillModeRule(pipelineManager, this, conf); exitRules.put(HEALTHY_PIPELINE_EXIT_RULE, rule); + exitRules.put(ATLEAST_ONE_DATANODE_REPORTED_PIPELINE_EXIT_RULE, + oneReplicaPipelineChillModeRule); eventPublisher.addHandler(SCMEvents.PROCESSED_PIPELINE_REPORT, rule); + eventPublisher.addHandler(SCMEvents.PROCESSED_PIPELINE_REPORT, + oneReplicaPipelineChillModeRule); } emitChillModeStatus(); } else { @@ -179,4 +187,10 @@ public HealthyPipelineChillModeRule getHealthyPipelineChillModeRule() { exitRules.get(HEALTHY_PIPELINE_EXIT_RULE); } + @VisibleForTesting + public OneReplicaPipelineChillModeRule getOneReplicaPipelineChillModeRule() { + return (OneReplicaPipelineChillModeRule) + exitRules.get(ATLEAST_ONE_DATANODE_REPORTED_PIPELINE_EXIT_RULE); + } + } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/chillmode/TestOneReplicaPipelineChillModeRule.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/chillmode/TestOneReplicaPipelineChillModeRule.java new file mode 100644 index 0000000000000..f389a8333d836 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/chillmode/TestOneReplicaPipelineChillModeRule.java @@ -0,0 +1,202 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.chillmode; + +import org.apache.hadoop.hdds.HddsConfigKeys; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.PipelineReport; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; +import org.apache.hadoop.hdds.scm.HddsTestUtils; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.MockNodeManager; +import org.apache.hadoop.hdds.scm.events.SCMEvents; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; +import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher; +import org.apache.hadoop.hdds.server.events.EventQueue; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +/** + * This class tests OneReplicaPipelineChillModeRule. + */ +public class TestOneReplicaPipelineChillModeRule { + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + private OneReplicaPipelineChillModeRule rule; + private PipelineManager pipelineManager; + private EventQueue eventQueue; + + + private void setup(int nodes, int pipelineFactorThreeCount, + int pipelineFactorOneCount) throws Exception { + OzoneConfiguration ozoneConfiguration = new OzoneConfiguration(); + ozoneConfiguration.setBoolean( + HddsConfigKeys.HDDS_SCM_CHILLMODE_PIPELINE_AVAILABILITY_CHECK, true); + ozoneConfiguration.set(HddsConfigKeys.OZONE_METADATA_DIRS, + folder.newFolder().toString()); + + List containers = new ArrayList<>(); + containers.addAll(HddsTestUtils.getContainerInfo(1)); + MockNodeManager mockNodeManager = new MockNodeManager(true, nodes); + + eventQueue = new EventQueue(); + pipelineManager = + new SCMPipelineManager(ozoneConfiguration, mockNodeManager, + eventQueue); + + createPipelines(pipelineFactorThreeCount, + HddsProtos.ReplicationFactor.THREE); + createPipelines(pipelineFactorOneCount, + HddsProtos.ReplicationFactor.ONE); + + SCMChillModeManager scmChillModeManager = + new SCMChillModeManager(ozoneConfiguration, containers, + pipelineManager, eventQueue); + + rule = scmChillModeManager.getOneReplicaPipelineChillModeRule(); + } + + @Test + public void testOneReplicaPipelineRule() throws Exception { + + // As with 30 nodes, We can create 7 pipelines with replication factor 3. + // (This is because in node manager for every 10 nodes, 7 nodes are + // healthy, 2 are stale one is dead.) + int nodes = 30; + int pipelineFactorThreeCount = 7; + int pipelineCountOne = 0; + setup(nodes, pipelineFactorThreeCount, pipelineCountOne); + + GenericTestUtils.LogCapturer logCapturer = + GenericTestUtils.LogCapturer.captureLogs( + LoggerFactory.getLogger(SCMChillModeManager.class)); + + List pipelines = pipelineManager.getPipelines(); + for (int i = 0; i < pipelineFactorThreeCount -1; i++) { + firePipelineEvent(pipelines.get(i)); + } + + // As 90% of 7 with ceil is 7, if we send 6 pipeline reports, rule + // validate should be still false. + + GenericTestUtils.waitFor(() -> logCapturer.getOutput().contains( + "reported count is 6"), 1000, 5000); + + Assert.assertFalse(rule.validate()); + + //Fire last pipeline event from datanode. + firePipelineEvent(pipelines.get(pipelineFactorThreeCount - 1)); + + GenericTestUtils.waitFor(() -> rule.validate(), 1000, 5000); + + } + + + @Test + public void testOneReplicaPipelineRuleMixedPipelines() throws Exception { + + // As with 30 nodes, We can create 7 pipelines with replication factor 3. + // (This is because in node manager for every 10 nodes, 7 nodes are + // healthy, 2 are stale one is dead.) + int nodes = 30; + int pipelineCountThree = 7; + int pipelineCountOne = 21; + + setup(nodes, pipelineCountThree, pipelineCountOne); + + GenericTestUtils.LogCapturer logCapturer = + GenericTestUtils.LogCapturer.captureLogs( + LoggerFactory.getLogger(SCMChillModeManager.class)); + + List pipelines = + pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.ONE); + for (int i = 0; i < pipelineCountOne; i++) { + firePipelineEvent(pipelines.get(i)); + } + + GenericTestUtils.waitFor(() -> logCapturer.getOutput().contains( + "reported count is 0"), 1000, 5000); + + // fired events for one node ratis pipeline, so we will be still false. + Assert.assertFalse(rule.validate()); + + pipelines = + pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE); + for (int i = 0; i < pipelineCountThree - 1; i++) { + firePipelineEvent(pipelines.get(i)); + } + + GenericTestUtils.waitFor(() -> logCapturer.getOutput().contains( + "reported count is 6"), 1000, 5000); + + //Fire last pipeline event from datanode. + firePipelineEvent(pipelines.get(pipelineCountThree - 1)); + + GenericTestUtils.waitFor(() -> rule.validate(), 1000, 5000); + + } + + + + private void createPipelines(int count, + HddsProtos.ReplicationFactor factor) throws Exception { + for (int i = 0; i < count; i++) { + pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS, + factor); + } + } + + private void firePipelineEvent(Pipeline pipeline) { + PipelineReportsProto.Builder reportBuilder = + PipelineReportsProto.newBuilder(); + + reportBuilder.addPipelineReport(PipelineReport.newBuilder() + .setPipelineID(pipeline.getId().getProtobuf())); + + if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE) { + eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT, + new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode( + pipeline.getNodes().get(0), reportBuilder.build())); + eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT, + new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode( + pipeline.getNodes().get(1), reportBuilder.build())); + eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT, + new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode( + pipeline.getNodes().get(2), reportBuilder.build())); + } else { + eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT, + new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode( + pipeline.getNodes().get(0), reportBuilder.build())); + } + } +} diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/chillmode/package-info.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/chillmode/package-info.java new file mode 100644 index 0000000000000..22a522a11c9e8 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/chillmode/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.chillmode; +/** + * SCM Chill mode tests. + */