@@ -47,6 +47,7 @@
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ConcurrentLinkedQueue;

import java.util.concurrent.atomic.AtomicLong;
import javax.management.ObjectName;
@@ -191,6 +192,9 @@ public class BlockManager implements BlockStatsMXBean {
private volatile long lowRedundancyBlocksCount = 0L;
private volatile long scheduledReplicationBlocksCount = 0L;

/**
 * Maximum time the MarkedDeleteBlockScrubber may hold the namesystem
 * write lock in one pass, and the interval it sleeps between passes so
 * that other waiters on the lock can get in.
 */
private final long deleteBlockLockTimeMs = 500;
private final long deleteBlockUnlockIntervalTimeMs = 100;

/** flag indicating whether replication queues have been initialized */
private boolean initializedReplQueues;

@@ -324,6 +328,12 @@ public long getTotalECBlockGroups() {
* {@link #redundancyThread} has run at least one full iteration.
*/
private final AtomicLong lastRedundancyCycleTS = new AtomicLong(-1);
/**
* Daemon thread that asynchronously deletes the blocks queued in
* markedDeleteQueue.
*/
private final Daemon markedDeleteBlockScrubberThread =
new Daemon(new MarkedDeleteBlockScrubber());

/** Block report thread for handling async reports. */
private final BlockReportProcessingThread blockReportThread;

@@ -422,6 +432,12 @@ public long getTotalECBlockGroups() {
*/
private int numBlocksPerIteration;

/**
* Blocks of deleted files are put into this queue (see
* addBLocksToMarkedDeleteQueue), and the MarkedDeleteBlockScrubber
* thread removes them from the blocks map asynchronously.
*/
private final ConcurrentLinkedQueue<List<BlockInfo>> markedDeleteQueue;

/**
* Progress of the Reconstruction queues initialisation.
*/
@@ -475,7 +491,7 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled,
datanodeManager.getBlockInvalidateLimit(),
startupDelayBlockDeletionInMs,
blockIdManager);

markedDeleteQueue = new ConcurrentLinkedQueue<>();
// Compute the map capacity by allocating 2% of total memory
blocksMap = new BlocksMap(
LightWeightGSet.computeCapacity(2.0, "BlocksMap"));
@@ -725,6 +741,9 @@ public void activate(Configuration conf, long blockTotal) {
datanodeManager.activate(conf);
this.redundancyThread.setName("RedundancyMonitor");
this.redundancyThread.start();
this.markedDeleteBlockScrubberThread
    .setName("MarkedDeleteBlockScrubberThread");
this.markedDeleteBlockScrubberThread.start();
this.blockReportThread.start();
mxBeanName = MBeans.register("NameNode", "BlockStats", this);
bmSafeMode.activate(blockTotal);
@@ -738,8 +757,10 @@ public void close() {
try {
redundancyThread.interrupt();
blockReportThread.interrupt();
markedDeleteBlockScrubberThread.interrupt();
redundancyThread.join(3000);
blockReportThread.join(3000);
markedDeleteBlockScrubberThread.join(3000);
} catch (InterruptedException ie) {
}
datanodeManager.close();
@@ -4911,6 +4932,77 @@ public long getLastRedundancyMonitorTS() {
return lastRedundancyCycleTS.get();
}

/**
* Periodically deletes the blocks queued in markedDeleteQueue, limiting
* how long the namesystem write lock is held in each pass.
*/
private class MarkedDeleteBlockScrubber implements Runnable {
private Iterator<BlockInfo> toDeleteIterator = null;
private boolean isSleep;
private NameNodeMetrics metrics;

/**
 * Removes blocks from the current iterator while holding the namesystem
 * write lock, and breaks out (setting isSleep) once the lock has been
 * held for more than deleteBlockLockTimeMs.
 */
private void remove(long time) {
if (checkToDeleteIterator()) {
namesystem.writeLock();
try {
while (toDeleteIterator.hasNext()) {
removeBlock(toDeleteIterator.next());
metrics.decrPendingDeleteBlocksCount();
if (Time.monotonicNow() - time > deleteBlockLockTimeMs) {
isSleep = true;
break;
}
}
} finally {
namesystem.writeUnlock();
}
}
}

private boolean checkToDeleteIterator() {
return toDeleteIterator != null && toDeleteIterator.hasNext();
}

@Override
public void run() {
LOG.info("Start MarkedDeleteBlockScrubber thread");
while (namesystem.isRunning() &&
!Thread.currentThread().isInterrupted()) {
if (!markedDeleteQueue.isEmpty() || checkToDeleteIterator()) {
try {
metrics = NameNode.getNameNodeMetrics();
metrics.setDeleteBlocksQueued(markedDeleteQueue.size());
isSleep = false;
long startTime = Time.monotonicNow();
remove(startTime);
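// Keep draining the queue list-by-list until it is empty or remove()
// has used up this pass's write-lock budget (isSleep == true).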
while (!isSleep && !markedDeleteQueue.isEmpty() &&
!Thread.currentThread().isInterrupted()) {
List<BlockInfo> markedDeleteList = markedDeleteQueue.poll();
if (markedDeleteList != null) {
toDeleteIterator = markedDeleteList.listIterator();
}
remove(startTime);
}
} catch (Exception e) {
LOG.warn("MarkedDeleteBlockScrubber encountered an exception" +
" during the block deletion process;" +
" block deletion will be retried in {} milliseconds.",
deleteBlockUnlockIntervalTimeMs, e);
}
}
if (isSleep) {
LOG.debug("Clear markedDeleteQueue over {}" +
" millisecond to release the write lock", deleteBlockLockTimeMs);
}
try {
Thread.sleep(deleteBlockUnlockIntervalTimeMs);
} catch (InterruptedException e) {
LOG.info("Stopping MarkedDeleteBlockScrubber.");
break;
}
}
}
}

/**
* Periodically calls computeBlockRecoveryWork().
*/
@@ -5259,6 +5351,17 @@ public BlockIdManager getBlockIdManager() {
return blockIdManager;
}

@VisibleForTesting
public ConcurrentLinkedQueue<List<BlockInfo>> getMarkedDeleteQueue() {
return markedDeleteQueue;
}

/**
 * Adds the given blocks to markedDeleteQueue and increments the
 * pending-delete-blocks gauge; the MarkedDeleteBlockScrubber thread
 * removes them from the blocks map asynchronously.
 */
public void addBLocksToMarkedDeleteQueue(List<BlockInfo> blockInfos) {
markedDeleteQueue.add(blockInfos);
NameNode.getNameNodeMetrics()
    .incrPendingDeleteBlocksCount(blockInfos.size());
}

public long nextGenerationStamp(boolean legacyBlock) throws IOException {
return blockIdManager.nextGenerationStamp(legacyBlock);
}
@@ -2373,8 +2373,8 @@ boolean truncate(String src, long newLength, String clientName,
}
getEditLog().logSync();
if (!toRemoveBlocks.getToDeleteList().isEmpty()) {
removeBlocks(toRemoveBlocks);
toRemoveBlocks.clear();
blockManager.addBLocksToMarkedDeleteQueue(
toRemoveBlocks.getToDeleteList());
}
logAuditEvent(true, operationName, src, null, status);
} catch (AccessControlException e) {
@@ -2821,8 +2821,8 @@ private HdfsFileStatus startFileInt(String src,
if (!skipSync) {
getEditLog().logSync();
if (toRemoveBlocks != null) {
removeBlocks(toRemoveBlocks);
toRemoveBlocks.clear();
blockManager.addBLocksToMarkedDeleteQueue(
toRemoveBlocks.getToDeleteList());
}
}
}
@@ -3345,8 +3345,8 @@ void renameTo(final String src, final String dst,
assert res != null;
BlocksMapUpdateInfo collectedBlocks = res.collectedBlocks;
if (!collectedBlocks.getToDeleteList().isEmpty()) {
removeBlocks(collectedBlocks);
collectedBlocks.clear();
blockManager.addBLocksToMarkedDeleteQueue(
collectedBlocks.getToDeleteList());
}

logAuditEvent(true, operationName + " (options=" +
@@ -3385,7 +3386,8 @@ boolean delete(String src, boolean recursive, boolean logRetryCache)
getEditLog().logSync();
logAuditEvent(ret, operationName, src);
if (toRemovedBlocks != null) {
removeBlocks(toRemovedBlocks); // Incremental deletion of blocks
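// Blocks are now queued and removed from the blocks map asynchronously
// by the MarkedDeleteBlockScrubber thread.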
blockManager.addBLocksToMarkedDeleteQueue(
toRemovedBlocks.getToDeleteList());
}
return ret;
}
@@ -3395,30 +3396,6 @@ FSPermissionChecker getPermissionChecker()
return dir.getPermissionChecker();
}

/**
* From the given list, incrementally remove the blocks from blockManager
* Writelock is dropped and reacquired every blockDeletionIncrement to
* ensure that other waiters on the lock can get in. See HDFS-2938
*
* @param blocks
* An instance of {@link BlocksMapUpdateInfo} which contains a list
* of blocks that need to be removed from blocksMap
*/
void removeBlocks(BlocksMapUpdateInfo blocks) {
List<BlockInfo> toDeleteList = blocks.getToDeleteList();
Iterator<BlockInfo> iter = toDeleteList.iterator();
while (iter.hasNext()) {
writeLock();
try {
for (int i = 0; i < blockDeletionIncrement && iter.hasNext(); i++) {
blockManager.removeBlock(iter.next());
}
} finally {
writeUnlock("removeBlocks");
}
}
}

/**
* Remove leases and inodes related to a given path
* @param removedUCFiles INodes whose leases need to be released
@@ -4627,7 +4604,8 @@ private void clearCorruptLazyPersistFiles()
INodesInPath.fromINode((INodeFile) bc), false);
changed |= toRemoveBlocks != null;
if (toRemoveBlocks != null) {
removeBlocks(toRemoveBlocks); // Incremental deletion of blocks
blockManager.addBLocksToMarkedDeleteQueue(
toRemoveBlocks.getToDeleteList());
}
}
} finally {
@@ -7338,7 +7316,8 @@ void deleteSnapshot(String snapshotRoot, String snapshotName,
// Breaking the pattern as removing blocks have to happen outside of the
// global lock
if (blocksToBeDeleted != null) {
removeBlocks(blocksToBeDeleted);
blockManager.addBLocksToMarkedDeleteQueue(
blocksToBeDeleted.getToDeleteList());
}
logAuditEvent(true, operationName, rootPath, null, null);
}
@@ -7364,7 +7343,8 @@ public void gcDeletedSnapshot(String snapshotRoot, String snapshotName)
} finally {
writeUnlock(operationName, getLockReportInfoSupplier(rootPath));
}
removeBlocks(blocksToBeDeleted);
blockManager.addBLocksToMarkedDeleteQueue(
blocksToBeDeleted.getToDeleteList());
}

/**
@@ -89,6 +89,10 @@ public class NameNodeMetrics {
MutableCounterLong blockOpsBatched;
@Metric("Number of pending edits")
MutableGaugeInt pendingEditsCount;
@Metric("Number of delete blocks Queued")
MutableGaugeInt deleteBlocksQueued;
@Metric("Number of pending deletion blocks")
MutableGaugeInt pendingDeleteBlocksCount;

@Metric("Number of file system operations")
public long totalFileOps(){
@@ -341,6 +345,18 @@ public void setBlockOpsQueued(int size) {
blockOpsQueued.set(size);
}

public void setDeleteBlocksQueued(int size) {
deleteBlocksQueued.set(size);
}

public void incrPendingDeleteBlocksCount(int size) {
pendingDeleteBlocksCount.incr(size);
}

public void decrPendingDeleteBlocksCount() {
pendingDeleteBlocksCount.decr();
}

public void addBlockOpsBatched(int count) {
blockOpsBatched.incr(count);
}
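For illustration only (not part of this change): the two new gauges could be
checked from a unit test with Hadoop's MetricsAsserts, assuming the NameNode
metrics source is registered under its usual "NameNodeActivity" name and that
the metric names are the capitalized field names above:

// Hypothetical test fragment; MetricsAsserts comes from
// org.apache.hadoop.test and MetricsRecordBuilder from
// org.apache.hadoop.metrics2.
MetricsRecordBuilder rb = MetricsAsserts.getMetrics("NameNodeActivity");
MetricsAsserts.assertGauge("DeleteBlocksQueued", 0, rb);
MetricsAsserts.assertGauge("PendingDeleteBlocksCount", 0, rb);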
@@ -190,6 +190,8 @@ public void testScheduledBlocksCounterDecrementOnDeletedBlock()

// 4. delete the file
dfs.delete(filePath, true);
BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
cluster.getNamesystem(0).getBlockManager());
int blocksScheduled = 0;
for (DatanodeDescriptor descriptor : dnList) {
if (descriptor.getBlocksScheduled() != 0) {
@@ -30,6 +30,7 @@
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.test.GenericTestUtils;
@@ -161,6 +162,8 @@ public void testRenameWithOverwrite() throws Exception {
assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock().
getLocalBlock()) != null);
dfs.rename(srcPath, dstPath, Rename.OVERWRITE);
BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
cluster.getNamesystem(0).getBlockManager());
assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock().
getLocalBlock()) == null);

@@ -75,6 +75,7 @@
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
@@ -1356,13 +1357,17 @@ public void testFileCreationWithOverwrite() throws Exception {
assertBlocks(bm, oldBlocks, true);

out = dfs.create(filePath, true);
BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
cluster.getNamesystem(0).getBlockManager());
byte[] newData = AppendTestUtil.randomBytes(seed, fileSize);
try {
out.write(newData);
} finally {
out.close();
}
dfs.deleteOnExit(filePath);
BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
cluster.getNamesystem(0).getBlockManager());

LocatedBlocks newBlocks = NameNodeAdapter.getBlockLocations(
nn, file, 0, fileSize);
@@ -23,6 +23,7 @@
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
@@ -127,7 +128,7 @@ public void testReportBadBlock() throws IOException {
}

@Test
public void testInvalidateBlock() throws IOException {
public void testInvalidateBlock() throws IOException, InterruptedException {
final Path file = new Path("/invalidate");
final int length = 10;
final byte[] bytes = StripedFileTestUtil.generateBytes(length);
@@ -151,6 +152,8 @@ public void testInvalidateBlock() throws IOException {
try {
// delete the file
dfs.delete(file, true);
BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
cluster.getNamesystem().getBlockManager());
// check the block is added to invalidateBlocks
final FSNamesystem fsn = cluster.getNamesystem();
final BlockManager bm = fsn.getBlockManager();