From 2367054bec4e51b9356549e84ee376d88abcd1b5 Mon Sep 17 00:00:00 2001 From: liubingxing Date: Wed, 26 Jan 2022 19:46:42 +0800 Subject: [PATCH] HDFS-16432. Namenode block report add yield to avoid holding write lock too long --- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 3 + .../server/blockmanagement/BlockManager.java | 143 +++++++++++------- 2 files changed, 91 insertions(+), 55 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 14f9cd7730e2f..7d8a5f1987e3e 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -1041,6 +1041,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final int DFS_NAMENODE_MAX_FULL_BLOCK_REPORT_LEASES_DEFAULT = 6; public static final String DFS_NAMENODE_FULL_BLOCK_REPORT_LEASE_LENGTH_MS = "dfs.namenode.full.block.report.lease.length.ms"; public static final long DFS_NAMENODE_FULL_BLOCK_REPORT_LEASE_LENGTH_MS_DEFAULT = 5L * 60L * 1000L; + public static final String DFS_NAMENODE_FULL_BLOCK_REPORT_YIELD_THRESHOLD_MS_KEY = + "dfs.namenode.full.block.report.yield.threshold.ms"; + public static final long DFS_NAMENODE_FULL_BLOCK_REPORT_YIELD_THRESHOLD_MS_DEFAULT = 100; public static final String DFS_CACHEREPORT_INTERVAL_MSEC_KEY = "dfs.cachereport.intervalMsec"; public static final long DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT = 10 * 1000; public static final String DFS_BLOCK_INVALIDATE_LIMIT_KEY = "dfs.block.invalidate.limit"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index d7045fc2a44cc..968034836543b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -474,6 +474,8 @@ public long getTotalECBlockGroups() { /** Storages accessible from multiple DNs. */ private final ProvidedStorageMap providedStorageMap; + private final long blockReportYieldThresholdMs; + public BlockManager(final Namesystem namesystem, boolean haEnabled, final Configuration conf) throws IOException { this.namesystem = namesystem; @@ -617,6 +619,14 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled, conf.getBoolean(DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED, DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED_DEFAULT); + blockReportYieldThresholdMs = conf.getLong( + DFSConfigKeys.DFS_NAMENODE_FULL_BLOCK_REPORT_YIELD_THRESHOLD_MS_KEY, + DFSConfigKeys.DFS_NAMENODE_FULL_BLOCK_REPORT_YIELD_THRESHOLD_MS_DEFAULT); + + printBlockManagerInfo(); + } + + private void printBlockManagerInfo() { LOG.info("defaultReplication = {}", defaultReplication); LOG.info("maxReplication = {}", maxReplication); LOG.info("minReplication = {}", minReplication); @@ -2926,12 +2936,81 @@ Collection processReport( Collection toInvalidate = new ArrayList<>(); Collection toCorrupt = new ArrayList<>(); Collection toUC = new ArrayList<>(); - reportDiff(storageInfo, report, - toAdd, toRemove, toInvalidate, toCorrupt, toUC); + Collection allInvalidate = new ArrayList<>(); + + // place a delimiter in the list which separates blocks + // that have been reported from those that have not + DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor(); + Block delimiterBlock = new Block(); + BlockInfo delimiter = new BlockInfoContiguous(delimiterBlock, + (short) 1); + AddBlockResult result = storageInfo.addBlock(delimiter, delimiterBlock); + assert result == AddBlockResult.ADDED + : "Delimiting block cannot be present in the node"; + int headIndex = 0; //currently the delimiter is in the head of the list + int curIndex; + + BlockListAsLongs newReport = report; + if (newReport == null) { + newReport = BlockListAsLongs.EMPTY; + } + // scan the report and process newly reported blocks + long beginTime = Time.monotonicNow(); + for (BlockReportReplica iblk : newReport) { + // process diff and yield if iter time larger than blockReportYieldThresholdMs + if (Time.monotonicNow() - beginTime >= blockReportYieldThresholdMs) { + processDiff(storageInfo, toAdd, + toRemove, toInvalidate, toCorrupt, toUC, allInvalidate); + LOG.debug("processReport yield for storageId {}", storageInfo.getStorageID()); + namesystem.writeUnlock(); // unlock for processQueue + namesystem.writeUnlock(); // unlock for processReport + namesystem.writeLock(); // lock for processReport + namesystem.writeLock(); // lock for processQueue + beginTime = Time.monotonicNow(); + } + ReplicaState iState = iblk.getState(); + LOG.debug("Reported block {} on {} size {} replicaState = {}", iblk, dn, + iblk.getNumBytes(), iState); + BlockInfo storedBlock = processReportedBlock(storageInfo, + iblk, iState, toAdd, toInvalidate, toCorrupt, toUC); + + // move block to the head of the list + if (storedBlock != null) { + curIndex = storedBlock.findStorageInfo(storageInfo); + if (curIndex >= 0) { + headIndex = + storageInfo.moveBlockToHead(storedBlock, curIndex, headIndex); + } + } + } + + // collect blocks that have not been reported + // all of them are next to the delimiter + Iterator it = + storageInfo.new BlockIterator(delimiter.getNext(0)); + while (it.hasNext()) { + toRemove.add(it.next()); + } + storageInfo.removeBlock(delimiter); + + processDiff(storageInfo, toAdd, + toRemove, toInvalidate, toCorrupt, toUC, allInvalidate); + + return allInvalidate; + } + + private void processDiff( + final DatanodeStorageInfo storageInfo, + Collection toAdd, + Collection toRemove, + Collection toInvalidate, + Collection toCorrupt, + Collection toUC, + Collection allInvalidate) throws IOException { DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); // Process the blocks on each queue - for (StatefulBlockInfo b : toUC) { + for (StatefulBlockInfo b : toUC) { addStoredBlockUnderConstruction(b, storageInfo); } for (BlockInfo b : toRemove) { @@ -2954,7 +3033,12 @@ Collection processReport( markBlockAsCorrupt(b, storageInfo, node); } - return toInvalidate; + allInvalidate.addAll(toInvalidate); + toAdd.clear(); + toRemove.clear(); + toInvalidate.clear(); + toCorrupt.clear(); + toUC.clear(); } /** @@ -3077,57 +3161,6 @@ void processFirstBlockReport( } } - private void reportDiff(DatanodeStorageInfo storageInfo, - BlockListAsLongs newReport, - Collection toAdd, // add to DatanodeDescriptor - Collection toRemove, // remove from DatanodeDescriptor - Collection toInvalidate, // should be removed from DN - Collection toCorrupt, // add to corrupt replicas list - Collection toUC) { // add to under-construction list - - // place a delimiter in the list which separates blocks - // that have been reported from those that have not - DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor(); - Block delimiterBlock = new Block(); - BlockInfo delimiter = new BlockInfoContiguous(delimiterBlock, - (short) 1); - AddBlockResult result = storageInfo.addBlock(delimiter, delimiterBlock); - assert result == AddBlockResult.ADDED - : "Delimiting block cannot be present in the node"; - int headIndex = 0; //currently the delimiter is in the head of the list - int curIndex; - - if (newReport == null) { - newReport = BlockListAsLongs.EMPTY; - } - // scan the report and process newly reported blocks - for (BlockReportReplica iblk : newReport) { - ReplicaState iState = iblk.getState(); - LOG.debug("Reported block {} on {} size {} replicaState = {}", iblk, dn, - iblk.getNumBytes(), iState); - BlockInfo storedBlock = processReportedBlock(storageInfo, - iblk, iState, toAdd, toInvalidate, toCorrupt, toUC); - - // move block to the head of the list - if (storedBlock != null) { - curIndex = storedBlock.findStorageInfo(storageInfo); - if (curIndex >= 0) { - headIndex = - storageInfo.moveBlockToHead(storedBlock, curIndex, headIndex); - } - } - } - - // collect blocks that have not been reported - // all of them are next to the delimiter - Iterator it = - storageInfo.new BlockIterator(delimiter.getNext(0)); - while (it.hasNext()) { - toRemove.add(it.next()); - } - storageInfo.removeBlock(delimiter); - } - /** * Process a block replica reported by the data-node. * No side effects except adding to the passed-in Collections.