Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3730,9 +3730,19 @@ private void invalidateCorruptReplicas(BlockInfo blk, Block reported,
nodes.toArray(new DatanodeDescriptor[nodes.size()]);
for (DatanodeDescriptor node : nodesCopy) {
try {
if (!invalidateBlock(new BlockToMarkCorrupt(reported, blk, null,
Reason.ANY), node, numberReplicas)) {
removedFromBlocksMap = false;
Long genStamp = corruptReplicas.getCorruptReplicaGenerationStamp(blk, node);
if (genStamp == null) {
LOG.warn("CorruptReplicasMap unexpectedly missing generationStamp for datanode {}",
node.getXferAddr());
if (!invalidateBlock(new BlockToMarkCorrupt(reported, blk, null,
Reason.ANY), node, numberReplicas)) {
removedFromBlocksMap = false;
}
} else {
if (!invalidateBlock(new BlockToMarkCorrupt(reported, blk, genStamp, null,
Reason.ANY), node, numberReplicas)) {
removedFromBlocksMap = false;
}
}
} catch (IOException e) {
if(blockLog.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,26 @@ public enum Reason {
CORRUPTION_REPORTED // client or datanode reported the corruption
}

private final Map<Block, Map<DatanodeDescriptor, Reason>> corruptReplicasMap =
new HashMap<Block, Map<DatanodeDescriptor, Reason>>();
private static class CorruptBlockReplica {
private final Reason reason;
private final long generationStamp;

CorruptBlockReplica(Reason reason, long generationStamp) {
this.reason = reason;
this.generationStamp = generationStamp;
}

public Reason getReason() {
return reason;
}

public long getGenerationStamp() {
return generationStamp;
}
}

private final Map<Block, Map<DatanodeDescriptor, CorruptBlockReplica>> corruptReplicasMap =
new HashMap<Block, Map<DatanodeDescriptor, CorruptBlockReplica>>();

private final LongAdder totalCorruptBlocks = new LongAdder();
private final LongAdder totalCorruptECBlockGroups = new LongAdder();
Expand All @@ -70,9 +88,9 @@ public enum Reason {
*/
void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
String reason, Reason reasonCode, boolean isStriped) {
Map <DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(blk);
Map <DatanodeDescriptor, CorruptBlockReplica> nodes = corruptReplicasMap.get(blk);
if (nodes == null) {
nodes = new HashMap<DatanodeDescriptor, Reason>();
nodes = new HashMap<DatanodeDescriptor, CorruptBlockReplica>();
corruptReplicasMap.put(blk, nodes);
incrementBlockStat(isStriped);
}
Expand All @@ -96,7 +114,7 @@ void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
Server.getRemoteIp(), reasonText);
}
// Add the node or update the reason.
nodes.put(dn, reasonCode);
nodes.put(dn, new CorruptBlockReplica(reasonCode, blk.getGenerationStamp()));
}

/**
Expand All @@ -105,7 +123,7 @@ void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
*/
void removeFromCorruptReplicasMap(BlockInfo blk) {
if (corruptReplicasMap != null) {
Map<DatanodeDescriptor, Reason> value = corruptReplicasMap.remove(blk);
Map<DatanodeDescriptor, CorruptBlockReplica> value = corruptReplicasMap.remove(blk);
if (value != null) {
decrementBlockStat(blk.isStriped());
}
Expand All @@ -126,15 +144,15 @@ boolean removeFromCorruptReplicasMap(

boolean removeFromCorruptReplicasMap(
BlockInfo blk, DatanodeDescriptor datanode, Reason reason) {
Map <DatanodeDescriptor, Reason> datanodes = corruptReplicasMap.get(blk);
Map <DatanodeDescriptor, CorruptBlockReplica> datanodes = corruptReplicasMap.get(blk);
if (datanodes == null) {
return false;
}

// if reasons can be compared but don't match, return false.
Reason storedReason = datanodes.get(datanode);
if (reason != Reason.ANY && storedReason != null &&
reason != storedReason) {
CorruptBlockReplica corruptReplica = datanodes.get(datanode);
if (reason != Reason.ANY && corruptReplica != null &&
reason != corruptReplica.getReason()) {
return false;
}

Expand Down Expand Up @@ -172,7 +190,7 @@ private void decrementBlockStat(boolean isStriped) {
* @return collection of nodes. Null if does not exists
*/
Collection<DatanodeDescriptor> getNodes(Block blk) {
Map <DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(blk);
Map <DatanodeDescriptor, CorruptBlockReplica> nodes = corruptReplicasMap.get(blk);
if (nodes == null)
return null;
return nodes.keySet();
Expand Down Expand Up @@ -247,6 +265,23 @@ Set<Block> getCorruptBlocksSet() {
return corruptBlocks;
}

/**
* return the generation stamp of a corrupt replica for a given block
* on a given dn
* @param block block that has corrupted replica
* @param node datanode that contains this corrupted replica
* @return reason
*/
Long getCorruptReplicaGenerationStamp(Block block, DatanodeDescriptor node) {
Long generationStamp = null;
if(corruptReplicasMap.containsKey(block)) {
if (corruptReplicasMap.get(block).containsKey(node)) {
generationStamp = corruptReplicasMap.get(block).get(node).getGenerationStamp();
}
}
return generationStamp;
}

/**
* return the reason about corrupted replica for a given block
* on a given dn
Expand All @@ -258,7 +293,7 @@ String getCorruptReason(Block block, DatanodeDescriptor node) {
Reason reason = null;
if(corruptReplicasMap.containsKey(block)) {
if (corruptReplicasMap.get(block).containsKey(node)) {
reason = corruptReplicasMap.get(block).get(node);
reason = corruptReplicasMap.get(block).get(node).getReason();
}
}
if (reason != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1913,6 +1913,23 @@ private void createClusterWithDeadNodesDecommissionInProgress(final int numLiveN
*/
@Test(timeout = 60000)
public void testDeleteCorruptReplicaForUnderReplicatedBlock() throws Exception {
testDeleteCorruptReplicaForUnderReplicatedBlockInternal();
}

/*
Same test as testDeleteCorruptReplicaForUnderReplicatedBlock except
"dfs.namenode.corrupt.block.delete.immediately.enabled = false" such that the block invalidation
gets postponed.
*/
@Test(timeout = 60000)
public void testDeleteCorruptReplicaForUnderReplicatedBlockWithInvalidationPostponed()
throws Exception {
getConf().setBoolean(DFSConfigKeys.DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED,
false);
testDeleteCorruptReplicaForUnderReplicatedBlockInternal();
}

public void testDeleteCorruptReplicaForUnderReplicatedBlockInternal() throws Exception {
// Constants
final Path file = new Path("/test-file");
final int numDatanode = 3;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,35 @@ public void testCorruptReplicaInfo()
crm.getCorruptBlockIdsForTesting(bim, BlockType.STRIPED,
10, getStripedBlock(7).getBlockId())));
}


@Test
public void testGetCorruptReplicaGenerationStamp() {
final CorruptReplicasMap crm = new CorruptReplicasMap();
final DatanodeDescriptor dn1 = DFSTestUtil.getLocalDatanodeDescriptor();
final DatanodeDescriptor dn2 = DFSTestUtil.getLocalDatanodeDescriptor();
final long len = 1000L;
short replFactor = 2;

// Create block replicas with different GenStamps
final Block blk1v1 = new Block(1L, len, 1L);
final Block blk1v2 = new Block(1L, len, 2L);
final Block blk2 = new Block(2L, len, 100L);
addToCorruptReplicasMap(crm, new BlockInfoContiguous(blk1v1, replFactor), dn1);
addToCorruptReplicasMap(crm, new BlockInfoContiguous(blk1v2, replFactor), dn2);
addToCorruptReplicasMap(crm, new BlockInfoContiguous(blk2, replFactor), dn1);

// Validate correct GenStamp is reported for each replica; GenStamp is based on the
// DatanodeDescriptor object being passed & not based on the Block object being passed.
assertEquals(Long.valueOf(1L), crm.getCorruptReplicaGenerationStamp(blk1v1, dn1));
assertEquals(Long.valueOf(2L), crm.getCorruptReplicaGenerationStamp(blk1v1, dn2));
assertEquals(Long.valueOf(1L), crm.getCorruptReplicaGenerationStamp(blk1v2, dn1));
assertEquals(Long.valueOf(2L), crm.getCorruptReplicaGenerationStamp(blk1v2, dn2));

// Validate null returned for non-existent block replica
assertEquals(Long.valueOf(100L), crm.getCorruptReplicaGenerationStamp(blk2, dn1));
assertNull(crm.getCorruptReplicaGenerationStamp(blk2, dn2));
}

private static void addToCorruptReplicasMap(CorruptReplicasMap crm,
BlockInfo blk, DatanodeDescriptor dn) {
crm.addToCorruptReplicasMap(blk, dn, "TEST", Reason.NONE, blk.isStriped());
Expand Down