DFSConfigKeys.java
@@ -1023,6 +1023,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.namenode.decommission.backoff.monitor.pending.blocks.per.lock";
public static final int DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK_DEFAULT
= 1000;
public static final String DFS_NAMENODE_DECOMMISSION_EC_RECONSTRUCTION_ENABLE =
"dfs.namenode.decommission.ec.reconstruction.enable";
public static final boolean DFS_NAMENODE_DECOMMISSION_EC_RECONSTRUCTION_ENABLE_DEFAULT = false;
public static final String DFS_NAMENODE_HANDLER_COUNT_KEY = "dfs.namenode.handler.count";
public static final int DFS_NAMENODE_HANDLER_COUNT_DEFAULT = 10;
public static final String DFS_NAMENODE_LIFELINE_HANDLER_RATIO_KEY =
BlockManager.java
@@ -495,6 +495,7 @@ public int getPendingSPSPaths() {
* Limits number of blocks used to check for excess redundancy timeout.
*/
private long excessRedundancyTimeoutCheckLimit;
private boolean decommissionECReconstruction = false;

public BlockManager(final Namesystem namesystem, boolean haEnabled,
final Configuration conf) throws IOException {
@@ -608,7 +609,9 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled,
setExcessRedundancyTimeoutCheckLimit(conf.getLong(
DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT,
DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT_DEFAULT));

this.decommissionECReconstruction =
conf.getBoolean(DFS_NAMENODE_DECOMMISSION_EC_RECONSTRUCTION_ENABLE,
DFS_NAMENODE_DECOMMISSION_EC_RECONSTRUCTION_ENABLE_DEFAULT);
printInitialConfigs();
}

@@ -2239,11 +2242,11 @@ BlockReconstructionWork scheduleReconstruction(BlockInfo block,
List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<>();
NumberReplicas numReplicas = new NumberReplicas();
List<Byte> liveBlockIndices = new ArrayList<>();
List<Byte> liveAndDecommissioningBusyBlockIndices = new ArrayList<>();
List<Byte> liveBusyBlockIndices = new ArrayList<>();
List<Byte> excludeReconstructed = new ArrayList<>();
final DatanodeDescriptor[] srcNodes = chooseSourceDatanodes(block,
containingNodes, liveReplicaNodes, numReplicas,
liveBlockIndices, liveBusyBlockIndices, excludeReconstructed, priority);
liveBlockIndices, liveAndDecommissioningBusyBlockIndices, liveBusyBlockIndices, priority);
short requiredRedundancy = getExpectedLiveRedundancyNum(block,
numReplicas);
if (srcNodes == null || srcNodes.length == 0) {
@@ -2307,17 +2310,19 @@ BlockReconstructionWork scheduleReconstruction(BlockInfo block,
byte[] newIndices = new byte[liveBlockIndices.size()];
adjustSrcNodesAndIndices((BlockInfoStriped)block,
srcNodes, liveBlockIndices, newSrcNodes, newIndices);
byte[] busyIndices = new byte[liveBusyBlockIndices.size()];
for (int i = 0; i < liveBusyBlockIndices.size(); i++) {
busyIndices[i] = liveBusyBlockIndices.get(i);
byte[] liveAndDecommissioningBusyIndices =
new byte[liveAndDecommissioningBusyBlockIndices.size()];
for (int i = 0; i < liveAndDecommissioningBusyBlockIndices.size(); i++) {
liveAndDecommissioningBusyIndices[i] = liveAndDecommissioningBusyBlockIndices.get(i);
}
byte[] excludeReconstructedIndices = new byte[excludeReconstructed.size()];
for (int i = 0; i < excludeReconstructed.size(); i++) {
excludeReconstructedIndices[i] = excludeReconstructed.get(i);
byte[] liveBusyIndices = new byte[liveBusyBlockIndices.size()];
for (int i = 0; i < liveBusyBlockIndices.size(); i++) {
liveBusyIndices[i] = liveBusyBlockIndices.get(i);
}
return new ErasureCodingWork(getBlockPoolId(), block, bc, newSrcNodes,
containingNodes, liveReplicaNodes, additionalReplRequired,
priority, newIndices, busyIndices, excludeReconstructedIndices);
priority, newIndices, liveAndDecommissioningBusyIndices, liveBusyIndices,
decommissionECReconstruction);
} else {
return new ReplicationWork(block, bc, srcNodes,
containingNodes, liveReplicaNodes, additionalReplRequired,
@@ -2549,7 +2554,11 @@ private DatanodeDescriptor getDatanodeDescriptorFromStorage(
* replicas of the given block.
* @param liveBlockIndices List to be populated with indices of healthy
* blocks in a striped block group
* @param liveBusyBlockIndices List to be populated with indices of healthy
* @param liveAndDecommissioningBusyBlockIndices List to be populated with indices of live or
*                             decommissioning blocks in a striped block group
*                             on a busy DN, where the recovery work has reached
*                             its replication limit
* @param liveBusyBlockIndices List to be populated with indices of live
*                             blocks in a striped block group on a busy DN,
*                             where the recovery work has reached its
*                             replication limit
@@ -2563,7 +2572,8 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
List<DatanodeDescriptor> containingNodes,
List<DatanodeStorageInfo> nodesContainingLiveReplicas,
NumberReplicas numReplicas, List<Byte> liveBlockIndices,
List<Byte> liveBusyBlockIndices, List<Byte> excludeReconstructed, int priority) {
List<Byte> liveAndDecommissioningBusyBlockIndices,
List<Byte> liveBusyBlockIndices, int priority) {
containingNodes.clear();
nodesContainingLiveReplicas.clear();
List<DatanodeDescriptor> srcNodes = new ArrayList<>();
@@ -2631,22 +2641,26 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
&& (!node.isDecommissionInProgress() && !node.isEnteringMaintenance())
&& node.getNumberOfBlocksToBeReplicated() +
node.getNumberOfBlocksToBeErasureCoded() >= maxReplicationStreams) {
if (isStriped && (state == StoredReplicaState.LIVE
|| state == StoredReplicaState.DECOMMISSIONING)) {
liveBusyBlockIndices.add(blockIndex);
//HDFS-16566 ExcludeReconstructed won't be reconstructed.
excludeReconstructed.add(blockIndex);
if (isStriped) {
if (state == StoredReplicaState.LIVE) {
liveAndDecommissioningBusyBlockIndices.add(blockIndex);
liveBusyBlockIndices.add(blockIndex);
} else if (state == StoredReplicaState.DECOMMISSIONING) {
liveAndDecommissioningBusyBlockIndices.add(blockIndex);
}
}
continue; // already reached replication limit
}

if (node.getNumberOfBlocksToBeReplicated() +
node.getNumberOfBlocksToBeErasureCoded() >= replicationStreamsHardLimit) {
if (isStriped && (state == StoredReplicaState.LIVE
|| state == StoredReplicaState.DECOMMISSIONING)) {
liveBusyBlockIndices.add(blockIndex);
//HDFS-16566 ExcludeReconstructed won't be reconstructed.
excludeReconstructed.add(blockIndex);
if (isStriped) {
if (state == StoredReplicaState.LIVE) {
liveAndDecommissioningBusyBlockIndices.add(blockIndex);
liveBusyBlockIndices.add(blockIndex);
} else if (state == StoredReplicaState.DECOMMISSIONING) {
liveAndDecommissioningBusyBlockIndices.add(blockIndex);
}
}
continue;
}
@@ -5831,4 +5845,12 @@ public void setMinBlocksForWrite(int minBlocksForWrite) {
public int getMinBlocksForWrite(BlockType blockType) {
return placementPolicies.getPolicy(blockType).getMinBlocksForWrite();
}

public void setDecommissionECReconstruction(boolean decommissionECReconstruction) {
this.decommissionECReconstruction = decommissionECReconstruction;
}

public boolean isDecommissionECReconstruction() {
return decommissionECReconstruction;
}
}
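The new flag is wired into the NameNode's reconfiguration framework (see the NameNode.java hunks below), so it can be flipped without a restart. A minimal sketch of exercising that path, assuming a running MiniDFSCluster test fixture ("cluster" is an assumption, not part of this patch):

    // Flip the flag at runtime through ReconfigurableBase#reconfigureProperty.
    NameNode nn = cluster.getNameNode();
    nn.reconfigureProperty(DFS_NAMENODE_DECOMMISSION_EC_RECONSTRUCTION_ENABLE, "true");
    // The BlockManager should immediately report the new value.
    assertTrue(nn.getNamesystem().getBlockManager().isDecommissionECReconstruction());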
ErasureCodingWork.java
@@ -31,24 +31,26 @@

class ErasureCodingWork extends BlockReconstructionWork {
private final byte[] liveBlockIndices;
private final byte[] liveAndDecommissioningBusyBlockIndices;
private final byte[] liveBusyBlockIndices;
private final byte[] excludeReconstructedIndices;
private final String blockPoolId;
private final boolean decommissionECReconstruction;

public ErasureCodingWork(String blockPoolId, BlockInfo block,
BlockCollection bc,
DatanodeDescriptor[] srcNodes,
List<DatanodeDescriptor> containingNodes,
List<DatanodeStorageInfo> liveReplicaStorages,
int additionalReplRequired, int priority,
byte[] liveBlockIndices, byte[] liveBusyBlockIndices,
byte[] excludeReconstrutedIndices) {
byte[] liveBlockIndices, byte[] liveAndDecommissioningBusyBlockIndices,
byte[] liveBusyBlockIndices, boolean decommissionECReconstruction) {
super(block, bc, srcNodes, containingNodes,
liveReplicaStorages, additionalReplRequired, priority);
this.blockPoolId = blockPoolId;
this.liveBlockIndices = liveBlockIndices;
this.liveAndDecommissioningBusyBlockIndices = liveAndDecommissioningBusyBlockIndices;
this.liveBusyBlockIndices = liveBusyBlockIndices;
this.excludeReconstructedIndices = excludeReconstrutedIndices;
this.decommissionECReconstruction = decommissionECReconstruction;
LOG.debug("Creating an ErasureCodingWork to {} reconstruct ",
block);
}
@@ -84,14 +86,14 @@ void chooseTargets(BlockPlacementPolicy blockplacement,
private boolean hasAllInternalBlocks() {
final BlockInfoStriped block = (BlockInfoStriped) getBlock();
if (liveBlockIndices.length
+ liveBusyBlockIndices.length < block.getRealTotalBlockNum()) {
+ liveAndDecommissioningBusyBlockIndices.length < block.getRealTotalBlockNum()) {
return false;
}
BitSet bitSet = new BitSet(block.getTotalBlockNum());
for (byte index : liveBlockIndices) {
bitSet.set(index);
}
for (byte busyIndex: liveBusyBlockIndices) {
for (byte busyIndex: liveAndDecommissioningBusyBlockIndices) {
bitSet.set(busyIndex);
}
for (int i = 0; i < block.getRealDataBlockNum(); i++) {
@@ -150,18 +152,28 @@ boolean addTaskToDatanode(NumberReplicas numberReplicas) {
numberReplicas.liveEnteringMaintenanceReplicas() > 0) &&
hasAllInternalBlocks()) {
List<Integer> leavingServiceSources = findLeavingServiceSources();
// decommissioningSources.size() should be >= targets.length
final int num = Math.min(leavingServiceSources.size(), targets.length);
if (num == 0) {
flag = false;
if (decommissionECReconstruction && targets.length > 0 &&
getSrcNodes().length >= ((BlockInfoStriped) getBlock()).getDataBlockNum()) {
// Here we use liveBusyBlockIndices as excludeReconstructedIndices, which only
// includes LIVE indices: with EC reconstruction enabled during decommissioning,
// DECOMMISSIONING indices are rebuilt rather than excluded. Reconstruction needs
// at least dataBlockNum live internal blocks, hence the getDataBlockNum() guard.
targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
new ExtendedBlock(blockPoolId, stripedBlk), getSrcNodes(), targets,
liveBlockIndices, liveBusyBlockIndices, stripedBlk.getErasureCodingPolicy());
} else {
flag = false;
}
}
for (int i = 0; i < num; i++) {
createReplicationWork(leavingServiceSources.get(i), targets[i]);
}
} else {
targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
new ExtendedBlock(blockPoolId, stripedBlk), getSrcNodes(), targets,
liveBlockIndices, excludeReconstructedIndices, stripedBlk.getErasureCodingPolicy());
liveBlockIndices, liveAndDecommissioningBusyBlockIndices,
stripedBlk.getErasureCodingPolicy());
}
return flag;
}
DataNodeMetrics.java
@@ -351,6 +351,10 @@ public void incrBlocksReplicated() {
blocksReplicated.incr();
}

public long getBlocksReplicated() {
return blocksReplicated.value();
}

public void incrBlocksWritten() {
blocksWritten.incr();
}
@@ -584,6 +588,10 @@ public void incrECReconstructionTasks() {
ecReconstructionTasks.incr();
}

public long getECReconstructionTasks() {
return ecReconstructionTasks.value();
}

public void incrECFailedReconstructionTasks() {
ecFailedReconstructionTasks.incr();
}
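The two new getters expose counters that distinguish the plain-replication path from the reconstruction path, which is handy for asserting which branch ErasureCodingWork took. A hedged sketch, assuming a test cluster whose first DataNode received the work ("cluster" is an assumption):

    DataNodeMetrics metrics = cluster.getDataNodes().get(0).getMetrics();
    // With the flag on and busy decommissioning sources, EC reconstruction
    // tasks should grow instead of plain block copies.
    long ecTasks = metrics.getECReconstructionTasks();
    long copies = metrics.getBlocksReplicated();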
NameNode.java
@@ -139,6 +139,8 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NN_NOT_BECOME_ACTIVE_IN_SAFEMODE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_EC_RECONSTRUCTION_ENABLE;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_EC_RECONSTRUCTION_ENABLE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LOCK_DETAILED_METRICS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LOCK_DETAILED_METRICS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_DEFAULT;
@@ -383,7 +385,8 @@ public enum OperationCategory {
DFS_NAMENODE_LOCK_DETAILED_METRICS_KEY,
DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY,
DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY,
DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY));
DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY,
DFS_NAMENODE_DECOMMISSION_EC_RECONSTRUCTION_ENABLE));

private static final String USAGE = "Usage: hdfs namenode ["
+ StartupOption.BACKUP.getName() + "] | \n\t["
@@ -2356,7 +2359,8 @@ protected String reconfigurePropertyImpl(String property, String newVal)
|| property.equals(DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY)
|| property.equals(
DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION)
|| property.equals(DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY)) {
|| property.equals(DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY)
|| property.equals(DFS_NAMENODE_DECOMMISSION_EC_RECONSTRUCTION_ENABLE)) {
return reconfReplicationParameters(newVal, property);
} else if (property.equals(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY) || property
.equals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY)) {
@@ -2395,41 +2399,47 @@ protected String reconfigurePropertyImpl(String property, String newVal)
private String reconfReplicationParameters(final String newVal,
final String property) throws ReconfigurationException {
BlockManager bm = namesystem.getBlockManager();
int newSetting;
String newSetting;
namesystem.writeLock();
try {
if (property.equals(DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY)) {
bm.setMaxReplicationStreams(
adjustNewVal(DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT, newVal));
newSetting = bm.getMaxReplicationStreams();
newSetting = Integer.toString(bm.getMaxReplicationStreams());
} else if (property.equals(
DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY)) {
bm.setReplicationStreamsHardLimit(
adjustNewVal(DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT,
newVal));
newSetting = bm.getReplicationStreamsHardLimit();
newSetting = Integer.toString(bm.getReplicationStreamsHardLimit());
} else if (
property.equals(
DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION)) {
bm.setBlocksReplWorkMultiplier(
adjustNewVal(
DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION_DEFAULT,
newVal));
newSetting = bm.getBlocksReplWorkMultiplier();
newSetting = Integer.toString(bm.getBlocksReplWorkMultiplier());
} else if (
property.equals(
DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY)) {
bm.setReconstructionPendingTimeout(
adjustNewVal(
DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT,
newVal));
newSetting = bm.getReconstructionPendingTimeout();
newSetting = Integer.toString(bm.getReconstructionPendingTimeout());
} else if (property.equals(DFS_NAMENODE_DECOMMISSION_EC_RECONSTRUCTION_ENABLE)) {
boolean decommissionECReconstruction =
(newVal == null) ? DFS_NAMENODE_DECOMMISSION_EC_RECONSTRUCTION_ENABLE_DEFAULT :
Boolean.parseBoolean(newVal);
bm.setDecommissionECReconstruction(decommissionECReconstruction);
newSetting = Boolean.toString(bm.isDecommissionECReconstruction());
} else {
throw new IllegalArgumentException("Unexpected property " +
property + " in reconfReplicationParameters");
}
LOG.info("RECONFIGURE* changed {} to {}", property, newSetting);
return String.valueOf(newSetting);
return newSetting;
} catch (IllegalArgumentException e) {
throw new ReconfigurationException(property, newVal, getConf().get(
property), e);
hdfs-default.xml
@@ -1233,6 +1233,14 @@
</description>
</property>

<property>
<name>dfs.namenode.decommission.ec.reconstruction.enable</name>
<value>false</value>
<description>
Whether to use EC reconstruction to restore erasure-coded blocks during
decommissioning when the source DataNodes are too busy to copy them directly.
</description>
</property>

<property>
<name>dfs.namenode.redundancy.interval.seconds</name>
<value>3</value>
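To enable the feature, the default can be overridden in hdfs-site.xml (a minimal sketch of the override):

    <property>
      <name>dfs.namenode.decommission.ec.reconstruction.enable</name>
      <value>true</value>
    </property>

Because the key is registered as a reconfigurable property in NameNode.java above, it can also be applied to a live NameNode with "hdfs dfsadmin -reconfig namenode <host:ipc_port> start" after updating the configuration file.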