From 6675d6f7d8e9ec22611378770ac5b7be0075c2bb Mon Sep 17 00:00:00 2001
From: cndaimin
Date: Thu, 17 Mar 2022 14:32:38 +0800
Subject: [PATCH] Fix EC decommission when rack is not enough

---
 .../BlockPlacementPolicyDefault.java          |   7 +-
 ...BlockPlacementPolicyRackFaultTolerant.java |   2 +-
 .../BlockPlacementStatusDefault.java          |   9 +-
 .../blockmanagement/DatanodeAdminManager.java |   2 +
 .../blockmanagement/DatanodeManager.java      |  34 ++++
 .../blockmanagement/ErasureCodingWork.java    |  40 +++--
 .../blockmanagement/FSClusterStats.java       |   6 +
 .../blockmanagement/HeartbeatManager.java     |   1 +
 .../hdfs/server/namenode/NamenodeFsck.java    |   4 +-
 .../hdfs/TestErasureCodingNotEnoughRacks.java | 155 ++++++++++++++++++
 10 files changed, 242 insertions(+), 18 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingNotEnoughRacks.java

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
index dec98d85b52df..bfd3dacf21335 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
@@ -1173,7 +1173,7 @@ public BlockPlacementStatus verifyBlockPlacement(DatanodeInfo[] locs,
         .map(dn -> dn.getNetworkLocation()).distinct().count();
 
     return new BlockPlacementStatusDefault(Math.toIntExact(rackCount),
-        minRacks, clusterMap.getNumOfRacks());
+        minRacks, clusterMap.getNumOfRacks() - stats.getNumOfExcludedRacks());
   }
 
   /**
@@ -1369,5 +1369,10 @@ public void setExcludeSlowNodesEnabled(boolean enable) {
   public boolean getExcludeSlowNodesEnabled() {
     return excludeSlowNodesEnabled;
   }
+
+  protected FSClusterStats getStats() {
+    return stats;
+  }
+
 }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java
index dad877fdc76fe..401a35167de0a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java
@@ -243,7 +243,7 @@ public BlockPlacementStatus verifyBlockPlacement(DatanodeInfo[] locs,
       racks.add(dn.getNetworkLocation());
     }
     return new BlockPlacementStatusDefault(racks.size(), numberOfReplicas,
-        clusterMap.getNumOfRacks());
+        clusterMap.getNumOfRacks() - getStats().getNumOfExcludedRacks());
   }
 
   @Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusDefault.java
index 761214234c5a5..a8b3e3918e5df 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusDefault.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusDefault.java
@@ -50,7 +50,14 @@ public int getAdditionalReplicasRequired() {
     if (isPlacementPolicySatisfied()) {
       return 0;
     } else {
-      return requiredRacks - currentRacks;
+      // As 'requiredRacks - currentRacks' accounts for racks still needed,
+      // while 'totalRacks - currentRacks' accounts for racks can extra provided.
+      // When either racks still needed or racks can extra provided <= 0,
+      // it would be considered as placement satisfied, see
+      // 'BlockPlacementStatusDefault#isPlacementPolicySatisfied'.
+      // So as the method name, the additional replicas required should be
+      // the minimum value of them.
+      return Math.min(requiredRacks - currentRacks, totalRacks - currentRacks);
     }
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java
index 42b6ddd8c78fb..fa075b543f2fe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java
@@ -177,6 +177,7 @@ public void startDecommission(DatanodeDescriptor node) {
     if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
       // Update DN stats maintained by HeartbeatManager
       hbManager.startDecommission(node);
+      blockManager.getDatanodeManager().refreshDecommissionRacks();
       // hbManager.startDecommission will set dead node to decommissioned.
       if (node.isDecommissionInProgress()) {
         for (DatanodeStorageInfo storage : node.getStorageInfos()) {
@@ -201,6 +202,7 @@ public void stopDecommission(DatanodeDescriptor node) {
     if (node.isDecommissionInProgress() || node.isDecommissioned()) {
       // Update DN stats maintained by HeartbeatManager
       hbManager.stopDecommission(node);
+      blockManager.getDatanodeManager().refreshDecommissionRacks();
       // extra redundancy blocks will be detected and processed when
       // the dead node comes back and send in its full block report.
       if (node.isAlive()) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index cb601e94f822c..6197d47a72f25 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -178,6 +178,11 @@ public class DatanodeManager {
    */
   private boolean hasClusterEverBeenMultiRack = false;
 
