Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2463,17 +2463,20 @@ public void invalidateMissingBlock(String bpid, Block block, boolean checkFiles)
// deleted local block file here may lead to missing-block
// when it with only 1 replication left now.
// So remove if from volume map notify namenode is ok.
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
bpid)) {
ReplicaInfo replica;
try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) {
// Check if this block is on the volume map.
ReplicaInfo replica = volumeMap.get(bpid, block);
replica = volumeMap.get(bpid, block);
// Double-check block or meta file existence when checkFiles as true.
if (replica != null && (!checkFiles ||
(!replica.blockDataExists() || !replica.metadataExists()))) {
volumeMap.remove(bpid, block);
invalidate(bpid, replica);
}
}
// Call invalidate method outside the lock
if (replica != null) {
invalidate(bpid, replica);
}
}

public void invalidateMissingBlock(String bpid, Block block) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,57 @@ public void run() {
}
}

@Test(timeout = 30000)
public void testInvalidateMissingBlockConcurrent() throws Exception {
// Feed FsDataset with block metadata.
final int numBlocks = 1000;
final int threadCount = 10;

ExecutorService pool = Executors.newFixedThreadPool(threadCount);
List<Future<?>> futureList = new ArrayList<>();

// Random write block and use invalidateMissingBlock() method half of them.
Random random = new Random();
for (int i = 0; i < threadCount; i++) {
class BlockProcessor implements Runnable {
@Override
public void run() {
try {
String bpid = BLOCK_POOL_IDS[random.nextInt(BLOCK_POOL_IDS.length)];
for (int blockId = 0; blockId < numBlocks; blockId++) {
ExtendedBlock eb = new ExtendedBlock(bpid, blockId);
ReplicaHandler replica = null;
try {
replica = dataset.createRbw(StorageType.DEFAULT, null, eb, false);
if (blockId % 2 > 0) {
dataset.invalidateMissingBlock(bpid, eb.getLocalBlock());
}
} finally {
if (replica != null) {
replica.close();
}
}
}
} catch (Exception ignore) {
// ignore
}
}
}
futureList.add(pool.submit(new BlockProcessor()));
}

// Wait for data generation
for (Future<?> f : futureList) {
f.get();
}
// Wait for the async deletion task finish.
GenericTestUtils.waitFor(() -> dataset.asyncDiskService.countPendingDeletions() == 0,
100, 10000);
for (String bpid : dataset.volumeMap.getBlockPoolList()) {
assertEquals(numBlocks / 2, dataset.volumeMap.size(bpid));
}
}

@Test(timeout = 5000)
public void testRemoveNewlyAddedVolume() throws IOException {
final int numExistingVolumes = getNumVolumes();
Expand Down