From 7ade5bedb727e24d51a17a485c7113e4785d7b8c Mon Sep 17 00:00:00 2001
From: "tom03.li"
Date: Sat, 3 Aug 2024 18:13:09 +0800
Subject: [PATCH] HDFS-17599. Fix the mismatch between locations and indices for mover

---
 .../hdfs/server/balancer/Dispatcher.java      |   8 +-
 .../hadoop/hdfs/server/mover/Mover.java       |  23 +++
 .../hadoop/hdfs/server/mover/TestMover.java   | 146 ++++++++++++++++++
 3 files changed, 175 insertions(+), 2 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index 6ad0e4d22a854..acac65d774505 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -539,6 +539,10 @@ public void setIndices(byte[] indices) {
       this.indices = indices;
     }
 
+    public byte[] getIndices() {
+      return this.indices;
+    }
+
     /**
      * Adjust EC block indices,it will remove the element of adjustList from indices.
      * @param adjustList the list will be removed from indices
@@ -889,8 +893,8 @@ private long getBlockList() throws IOException, IllegalArgumentException {
           if (g != null) { // not unknown
             block.addLocation(g);
           } else if (blkLocs instanceof StripedBlockWithLocations) {
-            // some datanode may not in storageGroupMap due to decommission operation
-            // or balancer cli with "-exclude" parameter
+            // some datanode may not be in storageGroupMap due to decommission or maintenance
+            // operation or balancer cli with "-exclude" parameter
             adjustList.add(i);
           }
         }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
index 63fe238cd5e07..87660c5db283e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.mover;
 
 import org.apache.commons.cli.*;
+import org.apache.commons.cli.Options;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
@@ -49,6 +50,7 @@
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Lists;
+import org.apache.hadoop.util.Preconditions;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Tool;
@@ -228,6 +230,27 @@ DBlock newDBlock(LocatedBlock lb, List<MLocation> locations,
         db.addLocation(source);
       }
     }
+
+    List<Integer> adjustList = new ArrayList<>();
+    for (int i = 0; i < locations.size(); i++) {
+      MLocation ml = locations.get(i);
+      StorageGroup source = storages.getSource(ml);
+      if (source != null) {
+        db.addLocation(source);
+      } else if (lb.isStriped()) {
+        // some datanode may not be in storages due to decommission or maintenance operation
+        // or balancer cli with "-exclude" parameter
+        adjustList.add(i);
+      }
+    }
+
+    if (!adjustList.isEmpty()) {
+      // block.locations mismatch with block.indices
+      // adjust indices to get correct internalBlock
+      ((DBlockStriped) db).adjustIndices(adjustList);
+      Preconditions.checkArgument(((DBlockStriped) db).getIndices().length
+          == db.getLocations().size());
+    }
     return db;
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
index 90353c352ea41..dd6523ed75b61 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
@@ -23,6 +23,7 @@
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MOVER_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MOVER_KERBEROS_PRINCIPAL_KEY;
@@ -73,6 +74,7 @@
 import org.apache.hadoop.hdfs.StripedFileTestUtil;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -82,10 +84,13 @@
 import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
 import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
 import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.mover.Mover.MLocation;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
@@ -98,6 +103,7 @@
 import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.MetricsAsserts;
+import org.apache.hadoop.util.Lists;
 import org.apache.hadoop.util.ToolRunner;
 import org.junit.Assert;
 import org.junit.Test;
@@ -1005,6 +1011,146 @@ public void testMoverWithStripedFile() throws Exception {
     }
   }
 
+  @Test(timeout = 300000)
+  public void testMoverWithStripedFileMaintenance() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConfWithStripe(conf);
+
+    // Start 9 datanodes
+    int numOfDatanodes = 9;
+    int storagesPerDatanode = 2;
+    long capacity = 9 * defaultBlockSize;
+    long[][] capacities = new long[numOfDatanodes][storagesPerDatanode];
+    for (int i = 0; i < numOfDatanodes; i++) {
+      for(int j = 0; j < storagesPerDatanode; j++){
+        capacities[i][j] = capacity;
+      }
+    }
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(numOfDatanodes)
+        .storagesPerDatanode(storagesPerDatanode)
+        .storageTypes(new StorageType[][]{
+            {StorageType.SSD, StorageType.SSD},
+            {StorageType.SSD, StorageType.SSD},
+            {StorageType.SSD, StorageType.SSD},
+            {StorageType.SSD, StorageType.SSD},
+            {StorageType.SSD, StorageType.SSD},
+            {StorageType.SSD, StorageType.SSD},
+            {StorageType.SSD, StorageType.SSD},
+            {StorageType.SSD, StorageType.SSD},
+            {StorageType.SSD, StorageType.SSD}})
+        .storageCapacities(capacities)
+        .build();
+
+    try {
+      cluster.waitActive();
+      cluster.getFileSystem().enableErasureCodingPolicy(
+          StripedFileTestUtil.getDefaultECPolicy().getName());
+
+      ClientProtocol client = NameNodeProxies.createProxy(conf,
+          cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
+      String barDir = "/bar";
+      client.mkdirs(barDir, new FsPermission((short) 777), true);
+      // Set "/bar" directory with ALL_SSD storage policy.
+      client.setStoragePolicy(barDir, "ALL_SSD");
+      // Set an EC policy on "/bar" directory
+      client.setErasureCodingPolicy(barDir,
+          StripedFileTestUtil.getDefaultECPolicy().getName());
+
+      // Write file to barDir
+      final String fooFile = "/bar/foo";
+      long fileLen = 6 * defaultBlockSize;
+      DFSTestUtil.createFile(cluster.getFileSystem(), new Path(fooFile),
+          fileLen, (short) 3, 0);
+
+      // Verify storage types and locations
+      LocatedBlocks locatedBlocks =
+          client.getBlockLocations(fooFile, 0, fileLen);
+      DatanodeInfoWithStorage location = null;
+      for(LocatedBlock lb : locatedBlocks.getLocatedBlocks()){
+        location = lb.getLocations()[8];
+        for(StorageType type : lb.getStorageTypes()){
+          Assert.assertEquals(StorageType.SSD, type);
+        }
+      }
+
+      // Maintain the last datanode later
+      FSNamesystem ns = cluster.getNamesystem(0);
+      DatanodeManager datanodeManager = ns.getBlockManager().getDatanodeManager();
+      DatanodeDescriptor dn = datanodeManager.getDatanode(location.getDatanodeUuid());
+
+      StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
+          dataBlocks + parityBlocks);
+
+      // Start 5 more datanodes for mover
+      capacities = new long[5][storagesPerDatanode];
+      for (int i = 0; i < 5; i++) {
+        for(int j = 0; j < storagesPerDatanode; j++){
+          capacities[i][j] = capacity;
+        }
+      }
+      cluster.startDataNodes(conf, 5,
+          new StorageType[][]{
+              {StorageType.DISK, StorageType.DISK},
+              {StorageType.DISK, StorageType.DISK},
+              {StorageType.DISK, StorageType.DISK},
+              {StorageType.DISK, StorageType.DISK},
+              {StorageType.DISK, StorageType.DISK}},
+          true, null, null, null, capacities,
+          null, false, false, false, null, null, null);
+      cluster.triggerHeartbeats();
+
+      // Move blocks to DISK
+      client.setStoragePolicy(barDir, "HOT");
+      int rc = ToolRunner.run(conf, new Mover.Cli(),
+          new String[] { "-p", barDir });
+
+      // Maintain a datanode to simulate that one node in the location list
+      // is in ENTERING_MAINTENANCE status.
+      datanodeManager.getDatanode(dn.getDatanodeUuid()).startMaintenance();
+      waitNodeState(dn, DatanodeInfo.AdminStates.ENTERING_MAINTENANCE);
+
+      // Move blocks back to SSD.
+      // Without HDFS-17599, locations and indices lengths might not match,
+      // resulting in getting the wrong blockId in DBlockStriped#getInternalBlock,
+      // and the mover will fail to run.
+      client.setStoragePolicy(barDir, "ALL_SSD");
+      rc = ToolRunner.run(conf, new Mover.Cli(),
+          new String[] { "-p", barDir });
+
+      Assert.assertEquals("Movement to ALL_SSD should be successful", 0, rc);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Wait till DataNode is transitioned to the expected state.
+   */
+  protected void waitNodeState(DatanodeInfo node, DatanodeInfo.AdminStates state) {
+    waitNodeState(Lists.newArrayList(node), state);
+  }
+
+  /**
+   * Wait till all DataNodes are transitioned to the expected state.
+   */
+  protected void waitNodeState(List<DatanodeInfo> nodes, DatanodeInfo.AdminStates state) {
+    for (DatanodeInfo node : nodes) {
+      boolean done = (state == node.getAdminState());
+      while (!done) {
+        LOG.info("Waiting for node " + node + " to change state to "
+            + state + " current state: " + node.getAdminState());
+        try {
+          Thread.sleep(DFS_HEARTBEAT_INTERVAL_DEFAULT * 10);
+        } catch (InterruptedException e) {
+          // nothing
+        }
+        done = (state == node.getAdminState());
+      }
+      LOG.info("node " + node + " reached the state " + state);
+    }
+  }
+
   /**
    * Wait until Namenode reports expected storage type for all blocks of
   * given file.