@@ -1041,6 +1041,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_NAMENODE_MAX_FULL_BLOCK_REPORT_LEASES_DEFAULT = 6;
public static final String DFS_NAMENODE_FULL_BLOCK_REPORT_LEASE_LENGTH_MS = "dfs.namenode.full.block.report.lease.length.ms";
public static final long DFS_NAMENODE_FULL_BLOCK_REPORT_LEASE_LENGTH_MS_DEFAULT = 5L * 60L * 1000L;
public static final String DFS_NAMENODE_FULL_BLOCK_REPORT_YIELD_THRESHOLD_MS_KEY =
"dfs.namenode.full.block.report.yield.threshold.ms";
public static final long DFS_NAMENODE_FULL_BLOCK_REPORT_YIELD_THRESHOLD_MS_DEFAULT = 100;
public static final String DFS_CACHEREPORT_INTERVAL_MSEC_KEY = "dfs.cachereport.intervalMsec";
public static final long DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT = 10 * 1000;
public static final String DFS_BLOCK_INVALIDATE_LIMIT_KEY = "dfs.block.invalidate.limit";
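
Note: the new key can presumably be overridden like any other HDFS setting, either as dfs.namenode.full.block.report.yield.threshold.ms in hdfs-site.xml or programmatically. A minimal sketch (the 200 ms value is illustrative only, not a recommendation):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class YieldThresholdConfigExample {
  public static void main(String[] args) {
    // Programmatic equivalent of setting
    // dfs.namenode.full.block.report.yield.threshold.ms in hdfs-site.xml.
    Configuration conf = new HdfsConfiguration();
    conf.setLong(
        DFSConfigKeys.DFS_NAMENODE_FULL_BLOCK_REPORT_YIELD_THRESHOLD_MS_KEY,
        200L); // illustrative value; the default introduced above is 100 ms
  }
}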
@@ -474,6 +474,8 @@ public long getTotalECBlockGroups() {
/** Storages accessible from multiple DNs. */
private final ProvidedStorageMap providedStorageMap;

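// Maximum time (ms) a single pass over a full block report may spend
// iterating before the accumulated diff is flushed and the namesystem
// write lock is yielded; see processReport().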
private final long blockReportYieldThresholdMs;

public BlockManager(final Namesystem namesystem, boolean haEnabled,
final Configuration conf) throws IOException {
this.namesystem = namesystem;
@@ -617,6 +619,14 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled,
conf.getBoolean(DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED,
DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED_DEFAULT);

blockReportYieldThresholdMs = conf.getLong(
DFSConfigKeys.DFS_NAMENODE_FULL_BLOCK_REPORT_YIELD_THRESHOLD_MS_KEY,
DFSConfigKeys.DFS_NAMENODE_FULL_BLOCK_REPORT_YIELD_THRESHOLD_MS_DEFAULT);

printBlockManagerInfo();
}

private void printBlockManagerInfo() {
LOG.info("defaultReplication = {}", defaultReplication);
LOG.info("maxReplication = {}", maxReplication);
LOG.info("minReplication = {}", minReplication);
@@ -2926,12 +2936,81 @@ Collection<Block> processReport(
Collection<Block> toInvalidate = new ArrayList<>();
Collection<BlockToMarkCorrupt> toCorrupt = new ArrayList<>();
Collection<StatefulBlockInfo> toUC = new ArrayList<>();
reportDiff(storageInfo, report,
toAdd, toRemove, toInvalidate, toCorrupt, toUC);
Collection<Block> allInvalidate = new ArrayList<>();

// place a delimiter in the list which separates blocks
// that have been reported from those that have not
DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor();
Block delimiterBlock = new Block();
BlockInfo delimiter = new BlockInfoContiguous(delimiterBlock,
(short) 1);
AddBlockResult result = storageInfo.addBlock(delimiter, delimiterBlock);
assert result == AddBlockResult.ADDED
: "Delimiting block cannot be present in the node";
int headIndex = 0; //currently the delimiter is in the head of the list
int curIndex;

BlockListAsLongs newReport = report;
if (newReport == null) {
newReport = BlockListAsLongs.EMPTY;
}
// scan the report and process newly reported blocks
long beginTime = Time.monotonicNow();
for (BlockReportReplica iblk : newReport) {
// process the accumulated diff and yield the lock if this iteration
// has already taken longer than blockReportYieldThresholdMs
if (Time.monotonicNow() - beginTime >= blockReportYieldThresholdMs) {
processDiff(storageInfo, toAdd,
toRemove, toInvalidate, toCorrupt, toUC, allInvalidate);
LOG.debug("processReport yield for storageId {}", storageInfo.getStorageID());
namesystem.writeUnlock(); // unlock for processQueue
namesystem.writeUnlock(); // unlock for processReport
namesystem.writeLock(); // lock for processReport
namesystem.writeLock(); // lock for processQueue
beginTime = Time.monotonicNow();
}
ReplicaState iState = iblk.getState();
LOG.debug("Reported block {} on {} size {} replicaState = {}", iblk, dn,
iblk.getNumBytes(), iState);
BlockInfo storedBlock = processReportedBlock(storageInfo,
iblk, iState, toAdd, toInvalidate, toCorrupt, toUC);

// move block to the head of the list
if (storedBlock != null) {
curIndex = storedBlock.findStorageInfo(storageInfo);
if (curIndex >= 0) {
headIndex =
storageInfo.moveBlockToHead(storedBlock, curIndex, headIndex);
}
}
}

// collect blocks that have not been reported
// all of them are next to the delimiter
Iterator<BlockInfo> it =
storageInfo.new BlockIterator(delimiter.getNext(0));
while (it.hasNext()) {
toRemove.add(it.next());
}
storageInfo.removeBlock(delimiter);

processDiff(storageInfo, toAdd,
toRemove, toInvalidate, toCorrupt, toUC, allInvalidate);

return allInvalidate;
}
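
Note: the yield above follows a common pattern for long-running work under a coarse lock: flush whatever has been accumulated so the partial state is applied, release the lock (twice here because, per the inline comments, it is held by both processQueue and processReport), and immediately re-acquire it so waiting threads get a turn. A minimal standalone sketch of the same idea, using a plain ReentrantLock and hypothetical flush()/pending helpers rather than the HDFS namesystem lock:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

public class LockYieldSketch {
  private final ReentrantLock lock = new ReentrantLock();
  private final long yieldThresholdMs = 100; // mirrors the new default

  void processAll(List<String> report) {
    lock.lock();
    try {
      List<String> pending = new ArrayList<>();
      long begin = System.currentTimeMillis();
      for (String item : report) {
        if (System.currentTimeMillis() - begin >= yieldThresholdMs) {
          flush(pending);   // apply the partial diff before yielding
          lock.unlock();    // give waiting threads a chance to acquire the lock
          lock.lock();      // re-acquire and continue where iteration stopped
          begin = System.currentTimeMillis();
        }
        pending.add(item);  // stand-in for processReportedBlock(...)
      }
      flush(pending);       // apply whatever is left at the end of the report
    } finally {
      lock.unlock();
    }
  }

  private void flush(List<String> pending) {
    // stand-in for processDiff(...): apply and then clear the accumulated work
    pending.clear();
  }
}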

private void processDiff(
final DatanodeStorageInfo storageInfo,
Collection<BlockInfoToAdd> toAdd,
Collection<BlockInfo> toRemove,
Collection<Block> toInvalidate,
Collection<BlockToMarkCorrupt> toCorrupt,
Collection<StatefulBlockInfo> toUC,
Collection<Block> allInvalidate) throws IOException {

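// Apply everything accumulated so far (adds, removals, invalidations,
// corrupt markings, under-construction updates), then clear the collections
// so processReport can keep reusing them between yields; invalidations are
// also copied into allInvalidate, which the caller ultimately returns.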
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
// Process the blocks on each queue
for (StatefulBlockInfo b : toUC) {
addStoredBlockUnderConstruction(b, storageInfo);
}
for (BlockInfo b : toRemove) {
@@ -2954,7 +3033,12 @@ Collection<Block> processReport(
markBlockAsCorrupt(b, storageInfo, node);
}

return toInvalidate;
allInvalidate.addAll(toInvalidate);
toAdd.clear();
toRemove.clear();
toInvalidate.clear();
toCorrupt.clear();
toUC.clear();
}

/**
@@ -3077,57 +3161,6 @@ void processFirstBlockReport(
}
}

private void reportDiff(DatanodeStorageInfo storageInfo,
BlockListAsLongs newReport,
Collection<BlockInfoToAdd> toAdd, // add to DatanodeDescriptor
Collection<BlockInfo> toRemove, // remove from DatanodeDescriptor
Collection<Block> toInvalidate, // should be removed from DN
Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
Collection<StatefulBlockInfo> toUC) { // add to under-construction list

// place a delimiter in the list which separates blocks
// that have been reported from those that have not
DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor();
Block delimiterBlock = new Block();
BlockInfo delimiter = new BlockInfoContiguous(delimiterBlock,
(short) 1);
AddBlockResult result = storageInfo.addBlock(delimiter, delimiterBlock);
assert result == AddBlockResult.ADDED
: "Delimiting block cannot be present in the node";
int headIndex = 0; //currently the delimiter is in the head of the list
int curIndex;

if (newReport == null) {
newReport = BlockListAsLongs.EMPTY;
}
// scan the report and process newly reported blocks
for (BlockReportReplica iblk : newReport) {
ReplicaState iState = iblk.getState();
LOG.debug("Reported block {} on {} size {} replicaState = {}", iblk, dn,
iblk.getNumBytes(), iState);
BlockInfo storedBlock = processReportedBlock(storageInfo,
iblk, iState, toAdd, toInvalidate, toCorrupt, toUC);

// move block to the head of the list
if (storedBlock != null) {
curIndex = storedBlock.findStorageInfo(storageInfo);
if (curIndex >= 0) {
headIndex =
storageInfo.moveBlockToHead(storedBlock, curIndex, headIndex);
}
}
}

// collect blocks that have not been reported
// all of them are next to the delimiter
Iterator<BlockInfo> it =
storageInfo.new BlockIterator(delimiter.getNext(0));
while (it.hasNext()) {
toRemove.add(it.next());
}
storageInfo.removeBlock(delimiter);
}

/**
* Process a block replica reported by the data-node.
* No side effects except adding to the passed-in Collections.