diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index dd3193fdadff2..9c3492af6cd97 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -1023,6 +1023,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.namenode.decommission.backoff.monitor.pending.blocks.per.lock";
   public static final int DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK_DEFAULT = 1000;
+  public static final String DFS_NAMENODE_DECOMMISSION_EC_RECONSTRUCTION_ENABLE =
+      "dfs.namenode.decommission.ec.reconstruction.enable";
+  public static final boolean DFS_NAMENODE_DECOMMISSION_EC_RECONSTRUCTION_ENABLE_DEFAULT = false;
   public static final String DFS_NAMENODE_HANDLER_COUNT_KEY = "dfs.namenode.handler.count";
   public static final int DFS_NAMENODE_HANDLER_COUNT_DEFAULT = 10;
   public static final String DFS_NAMENODE_LIFELINE_HANDLER_RATIO_KEY =
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 41845152514fe..d7fe5b9d0e07a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -495,6 +495,7 @@ public int getPendingSPSPaths() {
    * Limits number of blocks used to check for excess redundancy timeout.
    */
   private long excessRedundancyTimeoutCheckLimit;
+  private boolean decommissionECReconstruction = false;
 
   public BlockManager(final Namesystem namesystem, boolean haEnabled,
       final Configuration conf) throws IOException {
@@ -608,7 +609,9 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled,
     setExcessRedundancyTimeoutCheckLimit(conf.getLong(
         DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT,
         DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT_DEFAULT));
-
+    this.decommissionECReconstruction =
+        conf.getBoolean(DFS_NAMENODE_DECOMMISSION_EC_RECONSTRUCTION_ENABLE,
+            DFS_NAMENODE_DECOMMISSION_EC_RECONSTRUCTION_ENABLE_DEFAULT);
     printInitialConfigs();
   }
 
@@ -2239,11 +2242,11 @@ BlockReconstructionWork scheduleReconstruction(BlockInfo block,
     List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<>();
     NumberReplicas numReplicas = new NumberReplicas();
     List<Byte> liveBlockIndices = new ArrayList<>();
+    List<Byte> liveAndDecommissioningBusyBlockIndices = new ArrayList<>();
     List<Byte> liveBusyBlockIndices = new ArrayList<>();
-    List<Byte> excludeReconstructed = new ArrayList<>();
     final DatanodeDescriptor[] srcNodes = chooseSourceDatanodes(block,
         containingNodes, liveReplicaNodes, numReplicas,
-        liveBlockIndices, liveBusyBlockIndices, excludeReconstructed, priority);
+        liveBlockIndices, liveAndDecommissioningBusyBlockIndices, liveBusyBlockIndices, priority);
     short requiredRedundancy = getExpectedLiveRedundancyNum(block,
         numReplicas);
     if (srcNodes == null || srcNodes.length == 0) {
@@ -2307,17 +2310,19 @@ BlockReconstructionWork scheduleReconstruction(BlockInfo block,
       byte[] newIndices = new byte[liveBlockIndices.size()];
       adjustSrcNodesAndIndices((BlockInfoStriped)block,
           srcNodes, liveBlockIndices, newSrcNodes, newIndices);
-      byte[] busyIndices = new byte[liveBusyBlockIndices.size()];
-      for (int i = 0; i < liveBusyBlockIndices.size(); i++) {
-        busyIndices[i] = liveBusyBlockIndices.get(i);
+      byte[] liveAndDecommissioningBusyIndices =
+          new byte[liveAndDecommissioningBusyBlockIndices.size()];
+      for (int i = 0; i < liveAndDecommissioningBusyBlockIndices.size(); i++) {
+        liveAndDecommissioningBusyIndices[i] = liveAndDecommissioningBusyBlockIndices.get(i);
       }
-      byte[] excludeReconstructedIndices = new byte[excludeReconstructed.size()];
-      for (int i = 0; i < excludeReconstructed.size(); i++) {
-        excludeReconstructedIndices[i] = excludeReconstructed.get(i);
+      byte[] liveBusyIndices = new byte[liveBusyBlockIndices.size()];
+      for (int i = 0; i < liveBusyBlockIndices.size(); i++) {
+        liveBusyIndices[i] = liveBusyBlockIndices.get(i);
       }
       return new ErasureCodingWork(getBlockPoolId(), block, bc, newSrcNodes,
           containingNodes, liveReplicaNodes, additionalReplRequired,
-          priority, newIndices, busyIndices, excludeReconstructedIndices);
+          priority, newIndices, liveAndDecommissioningBusyIndices, liveBusyIndices,
+          decommissionECReconstruction);
     } else {
       return new ReplicationWork(block, bc, srcNodes, containingNodes,
           liveReplicaNodes, additionalReplRequired,
@@ -2549,7 +2554,11 @@ private DatanodeDescriptor getDatanodeDescriptorFromStorage(
    * replicas of the given block.
    * @param liveBlockIndices List to be populated with indices of healthy
    *                         blocks in a striped block group
-   * @param liveBusyBlockIndices List to be populated with indices of healthy
+   * @param liveAndDecommissioningBusyBlockIndices List to be populated with indices of live or
+   *                             decommissioning blocks in a striped block group
+   *                             on a busy DN, which has reached its
+   *                             replication limits
+   * @param liveBusyBlockIndices List to be populated with indices of live
    *                             blocks in a striped block group in busy DN,
    *                             which the recovery work have reached their
   *                              replication limits
@@ -2563,7 +2572,8 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
       List<DatanodeDescriptor> containingNodes,
       List<DatanodeStorageInfo> nodesContainingLiveReplicas,
       NumberReplicas numReplicas, List<Byte> liveBlockIndices,
-      List<Byte> liveBusyBlockIndices, List<Byte> excludeReconstructed, int priority) {
+      List<Byte> liveAndDecommissioningBusyBlockIndices,
+      List<Byte> liveBusyBlockIndices, int priority) {
     containingNodes.clear();
     nodesContainingLiveReplicas.clear();
     List<DatanodeDescriptor> srcNodes = new ArrayList<>();
@@ -2631,22 +2641,26 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
           && (!node.isDecommissionInProgress() && !node.isEnteringMaintenance())
           && node.getNumberOfBlocksToBeReplicated() +
           node.getNumberOfBlocksToBeErasureCoded() >= maxReplicationStreams) {
-        if (isStriped && (state == StoredReplicaState.LIVE
-            || state == StoredReplicaState.DECOMMISSIONING)) {
-          liveBusyBlockIndices.add(blockIndex);
-          //HDFS-16566 ExcludeReconstructed won't be reconstructed.
-          excludeReconstructed.add(blockIndex);
+        if (isStriped) {
+          if (state == StoredReplicaState.LIVE) {
+            liveAndDecommissioningBusyBlockIndices.add(blockIndex);
+            liveBusyBlockIndices.add(blockIndex);
+          } else if (state == StoredReplicaState.DECOMMISSIONING) {
+            liveAndDecommissioningBusyBlockIndices.add(blockIndex);
+          }
         }
         continue; // already reached replication limit
       }
 
       if (node.getNumberOfBlocksToBeReplicated() +
           node.getNumberOfBlocksToBeErasureCoded() >= replicationStreamsHardLimit) {
-        if (isStriped && (state == StoredReplicaState.LIVE
-            || state == StoredReplicaState.DECOMMISSIONING)) {
-          liveBusyBlockIndices.add(blockIndex);
-          //HDFS-16566 ExcludeReconstructed won't be reconstructed.
-          excludeReconstructed.add(blockIndex);
+        if (isStriped) {
+          if (state == StoredReplicaState.LIVE) {
+            liveAndDecommissioningBusyBlockIndices.add(blockIndex);
+            liveBusyBlockIndices.add(blockIndex);
+          } else if (state == StoredReplicaState.DECOMMISSIONING) {
+            liveAndDecommissioningBusyBlockIndices.add(blockIndex);
+          }
         }
         continue;
       }
@@ -5831,4 +5845,12 @@ public void setMinBlocksForWrite(int minBlocksForWrite) {
   public int getMinBlocksForWrite(BlockType blockType) {
     return placementPolicies.getPolicy(blockType).getMinBlocksForWrite();
   }
+
+  public void setDecommissionECReconstruction(boolean decommissionECReconstruction) {
+    this.decommissionECReconstruction = decommissionECReconstruction;
+  }
+
+  public boolean isDecommissionECReconstruction() {
+    return decommissionECReconstruction;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
index 5726fac0b3d79..a63a06bf65a06 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
@@ -31,9 +31,10 @@ class ErasureCodingWork extends BlockReconstructionWork {
   private final byte[] liveBlockIndices;
+  private final byte[] liveAndDecommissioningBusyBlockIndices;
   private final byte[] liveBusyBlockIndices;
-  private final byte[] excludeReconstructedIndices;
   private final String blockPoolId;
+  private final boolean decommissionECReconstruction;
 
   public ErasureCodingWork(String blockPoolId, BlockInfo block,
       BlockCollection bc,
@@ -41,14 +42,15 @@ public ErasureCodingWork(String blockPoolId, BlockInfo block,
       List<DatanodeDescriptor> containingNodes,
       List<DatanodeStorageInfo> liveReplicaStorages,
       int additionalReplRequired, int priority,
-      byte[] liveBlockIndices, byte[] liveBusyBlockIndices,
-      byte[] excludeReconstrutedIndices) {
+      byte[] liveBlockIndices, byte[] liveAndDecommissioningBusyBlockIndices,
+      byte[] liveBusyBlockIndices, boolean decommissionECReconstruction) {
     super(block, bc, srcNodes, containingNodes,
         liveReplicaStorages, additionalReplRequired, priority);
     this.blockPoolId = blockPoolId;
     this.liveBlockIndices = liveBlockIndices;
+    this.liveAndDecommissioningBusyBlockIndices = liveAndDecommissioningBusyBlockIndices;
     this.liveBusyBlockIndices = liveBusyBlockIndices;
-    this.excludeReconstructedIndices = excludeReconstrutedIndices;
+    this.decommissionECReconstruction = decommissionECReconstruction;
     LOG.debug("Creating an ErasureCodingWork to {} reconstruct ", block);
   }
 
@@ -84,14 +86,14 @@ void chooseTargets(BlockPlacementPolicy blockplacement,
   private boolean hasAllInternalBlocks() {
     final BlockInfoStriped block = (BlockInfoStriped) getBlock();
     if (liveBlockIndices.length
-        + liveBusyBlockIndices.length < block.getRealTotalBlockNum()) {
+        + liveAndDecommissioningBusyBlockIndices.length < block.getRealTotalBlockNum()) {
       return false;
     }
     BitSet bitSet = new BitSet(block.getTotalBlockNum());
     for (byte index : liveBlockIndices) {
       bitSet.set(index);
     }
-    for (byte busyIndex: liveBusyBlockIndices) {
+    for (byte busyIndex: liveAndDecommissioningBusyBlockIndices) {
       bitSet.set(busyIndex);
     }
     for (int i = 0; i < block.getRealDataBlockNum(); i++) {
@@ -150,10 +152,19 @@ boolean addTaskToDatanode(NumberReplicas numberReplicas) {
         numberReplicas.liveEnteringMaintenanceReplicas() > 0) &&
         hasAllInternalBlocks()) {
       List<Integer> leavingServiceSources = findLeavingServiceSources();
-      // decommissioningSources.size() should be >= targets.length
       final int num = Math.min(leavingServiceSources.size(), targets.length);
       if (num == 0) {
-        flag = false;
+        if (decommissionECReconstruction && targets.length > 0 &&
+            getSrcNodes().length >= ((BlockInfoStriped) getBlock()).getDataBlockNum()) {
+          // Here liveBusyBlockIndices serves as the excludeReconstructedIndices and only
+          // contains LIVE indices, so when EC reconstruction is enabled for decommissioning,
+          // the DECOMMISSIONING indices are reconstructed instead of being skipped.
+          targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
+              new ExtendedBlock(blockPoolId, stripedBlk), getSrcNodes(), targets,
+              liveBlockIndices, liveBusyBlockIndices, stripedBlk.getErasureCodingPolicy());
+        } else {
+          flag = false;
+        }
       }
       for (int i = 0; i < num; i++) {
         createReplicationWork(leavingServiceSources.get(i), targets[i]);
@@ -161,7 +172,8 @@ boolean addTaskToDatanode(NumberReplicas numberReplicas) {
     } else {
       targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
           new ExtendedBlock(blockPoolId, stripedBlk), getSrcNodes(), targets,
-          liveBlockIndices, excludeReconstructedIndices, stripedBlk.getErasureCodingPolicy());
+          liveBlockIndices, liveAndDecommissioningBusyBlockIndices,
+          stripedBlk.getErasureCodingPolicy());
     }
     return flag;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
index 832a8029f7771..7209c4a6da170 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
@@ -351,6 +351,10 @@ public void incrBlocksReplicated() {
     blocksReplicated.incr();
   }
 
+  public long getBlocksReplicated() {
+    return blocksReplicated.value();
+  }
+
   public void incrBlocksWritten() {
     blocksWritten.incr();
   }
@@ -584,6 +588,10 @@ public void incrECReconstructionTasks() {
     ecReconstructionTasks.incr();
   }
 
+  public long getECReconstructionTasks() {
+    return ecReconstructionTasks.value();
+  }
+
   public void incrECFailedReconstructionTasks() {
     ecFailedReconstructionTasks.incr();
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index bcf56a86441d3..56f9ba8acc7a4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -139,6 +139,8 @@
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NN_NOT_BECOME_ACTIVE_IN_SAFEMODE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_EC_RECONSTRUCTION_ENABLE;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_EC_RECONSTRUCTION_ENABLE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LOCK_DETAILED_METRICS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LOCK_DETAILED_METRICS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_DEFAULT;
@@ -383,7 +385,8 @@ public enum OperationCategory {
       DFS_NAMENODE_LOCK_DETAILED_METRICS_KEY,
       DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY,
       DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY,
-      DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY));
+      DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY,
+      DFS_NAMENODE_DECOMMISSION_EC_RECONSTRUCTION_ENABLE));
 
   private static final String USAGE = "Usage: hdfs namenode ["
       + StartupOption.BACKUP.getName() + "] | \n\t["
@@ -2356,7 +2359,8 @@ protected String reconfigurePropertyImpl(String property, String newVal)
         || property.equals(DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY)
         || property.equals(
             DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION)
-        || property.equals(DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY)) {
+        || property.equals(DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY)
+        || property.equals(DFS_NAMENODE_DECOMMISSION_EC_RECONSTRUCTION_ENABLE)) {
       return reconfReplicationParameters(newVal, property);
     } else if (property.equals(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY) || property
         .equals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY)) {
@@ -2395,19 +2399,19 @@ protected String reconfigurePropertyImpl(String property, String newVal)
   private String reconfReplicationParameters(final String newVal,
       final String property) throws ReconfigurationException {
     BlockManager bm = namesystem.getBlockManager();
-    int newSetting;
+    String newSetting;
     namesystem.writeLock();
     try {
       if (property.equals(DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY)) {
         bm.setMaxReplicationStreams(
             adjustNewVal(DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT, newVal));
-        newSetting = bm.getMaxReplicationStreams();
+        newSetting = Integer.toString(bm.getMaxReplicationStreams());
       } else if (property.equals(
           DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY)) {
         bm.setReplicationStreamsHardLimit(
             adjustNewVal(DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT,
                 newVal));
-        newSetting = bm.getReplicationStreamsHardLimit();
+        newSetting = Integer.toString(bm.getReplicationStreamsHardLimit());
       } else if (
           property.equals(
               DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION)) {
@@ -2415,7 +2419,7 @@ private String reconfReplicationParameters(final String newVal,
             adjustNewVal(
                 DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION_DEFAULT,
                 newVal));
-        newSetting = bm.getBlocksReplWorkMultiplier();
+        newSetting = Integer.toString(bm.getBlocksReplWorkMultiplier());
       } else if (
           property.equals(
               DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY)) {
@@ -2423,13 +2427,19 @@ private String reconfReplicationParameters(final String newVal,
             adjustNewVal(
                 DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT,
                 newVal));
-        newSetting = bm.getReconstructionPendingTimeout();
+        newSetting = Integer.toString(bm.getReconstructionPendingTimeout());
+      } else if (property.equals(DFS_NAMENODE_DECOMMISSION_EC_RECONSTRUCTION_ENABLE)) {
+        boolean decommissionECReconstruction =
+            (newVal == null) ? DFS_NAMENODE_DECOMMISSION_EC_RECONSTRUCTION_ENABLE_DEFAULT :
+                Boolean.parseBoolean(newVal);
+        bm.setDecommissionECReconstruction(decommissionECReconstruction);
+        newSetting = Boolean.toString(bm.isDecommissionECReconstruction());
       } else {
        throw new IllegalArgumentException("Unexpected property " +
            property + " in reconfReplicationParameters");
       }
       LOG.info("RECONFIGURE* changed {} to {}", property, newSetting);
-      return String.valueOf(newSetting);
+      return newSetting;
     } catch (IllegalArgumentException e) {
       throw new ReconfigurationException(property, newVal, getConf().get(
           property), e);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 1295c0dca8752..b6108e5828c4d 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -1233,6 +1233,14 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.namenode.decommission.ec.reconstruction.enable</name>
+  <value>false</value>
+  <description>
+    Whether to use EC reconstruction to recover a striped internal block when the decommissioning DataNode that holds it is too busy to copy it.
+  </description>
+</property>
+
 <property>
   <name>dfs.namenode.redundancy.interval.seconds</name>
   <value>3</value>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java
index 83332cc3134b4..3bdccf201d1e9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java
@@ -1157,4 +1157,169 @@ public void testRecoveryWithDecommission() throws Exception {
         originBytesArray, readBytesArray, ecPolicy);
     cleanupFile(dfs, ecFile);
   }
+
+  @Test(timeout = 120000)
+  public void testDecommissionBusyNodeWithECReconstruction1() throws Exception {
+    bm.setDecommissionECReconstruction(false);
+    byte[] indices = new byte[]{6};
+    // 1 create EC file
+    final Path ecFile = new Path(ecDir, "testDecommission2NodeWithBusyNode");
+    int writeBytes = cellSize * dataBlocks;
+    writeStripedFile(dfs, ecFile, writeBytes);
+
+    Assert.assertEquals(0, bm.numOfUnderReplicatedBlocks());
+    FileChecksum fileChecksum1 = dfs.getFileChecksum(ecFile, writeBytes);
+
+    // 2 make datanode busy
+    INodeFile fileNode =
+        cluster.getNamesystem().getFSDirectory().getINode4Write(ecFile.toString()).asFile();
+    BlockInfo firstBlock = fileNode.getBlocks()[0];
+    List<DatanodeStorageInfo> storageInfos =
+        getStorageInfoForBlockIndex((BlockInfoStriped) firstBlock, indices);
+    assertEquals(1, storageInfos.size());
+    DatanodeDescriptor busyNode = storageInfos.get(0).getDatanodeDescriptor();
+    for (int j = 0; j < replicationStreamsHardLimit; j++) {
+      busyNode.incrementPendingReplicationWithoutTargets();
+    }
+    DatanodeStorageInfo toDecommissionStorage = storageInfos.get(0);
+
+    // 3 decommissioning one datanode
+    List<DatanodeInfo> decommissionNodes = new ArrayList<>();
+    decommissionNodes.add(storageInfos.get(0).getDatanodeDescriptor());
+    decommissionNode(0, decommissionNodes, AdminStates.DECOMMISSION_INPROGRESS);
+
+    // 4 verify the replica is not copied or reconstructed
+    Thread.sleep(3000);
+    assertEquals(8, bm.countNodes(firstBlock).liveReplicas());
+    assertEquals(1, bm.countNodes(firstBlock).decommissioning());
+    assertEquals(0, cluster.getDataNodes().stream()
+        .mapToLong(dn -> dn.getMetrics().getECReconstructionTasks()).sum());
+
+    // 5 verify the replica is reconstructed
+    bm.setDecommissionECReconstruction(true);
+    decommissionNode(0, decommissionNodes, AdminStates.DECOMMISSIONED);
+    assertEquals(9, bm.countNodes(firstBlock).liveReplicas());
+    assertEquals(1, bm.countNodes(firstBlock).decommissioned());
+    assertEquals(0, cluster.getDataNodes().stream()
+        .mapToLong(dn -> dn.getMetrics().getBlocksReplicated()).sum());
+    assertTrue(cluster.getDataNodes().stream()
+        .mapToLong(dn -> dn.getMetrics().getECReconstructionTasks()).sum() > 0);
+
+    // 6 get the blocks again, verify that the decommissioned index already has a LIVE
+    // replica, and that the DECOMMISSIONED replica is the one that was taken offline
+    fileNode = cluster.getNamesystem().getFSDirectory().getINode(ecFile.toString()).asFile();
+    firstBlock = fileNode.getBlocks()[0];
+    List<DatanodeStorageInfo> newStorageInfos =
+        getStorageInfoForBlockIndex((BlockInfoStriped) firstBlock, indices);
+    assertTrue(newStorageInfos.size() >= 2);
+    DatanodeStorageInfo decommissionedNode = null;
+    int alive = 0;
+    for (int i = 0; i < newStorageInfos.size(); i++) {
+      DatanodeStorageInfo datanodeStorageInfo = newStorageInfos.get(i);
+      if (datanodeStorageInfo.getDatanodeDescriptor().isDecommissioned()) {
+        decommissionedNode = datanodeStorageInfo;
+      } else if (datanodeStorageInfo.getDatanodeDescriptor().isAlive()) {
+        alive++;
+      }
+    }
+    assertNotNull(decommissionedNode);
+    assertEquals(toDecommissionStorage, decommissionedNode);
+    assertTrue(alive >= 1);
+
+    // 7 check the checksum of the file
+    FileChecksum fileChecksum2 = dfs.getFileChecksum(ecFile, writeBytes);
+    Assert.assertEquals("Checksum mismatches!", fileChecksum1, fileChecksum2);
+
+    // 8 check the data is correct
+    StripedFileTestUtil.checkData(dfs, ecFile, writeBytes, decommissionNodes,
+        null, blockGroupSize);
+  }
+
+  @Test(timeout = 120000)
+  public void testDecommissionBusyNodeWithECReconstruction2() throws Exception {
+    bm.setDecommissionECReconstruction(false);
+    byte[] indices = new byte[]{5, 6};
+    // 1 create EC file
+    final Path ecFile = new Path(ecDir, "testDecommission2NodeWithBusyNode");
+    int writeBytes = cellSize * dataBlocks;
+    writeStripedFile(dfs, ecFile, writeBytes);
+
+    assertEquals(0, bm.numOfUnderReplicatedBlocks());
+    FileChecksum fileChecksum1 = dfs.getFileChecksum(ecFile, writeBytes);
+
+    // 2 make one datanode busy
+    INodeFile fileNode =
+        cluster.getNamesystem().getFSDirectory().getINode4Write(ecFile.toString()).asFile();
+    BlockInfo firstBlock = fileNode.getBlocks()[0];
+    List<DatanodeStorageInfo> storageInfos =
+        getStorageInfoForBlockIndex((BlockInfoStriped) firstBlock, indices);
+    assertEquals(2, storageInfos.size());
+    DatanodeDescriptor busyNode = storageInfos.get(0).getDatanodeDescriptor();
+    for (int j = 0; j < replicationStreamsHardLimit; j++) {
+      busyNode.incrementPendingReplicationWithoutTargets();
+    }
+
+    // 3 decommissioning two datanodes
+    List<DatanodeInfo> decommissionNodes = new ArrayList<>();
+    decommissionNodes.add(storageInfos.get(0).getDatanodeDescriptor());
+    decommissionNodes.add(storageInfos.get(1).getDatanodeDescriptor());
+    decommissionNode(0, decommissionNodes, AdminStates.DECOMMISSION_INPROGRESS);
+
+    // 4 verify that the non-busy replica has been copied and the busy replica is
+    // reconstructed after decommissionECReconstruction is enabled
+    bm.setDecommissionECReconstruction(true);
+    decommissionNode(0, decommissionNodes, AdminStates.DECOMMISSIONED);
+    assertEquals(9, bm.countNodes(firstBlock).liveReplicas());
+    assertEquals(2, bm.countNodes(firstBlock).decommissioned());
+    assertTrue(cluster.getDataNodes().stream()
+        .mapToLong(dn -> dn.getMetrics().getBlocksReplicated()).sum() > 0);
+    assertTrue(cluster.getDataNodes().stream()
+        .mapToLong(dn -> dn.getMetrics().getECReconstructionTasks()).sum() > 0);
+
+    // 5 get the blocks again, verify that the decommissioned indices already have LIVE
+    // replicas, and that the DECOMMISSIONED replicas are the ones that were taken offline
+    fileNode = cluster.getNamesystem().getFSDirectory().getINode(ecFile.toString()).asFile();
+    firstBlock = fileNode.getBlocks()[0];
+    List<DatanodeStorageInfo> newStorageInfos =
+        getStorageInfoForBlockIndex((BlockInfoStriped) firstBlock, indices);
+    assertTrue(newStorageInfos.size() >= 4);
+    int alive = 0;
+    int decommissioned = 0;
+    for (int i = 0; i < newStorageInfos.size(); i++) {
+      DatanodeStorageInfo newDatanodeStorageInfo = newStorageInfos.get(i);
+      if (newDatanodeStorageInfo.getDatanodeDescriptor().isDecommissioned()) {
+        assertTrue(newDatanodeStorageInfo.equals(storageInfos.get(0)) ||
+            newDatanodeStorageInfo.equals(storageInfos.get(1)));
+        decommissioned++;
+      } else if (newDatanodeStorageInfo.getDatanodeDescriptor().isAlive()) {
+        alive++;
+      }
+    }
+    assertTrue(alive >= 2);
+    assertEquals(2, decommissioned);
+
+    // 6 check the checksum of the file
+    FileChecksum fileChecksum2 = dfs.getFileChecksum(ecFile, writeBytes);
+    assertEquals("Checksum mismatches!", fileChecksum1, fileChecksum2);
+
+    // 7 check the data is correct
+    StripedFileTestUtil.checkData(dfs, ecFile, writeBytes, decommissionNodes,
+        null, blockGroupSize);
+  }
+
+  private List<DatanodeStorageInfo> getStorageInfoForBlockIndex(BlockInfoStriped block,
+      byte[] blockIndices) {
+    List<DatanodeStorageInfo> storageInfos = new ArrayList<>();
+    Iterator<BlockInfoStriped.StorageAndBlockIndex> iterator =
+        block.getStorageAndIndexInfos().iterator();
+    while (iterator.hasNext()) {
+      BlockInfoStriped.StorageAndBlockIndex storageAndBlockIndex = iterator.next();
+      for (int blockIndex : blockIndices) {
+        if (storageAndBlockIndex.getBlockIndex() == blockIndex) {
+          storageInfos.add(storageAndBlockIndex.getStorage());
+        }
+      }
+    }
+    return storageInfos;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
index 392866e86ea2e..2b66c6d07d5c2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
@@ -39,6 +39,7 @@
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_MIN_BLOCKS_FOR_WRITE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_EC_RECONSTRUCTION_ENABLE;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY;
@@ -444,7 +445,7 @@ public void testNameNodeGetReconfigurableProperties() throws IOException, Interr
     final List<String> outs = Lists.newArrayList();
     final List<String> errs = Lists.newArrayList();
     getReconfigurableProperties("namenode", address, outs, errs);
-    assertEquals(29, outs.size());
+    assertEquals(30, outs.size());
     assertTrue(outs.get(0).contains("Reconfigurable properties:"));
     assertEquals(DFS_BLOCK_INVALIDATE_LIMIT_KEY, outs.get(1));
     assertEquals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, outs.get(2));
@@ -458,8 +459,9 @@ public void testNameNodeGetReconfigurableProperties() throws IOException, Interr
     assertEquals(DFS_NAMENODE_BLOCKPLACEMENTPOLICY_MIN_BLOCKS_FOR_WRITE_KEY, outs.get(10));
     assertEquals(DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK, outs.get(11));
     assertEquals(DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT, outs.get(12));
-    assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(13));
-    assertEquals(DFS_NAMENODE_LOCK_DETAILED_METRICS_KEY, outs.get(14));
+    assertEquals(DFS_NAMENODE_DECOMMISSION_EC_RECONSTRUCTION_ENABLE, outs.get(13));
+    assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(14));
+    assertEquals(DFS_NAMENODE_LOCK_DETAILED_METRICS_KEY, outs.get(15));
     assertEquals(errs.size(), 0);
   }
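
Usage note (illustrative, not part of the patch): the sketch below shows how the switch added above might be exercised, assuming this patch is applied. It uses only the key introduced in DFSConfigKeys and the standard Hadoop Configuration API; the class name is hypothetical.

  // Minimal sketch, assuming the patch above is applied; the class name is made up.
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hdfs.DFSConfigKeys;

  public class DecommissionEcReconstructionExample {
    public static void main(String[] args) {
      Configuration conf = new Configuration();
      // Enable EC reconstruction for busy nodes during decommission; the default is false,
      // which keeps the pre-patch behavior of waiting for the busy DataNode.
      conf.setBoolean(
          DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_EC_RECONSTRUCTION_ENABLE, true);
      // A NameNode started with this Configuration reads the value in the BlockManager
      // constructor via conf.getBoolean(...), as shown in the BlockManager hunk above.
      System.out.println(conf.getBoolean(
          DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_EC_RECONSTRUCTION_ENABLE, false));
    }
  }

Because the key is also registered in RECONFIGURABLE_PROPERTIES and handled in reconfReplicationParameters, it should be possible to flip it on a running NameNode with "hdfs dfsadmin -reconfig namenode <host:ipc_port> start" after updating hdfs-site.xml, without a restart.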