+  /**
+   * Number of racks of which all nodes are excluded: in decommission or decommissioned.
+   */
+  private int numOfExcludedRacks = 0;
+
   private final boolean checkIpHostnameInRegistration;
   /**
    * Whether we should tell datanodes what to cache in replies to
@@ -1300,6 +1305,29 @@ public void refreshNodes(final Configuration conf) throws IOException {
     }
   }
 
+  /**
+   * Refresh the number of decommission racks:
+   * all nodes in which are either in DECOMMISSION_INPROGRESS or DECOMMISSIONED.
+   */
+  public void refreshDecommissionRacks() {
+    List<DatanodeDescriptor> copy;
+    synchronized (this) {
+      copy = new ArrayList<>(datanodeMap.values());
+    }
+    Set<String> decommissionRacks = new HashSet<>();
+    for (DatanodeDescriptor dn : copy) {
+      if (dn.isDecommissionInProgress() || dn.isDecommissioned()) {
+        decommissionRacks.add(dn.getNetworkLocation());
+      }
+    }
+    for (DatanodeDescriptor dn : copy) {
+      if (!dn.isDecommissionInProgress() && !dn.isDecommissioned()) {
+        decommissionRacks.remove(dn.getNetworkLocation());
+      }
+    }
+    this.numOfExcludedRacks = decommissionRacks.size();
+  }
+
   /** Reread include/exclude files. */
   private void refreshHostsReader(Configuration conf) throws IOException {
     // Reread the conf to get dfs.hosts and dfs.hosts.exclude filenames.
@@ -2060,6 +2088,12 @@ public double getInServiceXceiverAverage() {
       public Map<StorageType, StorageTypeStats> getStorageTypeStats() {
         return heartbeatManager.getStorageTypeStats();
       }
+
+      @Override
+      public int getNumOfExcludedRacks() {
+        return numOfExcludedRacks;
+      }
+
     };
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
index 6158677654b94..38c2e08beac0d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.net.Node;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.BitSet;
 import java.util.HashMap;
 import java.util.List;
@@ -130,19 +131,32 @@ void addTaskToDatanode(NumberReplicas numberReplicas) {
     assert targets.length > 0;
     BlockInfoStriped stripedBlk = (BlockInfoStriped) getBlock();
 
-    if (hasNotEnoughRack()) {
-      // if we already have all the internal blocks, but not enough racks,
-      // we only need to replicate one internal block to a new rack
-      int sourceIndex = chooseSource4SimpleReplication();
-      createReplicationWork(sourceIndex, targets[0]);
-    } else if ((numberReplicas.decommissioning() > 0 ||
-        numberReplicas.liveEnteringMaintenanceReplicas() > 0) &&
-        hasAllInternalBlocks()) {
-      List<Integer> leavingServiceSources = findLeavingServiceSources();
-      // decommissioningSources.size() should be >= targets.length
-      final int num = Math.min(leavingServiceSources.size(), targets.length);
-      for (int i = 0; i < num; i++) {
-        createReplicationWork(leavingServiceSources.get(i), targets[i]);
+    if (hasAllInternalBlocks()) {
+      // We should take care of decommission first, specially when rack is not enough.
+      if (numberReplicas.decommissioning() > 0 ||
+          numberReplicas.liveEnteringMaintenanceReplicas() > 0) {
+        List<Integer> leavingServiceSources = findLeavingServiceSources();
+        // decommissioningSources.size() should be >= targets.length
+        final int num = Math.min(leavingServiceSources.size(), targets.length);
+        for (int i = 0; i < num; i++) {
+          createReplicationWork(leavingServiceSources.get(i), targets[i]);
+        }
+      } else if (hasNotEnoughRack()) {
+        // if we already have all the internal blocks, but not enough racks,
+        // we only need to replicate one internal block to a new rack
+        int sourceIndex = chooseSource4SimpleReplication();
+        createReplicationWork(sourceIndex, targets[0]);
+      } else {
+        // In fact it will not reach here, we dump related information in case of bug or what else.
+        LOG.warn("Unknown reconstruction work" +
+            ", block: " + getBlock() +
+            ", srcNodes: " + Arrays.toString(getSrcNodes()) +
+            ", liveReplicaStorages: " + getLiveReplicaStorages() +
+            ", additionalReplRequired: " + getAdditionalReplRequired() +
+            ", targets: " + Arrays.toString(targets) +
+            ", notEnoughRack: " + hasNotEnoughRack() +
+            ", liveBlockIndicies: " + Arrays.toString(liveBlockIndices) +
+            ", liveBusyBlockIndicies: " + Arrays.toString(liveBusyBlockIndices));
       }
     } else {
       targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/FSClusterStats.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/FSClusterStats.java
index 14122952bb18d..f98e99f9148a8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/FSClusterStats.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/FSClusterStats.java
@@ -66,4 +66,10 @@ public interface FSClusterStats {
    * @return storage statistics per storage type.
    */
   Map<StorageType, StorageTypeStats> getStorageTypeStats();
+
+  /**
+   * Number of racks of which all nodes are excluded: in decommission or decommissioned.
+   * @return Number of excluded racks.
+   */
+  int getNumOfExcludedRacks();
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
index 372cb237ca1e9..7930cf9548c7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
@@ -544,6 +544,7 @@ public void run() {
         } catch (Exception e) {
           LOG.error("Exception while checking heartbeat", e);
         }
+        blockManager.getDatanodeManager().refreshDecommissionRacks();
         try {
           Thread.sleep(5000);  // 5 seconds
         } catch (InterruptedException ignored) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
index 1e8517edecad1..629d00c17bcff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
@@ -221,8 +221,8 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
     this.out = out;
     this.totalDatanodes = totalDatanodes;
     this.remoteAddress = remoteAddress;
-    this.bpPolicies = new BlockPlacementPolicies(conf, null,
-        networktopology,
+    this.bpPolicies = new BlockPlacementPolicies(conf,
+        this.blockManager.getDatanodeManager().getFSClusterStats(), networktopology,
         namenode.getNamesystem().getBlockManager().getDatanodeManager()
         .getHost2DatanodeMap());
     this.staleInterval =
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingNotEnoughRacks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingNotEnoughRacks.java
new file mode 100644
index 0000000000000..6151817d374e7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingNotEnoughRacks.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY;
+
+/**
+ * Test erasure coding when racks not enough.
+ */
+public class TestErasureCodingNotEnoughRacks {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestErasureCodingNotEnoughRacks.class);
+
+  private MiniDFSCluster cluster;
+  private ErasureCodingPolicy ecPolicy;
+  private DistributedFileSystem dfs;
+  private int blockSize;
+  private final int fileSize = 10 * 1024 * 1024; // 10 MiB.
+
+  @Before
+  public void setup() throws Exception {
+    ecPolicy = StripedFileTestUtil.getDefaultECPolicy(); // RS_6_3
+    blockSize = ecPolicy.getCellSize() * 2;
+    Configuration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    conf.setBoolean(DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY, false);
+    cluster = DFSTestUtil.setupCluster(conf, 12, 6, 0);
+    dfs = cluster.getFileSystem();
+    dfs.setErasureCodingPolicy(new Path("/"), ecPolicy.getName());
+  }
+
+  @Test(timeout = 30000)
+  public void testBasic() throws Exception {
+    Path ecFile = new Path("/foo1");
+    writeFile(ecFile);
+
+    StripedFileTestUtil.waitBlockGroupsReported(dfs, ecFile.toString());
+    checkFile(ecFile);
+  }
+
+  @Test(timeout = 30000)
+  public void testDecommissionOneNode() throws Exception {
+    Path ecFile = new Path("/foo2");
+    writeFile(ecFile);
+
+    BlockManager blockManager = cluster.getNameNode().getNamesystem().getBlockManager();
+    DatanodeManager datanodeManager = blockManager.getDatanodeManager();
+
+    Map<String, List<DatanodeDescriptor>> racksMap = getRacksMap(datanodeManager.getDatanodes());
+    DatanodeDescriptor node = racksMap.values().iterator().next().get(0);
+
+    datanodeManager.getDatanodeAdminManager().startDecommission(node);
+    waitNodeDecommissioned(node);
+
+    checkFile(ecFile);
+  }
+
+  @Test(timeout = 30000)
+  public void testDecommissionOneRack() throws Exception {
+    Path ecFile = new Path("/foo3");
+    writeFile(ecFile);
+
+    BlockManager blockManager = cluster.getNameNode().getNamesystem().getBlockManager();
+    DatanodeManager datanodeManager = blockManager.getDatanodeManager();
+
+    Map<String, List<DatanodeDescriptor>> racksMap = getRacksMap(datanodeManager.getDatanodes());
+    List<DatanodeDescriptor> nodes = racksMap.values().iterator().next();
+
+    for (DatanodeDescriptor node : nodes) {
+      datanodeManager.getDatanodeAdminManager().startDecommission(node);
+    }
+
+    for (DatanodeDescriptor node : nodes) {
+      waitNodeDecommissioned(node);
+    }
+
+    checkFile(ecFile);
+  }
+
+  @After
+  public void teardown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  private Map<String, List<DatanodeDescriptor>> getRacksMap(Collection<DatanodeDescriptor> nodes) {
+    Map<String, List<DatanodeDescriptor>> racksMap = new HashMap<>();
+    for (DatanodeDescriptor dn : nodes) {
+      racksMap.computeIfAbsent(dn.getNetworkLocation(), k -> new ArrayList<>()).add(dn);
+    }
+    return racksMap;
+  }
+
+  private void writeFile(Path ecFile) throws IOException {
+    byte[] bytes = StripedFileTestUtil.generateBytes(fileSize);
+    DFSTestUtil.writeFile(dfs, ecFile, new String(bytes));
+  }
+
+  private void checkFile(Path ecFile) throws Exception {
+    StripedFileTestUtil.checkData(dfs, ecFile, fileSize, new ArrayList<>(),
+        null, ecPolicy.getNumDataUnits() * blockSize);
+  }
+
+  private void waitNodeDecommissioned(DatanodeInfo node) {
+    DatanodeInfo.AdminStates state = DatanodeInfo.AdminStates.DECOMMISSIONED;
+    boolean done = (state == node.getAdminState());
+    while (!done) {
+      LOG.info("Waiting for node " + node + " to change state to " +
+          state + " current state: " + node.getAdminState());
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException e) {
+        // nothing
+      }
+      done = (state == node.getAdminState());
+    }
+  }
+
+}