Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,8 @@ public void removeVolumes(
// Unlike updating the volumeMap in addVolume(), this operation does
// not scan disks.
for (String bpid : volumeMap.getBlockPoolList()) {
List<ReplicaInfo> blocks = new ArrayList<>();
List<ReplicaInfo> blocks = blkToInvalidate
.computeIfAbsent(bpid, (k) -> new ArrayList<>());
for (Iterator<ReplicaInfo> it =
volumeMap.replicas(bpid).iterator(); it.hasNext();) {
ReplicaInfo block = it.next();
Expand All @@ -591,9 +592,7 @@ public void removeVolumes(
it.remove();
}
}
blkToInvalidate.put(bpid, blocks);
}

storageToRemove.add(sd.getStorageUuid());
storageLocationsToRemove.remove(sdLocation);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;

import java.io.FileInputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
Expand Down Expand Up @@ -106,6 +105,8 @@
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import org.slf4j.Logger;
Expand Down Expand Up @@ -268,16 +269,24 @@ public void testAddVolumeWithSameStorageUuid() throws IOException {
}

@Test(timeout = 30000)
public void testRemoveVolumes() throws IOException {
public void testRemoveOneVolume() throws IOException {
// Feed FsDataset with block metadata.
final int NUM_BLOCKS = 100;
for (int i = 0; i < NUM_BLOCKS; i++) {
String bpid = BLOCK_POOL_IDS[NUM_BLOCKS % BLOCK_POOL_IDS.length];
final int numBlocks = 100;
for (int i = 0; i < numBlocks; i++) {
String bpid = BLOCK_POOL_IDS[numBlocks % BLOCK_POOL_IDS.length];
ExtendedBlock eb = new ExtendedBlock(bpid, i);
try (ReplicaHandler replica =
dataset.createRbw(StorageType.DEFAULT, null, eb, false)) {
ReplicaHandler replica = null;
try {
replica = dataset.createRbw(StorageType.DEFAULT, null, eb,
false);
} finally {
if (replica != null) {
replica.close();
}
}
}

// Remove one volume
final String[] dataDirs =
conf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY).split(",");
final String volumePathToRemove = dataDirs[0];
Expand All @@ -300,6 +309,11 @@ public void testRemoveVolumes() throws IOException {
assertEquals("The volume has been removed from the storageMap.",
expectedNumVolumes, dataset.storageMap.size());

// DataNode.notifyNamenodeDeletedBlock() should be called 50 times
// as we deleted one volume that has 50 blocks
verify(datanode, times(50))
.notifyNamenodeDeletedBlock(any(), any());

try {
dataset.asyncDiskService.execute(volumeToRemove,
new Runnable() {
Expand All @@ -317,10 +331,81 @@ public void run() {}
totalNumReplicas += dataset.volumeMap.size(bpid);
}
assertEquals("The replica infos on this volume has been removed from the "
+ "volumeMap.", NUM_BLOCKS / NUM_INIT_VOLUMES,
+ "volumeMap.", numBlocks / NUM_INIT_VOLUMES,
totalNumReplicas);
}

@Test(timeout = 30000)
  public void testRemoveTwoVolumes() throws IOException {
    // Feed FsDataset with block metadata: create 100 RBW replicas.
    // Note the bpid index uses the constant numBlocks (not i), so every
    // replica lands in the same block pool — matches testRemoveOneVolume.
    final int numBlocks = 100;
    for (int i = 0; i < numBlocks; i++) {
      String bpid = BLOCK_POOL_IDS[numBlocks % BLOCK_POOL_IDS.length];
      ExtendedBlock eb = new ExtendedBlock(bpid, i);
      // ReplicaHandler is Closeable: try-with-resources releases it even
      // when createRbw() throws, replacing the manual try/finally
      // null-check with the idiomatic equivalent.
      try (ReplicaHandler replica =
          dataset.createRbw(StorageType.DEFAULT, null, eb, false)) {
        // The replica only needs to be created and closed; the handler
        // itself is not used.
      }
    }

    // Pick the first two configured data directories for removal.
    final String[] dataDirs =
        conf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY).split(",");
    Set<StorageLocation> volumesToRemove = new HashSet<>();
    volumesToRemove.add(StorageLocation.parse(dataDirs[0]));
    volumesToRemove.add(StorageLocation.parse(dataDirs[1]));

    // Resolve the FsVolumeImpl instances backing the two locations so we
    // can probe the async disk service after removal.
    FsVolumeReferences volReferences = dataset.getFsVolumeReferences();
    Set<FsVolumeImpl> volumes = new HashSet<>();
    for (FsVolumeSpi vol : volReferences) {
      for (StorageLocation volume : volumesToRemove) {
        if (vol.getStorageLocation().equals(volume)) {
          volumes.add((FsVolumeImpl) vol);
        }
      }
    }
    assertEquals(2, volumes.size());
    volReferences.close();

    dataset.removeVolumes(volumesToRemove, true);
    int expectedNumVolumes = dataDirs.length - 2;
    assertEquals("The volume has been removed from the volumeList.",
        expectedNumVolumes, getNumVolumes());
    assertEquals("The volume has been removed from the storageMap.",
        expectedNumVolumes, dataset.storageMap.size());

    // DataNode.notifyNamenodeDeletedBlock() should be called 100 times
    // as we deleted 2 volumes that have 100 blocks totally
    verify(datanode, times(100))
        .notifyNamenodeDeletedBlock(any(), any());

    // The removed volumes must no longer be registered with the async
    // disk service: scheduling work against them has to fail.
    for (FsVolumeImpl volume : volumes) {
      try {
        dataset.asyncDiskService.execute(volume,
            new Runnable() {
              @Override
              public void run() {}
            });
        fail("Expect RuntimeException: the volume has been removed from the "
            + "AsyncDiskService.");
      } catch (RuntimeException e) {
        GenericTestUtils.assertExceptionContains("Cannot find volume", e);
      }
    }

    // All replicas lived on the two removed volumes (the final assertion
    // below expects zero survivors), so the volume map must now be empty
    // for every block pool.
    int totalNumReplicas = 0;
    for (String bpid : dataset.volumeMap.getBlockPoolList()) {
      totalNumReplicas += dataset.volumeMap.size(bpid);
    }
    assertEquals("The replica infos on this volume has been removed from the "
        + "volumeMap.", 0, totalNumReplicas);
  }

@Test(timeout = 5000)
public void testRemoveNewlyAddedVolume() throws IOException {
final int numExistingVolumes = getNumVolumes();
Expand Down