Skip to content

Commit c53a4bb

Browse files
zhuxiangyi authored and HarshitGupta11 committed
HDFS-16043. Add markedDeleteBlockScrubberThread to delete blocks asynchronously (apache#3063). Contributed by Xiangyi Zhu.
Reviewed-by: tomscut <[email protected]> Signed-off-by: Wei-Chiu Chuang <[email protected]> Signed-off-by: He Xiaoqiao <[email protected]>
1 parent 2f16887 commit c53a4bb

File tree

18 files changed: 244 additions and 71 deletions.

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

Lines changed: 104 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -47,6 +47,7 @@
4747
import java.util.concurrent.FutureTask;
4848
import java.util.concurrent.ThreadLocalRandom;
4949
import java.util.concurrent.TimeUnit;
50+
import java.util.concurrent.ConcurrentLinkedQueue;
5051

5152
import java.util.concurrent.atomic.AtomicLong;
5253
import javax.management.ObjectName;
@@ -191,6 +192,9 @@ public class BlockManager implements BlockStatsMXBean {
191192
private volatile long lowRedundancyBlocksCount = 0L;
192193
private volatile long scheduledReplicationBlocksCount = 0L;
193194

195+
private final long deleteBlockLockTimeMs = 500;
196+
private final long deleteBlockUnlockIntervalTimeMs = 100;
197+
194198
/** flag indicating whether replication queues have been initialized */
195199
private boolean initializedReplQueues;
196200

@@ -324,6 +328,12 @@ public long getTotalECBlockGroups() {
324328
* {@link #redundancyThread} has run at least one full iteration.
325329
*/
326330
private final AtomicLong lastRedundancyCycleTS = new AtomicLong(-1);
331+
/**
332+
* markedDeleteBlockScrubber thread for handling async delete blocks.
333+
*/
334+
private final Daemon markedDeleteBlockScrubberThread =
335+
new Daemon(new MarkedDeleteBlockScrubber());
336+
327337
/** Block report thread for handling async reports. */
328338
private final BlockReportProcessingThread blockReportThread;
329339

@@ -422,6 +432,12 @@ public long getTotalECBlockGroups() {
422432
*/
423433
private int numBlocksPerIteration;
424434

435+
/**
436+
* The blocks of deleted files are put into the queue,
437+
* and the cleanup thread processes these blocks periodically.
438+
*/
439+
private final ConcurrentLinkedQueue<List<BlockInfo>> markedDeleteQueue;
440+
425441
/**
426442
* Progress of the Reconstruction queues initialisation.
427443
*/
@@ -475,7 +491,7 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled,
475491
datanodeManager.getBlockInvalidateLimit(),
476492
startupDelayBlockDeletionInMs,
477493
blockIdManager);
478-
494+
markedDeleteQueue = new ConcurrentLinkedQueue<>();
479495
// Compute the map capacity by allocating 2% of total memory
480496
blocksMap = new BlocksMap(
481497
LightWeightGSet.computeCapacity(2.0, "BlocksMap"));
@@ -725,6 +741,9 @@ public void activate(Configuration conf, long blockTotal) {
725741
datanodeManager.activate(conf);
726742
this.redundancyThread.setName("RedundancyMonitor");
727743
this.redundancyThread.start();
744+
this.markedDeleteBlockScrubberThread.
745+
setName("MarkedDeleteBlockScrubberThread");
746+
this.markedDeleteBlockScrubberThread.start();
728747
this.blockReportThread.start();
729748
mxBeanName = MBeans.register("NameNode", "BlockStats", this);
730749
bmSafeMode.activate(blockTotal);
@@ -738,8 +757,10 @@ public void close() {
738757
try {
739758
redundancyThread.interrupt();
740759
blockReportThread.interrupt();
760+
markedDeleteBlockScrubberThread.interrupt();
741761
redundancyThread.join(3000);
742762
blockReportThread.join(3000);
763+
markedDeleteBlockScrubberThread.join(3000);
743764
} catch (InterruptedException ie) {
744765
}
745766
datanodeManager.close();
@@ -4911,6 +4932,77 @@ public long getLastRedundancyMonitorTS() {
49114932
return lastRedundancyCycleTS.get();
49124933
}
49134934

4935+
/**
4936+
* Periodically deletes the marked block.
4937+
*/
4938+
private class MarkedDeleteBlockScrubber implements Runnable {
4939+
private Iterator<BlockInfo> toDeleteIterator = null;
4940+
private boolean isSleep;
4941+
private NameNodeMetrics metrics;
4942+
4943+
private void remove(long time) {
4944+
if (checkToDeleteIterator()) {
4945+
namesystem.writeLock();
4946+
try {
4947+
while (toDeleteIterator.hasNext()) {
4948+
removeBlock(toDeleteIterator.next());
4949+
metrics.decrPendingDeleteBlocksCount();
4950+
if (Time.monotonicNow() - time > deleteBlockLockTimeMs) {
4951+
isSleep = true;
4952+
break;
4953+
}
4954+
}
4955+
} finally {
4956+
namesystem.writeUnlock();
4957+
}
4958+
}
4959+
}
4960+
4961+
private boolean checkToDeleteIterator() {
4962+
return toDeleteIterator != null && toDeleteIterator.hasNext();
4963+
}
4964+
4965+
@Override
4966+
public void run() {
4967+
LOG.info("Start MarkedDeleteBlockScrubber thread");
4968+
while (namesystem.isRunning() &&
4969+
!Thread.currentThread().isInterrupted()) {
4970+
if (!markedDeleteQueue.isEmpty() || checkToDeleteIterator()) {
4971+
try {
4972+
metrics = NameNode.getNameNodeMetrics();
4973+
metrics.setDeleteBlocksQueued(markedDeleteQueue.size());
4974+
isSleep = false;
4975+
long startTime = Time.monotonicNow();
4976+
remove(startTime);
4977+
while (!isSleep && !markedDeleteQueue.isEmpty() &&
4978+
!Thread.currentThread().isInterrupted()) {
4979+
List<BlockInfo> markedDeleteList = markedDeleteQueue.poll();
4980+
if (markedDeleteList != null) {
4981+
toDeleteIterator = markedDeleteList.listIterator();
4982+
}
4983+
remove(startTime);
4984+
}
4985+
} catch (Exception e){
4986+
LOG.warn("MarkedDeleteBlockScrubber encountered an exception" +
4987+
" during the block deletion process, " +
4988+
" the deletion of the block will retry in {} millisecond.",
4989+
deleteBlockUnlockIntervalTimeMs, e);
4990+
}
4991+
}
4992+
if (isSleep) {
4993+
LOG.debug("Clear markedDeleteQueue over {}" +
4994+
" millisecond to release the write lock", deleteBlockLockTimeMs);
4995+
}
4996+
try {
4997+
Thread.sleep(deleteBlockUnlockIntervalTimeMs);
4998+
} catch (InterruptedException e) {
4999+
LOG.info("Stopping MarkedDeleteBlockScrubber.");
5000+
break;
5001+
}
5002+
}
5003+
}
5004+
}
5005+
49145006
/**
49155007
* Periodically calls computeBlockRecoveryWork().
49165008
*/
@@ -5259,6 +5351,17 @@ public BlockIdManager getBlockIdManager() {
52595351
return blockIdManager;
52605352
}
52615353

5354+
@VisibleForTesting
5355+
public ConcurrentLinkedQueue<List<BlockInfo>> getMarkedDeleteQueue() {
5356+
return markedDeleteQueue;
5357+
}
5358+
5359+
public void addBLocksToMarkedDeleteQueue(List<BlockInfo> blockInfos) {
5360+
markedDeleteQueue.add(blockInfos);
5361+
NameNode.getNameNodeMetrics().
5362+
incrPendingDeleteBlocksCount(blockInfos.size());
5363+
}
5364+
52625365
public long nextGenerationStamp(boolean legacyBlock) throws IOException {
52635366
return blockIdManager.nextGenerationStamp(legacyBlock);
52645367
}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

Lines changed: 14 additions & 34 deletions
Original file line number | Diff line number | Diff line change
@@ -2373,8 +2373,8 @@ boolean truncate(String src, long newLength, String clientName,
23732373
}
23742374
getEditLog().logSync();
23752375
if (!toRemoveBlocks.getToDeleteList().isEmpty()) {
2376-
removeBlocks(toRemoveBlocks);
2377-
toRemoveBlocks.clear();
2376+
blockManager.addBLocksToMarkedDeleteQueue(
2377+
toRemoveBlocks.getToDeleteList());
23782378
}
23792379
logAuditEvent(true, operationName, src, null, status);
23802380
} catch (AccessControlException e) {
@@ -2821,8 +2821,8 @@ private HdfsFileStatus startFileInt(String src,
28212821
if (!skipSync) {
28222822
getEditLog().logSync();
28232823
if (toRemoveBlocks != null) {
2824-
removeBlocks(toRemoveBlocks);
2825-
toRemoveBlocks.clear();
2824+
blockManager.addBLocksToMarkedDeleteQueue(
2825+
toRemoveBlocks.getToDeleteList());
28262826
}
28272827
}
28282828
}
@@ -3345,8 +3345,8 @@ void renameTo(final String src, final String dst,
33453345
assert res != null;
33463346
BlocksMapUpdateInfo collectedBlocks = res.collectedBlocks;
33473347
if (!collectedBlocks.getToDeleteList().isEmpty()) {
3348-
removeBlocks(collectedBlocks);
3349-
collectedBlocks.clear();
3348+
blockManager.addBLocksToMarkedDeleteQueue(
3349+
collectedBlocks.getToDeleteList());
33503350
}
33513351

33523352
logAuditEvent(true, operationName + " (options=" +
@@ -3385,7 +3385,8 @@ boolean delete(String src, boolean recursive, boolean logRetryCache)
33853385
getEditLog().logSync();
33863386
logAuditEvent(ret, operationName, src);
33873387
if (toRemovedBlocks != null) {
3388-
removeBlocks(toRemovedBlocks); // Incremental deletion of blocks
3388+
blockManager.addBLocksToMarkedDeleteQueue(
3389+
toRemovedBlocks.getToDeleteList());
33893390
}
33903391
return ret;
33913392
}
@@ -3395,30 +3396,6 @@ FSPermissionChecker getPermissionChecker()
33953396
return dir.getPermissionChecker();
33963397
}
33973398

3398-
/**
3399-
* From the given list, incrementally remove the blocks from blockManager
3400-
* Writelock is dropped and reacquired every blockDeletionIncrement to
3401-
* ensure that other waiters on the lock can get in. See HDFS-2938
3402-
*
3403-
* @param blocks
3404-
* An instance of {@link BlocksMapUpdateInfo} which contains a list
3405-
* of blocks that need to be removed from blocksMap
3406-
*/
3407-
void removeBlocks(BlocksMapUpdateInfo blocks) {
3408-
List<BlockInfo> toDeleteList = blocks.getToDeleteList();
3409-
Iterator<BlockInfo> iter = toDeleteList.iterator();
3410-
while (iter.hasNext()) {
3411-
writeLock();
3412-
try {
3413-
for (int i = 0; i < blockDeletionIncrement && iter.hasNext(); i++) {
3414-
blockManager.removeBlock(iter.next());
3415-
}
3416-
} finally {
3417-
writeUnlock("removeBlocks");
3418-
}
3419-
}
3420-
}
3421-
34223399
/**
34233400
* Remove leases and inodes related to a given path
34243401
* @param removedUCFiles INodes whose leases need to be released
@@ -4627,7 +4604,8 @@ private void clearCorruptLazyPersistFiles()
46274604
INodesInPath.fromINode((INodeFile) bc), false);
46284605
changed |= toRemoveBlocks != null;
46294606
if (toRemoveBlocks != null) {
4630-
removeBlocks(toRemoveBlocks); // Incremental deletion of blocks
4607+
blockManager.addBLocksToMarkedDeleteQueue(
4608+
toRemoveBlocks.getToDeleteList());
46314609
}
46324610
}
46334611
} finally {
@@ -7338,7 +7316,8 @@ void deleteSnapshot(String snapshotRoot, String snapshotName,
73387316
// Breaking the pattern as removing blocks have to happen outside of the
73397317
// global lock
73407318
if (blocksToBeDeleted != null) {
7341-
removeBlocks(blocksToBeDeleted);
7319+
blockManager.addBLocksToMarkedDeleteQueue(
7320+
blocksToBeDeleted.getToDeleteList());
73427321
}
73437322
logAuditEvent(true, operationName, rootPath, null, null);
73447323
}
@@ -7364,7 +7343,8 @@ public void gcDeletedSnapshot(String snapshotRoot, String snapshotName)
73647343
} finally {
73657344
writeUnlock(operationName, getLockReportInfoSupplier(rootPath));
73667345
}
7367-
removeBlocks(blocksToBeDeleted);
7346+
blockManager.addBLocksToMarkedDeleteQueue(
7347+
blocksToBeDeleted.getToDeleteList());
73687348
}
73697349

73707350
/**

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java

Lines changed: 16 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -89,6 +89,10 @@ public class NameNodeMetrics {
8989
MutableCounterLong blockOpsBatched;
9090
@Metric("Number of pending edits")
9191
MutableGaugeInt pendingEditsCount;
92+
@Metric("Number of delete blocks Queued")
93+
MutableGaugeInt deleteBlocksQueued;
94+
@Metric("Number of pending deletion blocks")
95+
MutableGaugeInt pendingDeleteBlocksCount;
9296

9397
@Metric("Number of file system operations")
9498
public long totalFileOps(){
@@ -341,6 +345,18 @@ public void setBlockOpsQueued(int size) {
341345
blockOpsQueued.set(size);
342346
}
343347

348+
public void setDeleteBlocksQueued(int size) {
349+
deleteBlocksQueued.set(size);
350+
}
351+
352+
public void incrPendingDeleteBlocksCount(int size) {
353+
pendingDeleteBlocksCount.incr(size);
354+
}
355+
356+
public void decrPendingDeleteBlocksCount() {
357+
pendingDeleteBlocksCount.decr();
358+
}
359+
344360
public void addBlockOpsBatched(int count) {
345361
blockOpsBatched.incr(count);
346362
}

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlocksScheduledCounter.java

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -190,6 +190,8 @@ public void testScheduledBlocksCounterDecrementOnDeletedBlock()
190190

191191
// 4. delete the file
192192
dfs.delete(filePath, true);
193+
BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
194+
cluster.getNamesystem(0).getBlockManager());
193195
int blocksScheduled = 0;
194196
for (DatanodeDescriptor descriptor : dnList) {
195197
if (descriptor.getBlocksScheduled() != 0) {

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRename.java

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,7 @@
3030
import org.apache.hadoop.fs.Options.Rename;
3131
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
3232
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
33+
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
3334
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
3435
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
3536
import org.apache.hadoop.test.GenericTestUtils;
@@ -161,6 +162,8 @@ public void testRenameWithOverwrite() throws Exception {
161162
assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock().
162163
getLocalBlock()) != null);
163164
dfs.rename(srcPath, dstPath, Rename.OVERWRITE);
165+
BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
166+
cluster.getNamesystem(0).getBlockManager());
164167
assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock().
165168
getLocalBlock()) == null);
166169

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -75,6 +75,7 @@
7575
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
7676
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
7777
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
78+
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
7879
import org.apache.hadoop.hdfs.server.datanode.DataNode;
7980
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
8081
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
@@ -1356,13 +1357,17 @@ public void testFileCreationWithOverwrite() throws Exception {
13561357
assertBlocks(bm, oldBlocks, true);
13571358

13581359
out = dfs.create(filePath, true);
1360+
BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
1361+
cluster.getNamesystem(0).getBlockManager());
13591362
byte[] newData = AppendTestUtil.randomBytes(seed, fileSize);
13601363
try {
13611364
out.write(newData);
13621365
} finally {
13631366
out.close();
13641367
}
13651368
dfs.deleteOnExit(filePath);
1369+
BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
1370+
cluster.getNamesystem(0).getBlockManager());
13661371

13671372
LocatedBlocks newBlocks = NameNodeAdapter.getBlockLocations(
13681373
nn, file, 0, fileSize);

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -23,6 +23,7 @@
2323
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
2424
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
2525
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
26+
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
2627
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
2728
import org.apache.hadoop.hdfs.server.datanode.DataNode;
2829
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
@@ -127,7 +128,7 @@ public void testReportBadBlock() throws IOException {
127128
}
128129

129130
@Test
130-
public void testInvalidateBlock() throws IOException {
131+
public void testInvalidateBlock() throws IOException, InterruptedException {
131132
final Path file = new Path("/invalidate");
132133
final int length = 10;
133134
final byte[] bytes = StripedFileTestUtil.generateBytes(length);
@@ -151,6 +152,8 @@ public void testInvalidateBlock() throws IOException {
151152
try {
152153
// delete the file
153154
dfs.delete(file, true);
155+
BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
156+
cluster.getNamesystem().getBlockManager());
154157
// check the block is added to invalidateBlocks
155158
final FSNamesystem fsn = cluster.getNamesystem();
156159
final BlockManager bm = fsn.getBlockManager();

0 commit comments

Comments
 (0)