@@ -1173,7 +1173,7 @@ public BlockPlacementStatus verifyBlockPlacement(DatanodeInfo[] locs,
.map(dn -> dn.getNetworkLocation()).distinct().count();

return new BlockPlacementStatusDefault(Math.toIntExact(rackCount),
-        minRacks, clusterMap.getNumOfRacks());
+        minRacks, clusterMap.getNumOfRacks() - stats.getNumOfExcludedRacks());
}

/**
@@ -1369,5 +1369,10 @@ public void setExcludeSlowNodesEnabled(boolean enable) {
public boolean getExcludeSlowNodesEnabled() {
return excludeSlowNodesEnabled;
}

+  protected FSClusterStats getStats() {
+    return stats;
+  }

}

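Note on the hunk above: subtracting stats.getNumOfExcludedRacks() means the placement status is judged against the racks that can actually host replicas, not the raw topology size. A minimal, self-contained illustration of the effect (the class, the helper and the numbers are hypothetical; the "satisfied" rule follows the comment added to BlockPlacementStatusDefault further down in this diff):

public class ExcludedRackExample {
  // Placement is treated as satisfied once either no more racks are needed
  // or no more usable racks exist.
  static boolean placementSatisfied(int currentRacks, int requiredRacks, int totalRacks) {
    return requiredRacks - currentRacks <= 0 || totalRacks - currentRacks <= 0;
  }

  public static void main(String[] args) {
    int numOfRacks = 6;     // racks known to the topology
    int excludedRacks = 1;  // racks where every node is decommissioning/decommissioned
    int currentRacks = 5;   // racks currently holding replicas or internal blocks
    int requiredRacks = 6;  // racks the policy would ideally spread across

    // Before the change: 5 < 6 and 5 < 6, so the block is reported as
    // mis-placed even though no sixth usable rack exists.
    System.out.println(placementSatisfied(currentRacks, requiredRacks, numOfRacks));
    // After the change the excluded rack is subtracted first: 5 >= 6 - 1.
    System.out.println(placementSatisfied(currentRacks, requiredRacks,
        numOfRacks - excludedRacks));
  }
}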
@@ -243,7 +243,7 @@ public BlockPlacementStatus verifyBlockPlacement(DatanodeInfo[] locs,
racks.add(dn.getNetworkLocation());
}
return new BlockPlacementStatusDefault(racks.size(), numberOfReplicas,
-        clusterMap.getNumOfRacks());
+        clusterMap.getNumOfRacks() - getStats().getNumOfExcludedRacks());
}

@Override
@@ -50,7 +50,14 @@ public int getAdditionalReplicasRequired() {
if (isPlacementPolicySatisfied()) {
return 0;
} else {
-      return requiredRacks - currentRacks;
+      // 'requiredRacks - currentRacks' is the number of racks still needed,
+      // while 'totalRacks - currentRacks' is the number of additional racks the
+      // cluster can still provide. When either of them is <= 0, the placement is
+      // considered satisfied, see
+      // 'BlockPlacementStatusDefault#isPlacementPolicySatisfied'.
+      // So, as the method name says, the additional replicas required should be
+      // the minimum of the two.
+      return Math.min(requiredRacks - currentRacks, totalRacks - currentRacks);
}
}
}
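A short worked example of the new return value (hypothetical numbers in plain Java, not the HDFS class itself): with 5 racks already used, 9 racks wanted by the policy, and only 6 usable racks in total, the old code would request 9 - 5 = 4 additional replicas even though only one more rack can take one.

public class AdditionalReplicasExample {
  public static void main(String[] args) {
    int currentRacks = 5;   // racks already holding internal blocks
    int requiredRacks = 9;  // racks the placement policy would like to use
    int totalRacks = 6;     // usable racks (excluded racks already subtracted)

    // Old behaviour: requiredRacks - currentRacks = 4 extra replicas requested.
    // New behaviour: bounded by the racks the cluster can still provide.
    int additional = Math.min(requiredRacks - currentRacks, totalRacks - currentRacks);
    System.out.println(additional); // prints 1
  }
}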
@@ -177,6 +177,7 @@ public void startDecommission(DatanodeDescriptor node) {
if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
// Update DN stats maintained by HeartbeatManager
hbManager.startDecommission(node);
+      blockManager.getDatanodeManager().refreshDecommissionRacks();
// hbManager.startDecommission will set dead node to decommissioned.
if (node.isDecommissionInProgress()) {
for (DatanodeStorageInfo storage : node.getStorageInfos()) {
@@ -201,6 +202,7 @@ public void stopDecommission(DatanodeDescriptor node) {
if (node.isDecommissionInProgress() || node.isDecommissioned()) {
// Update DN stats maintained by HeartbeatManager
hbManager.stopDecommission(node);
+      blockManager.getDatanodeManager().refreshDecommissionRacks();
// extra redundancy blocks will be detected and processed when
// the dead node comes back and send in its full block report.
if (node.isAlive()) {
@@ -178,6 +178,11 @@ public class DatanodeManager {
*/
private boolean hasClusterEverBeenMultiRack = false;

+  /**
+   * Number of racks in which every node is excluded, i.e. either
+   * decommissioning or decommissioned.
+   */
+  private int numOfExcludedRacks = 0;

private final boolean checkIpHostnameInRegistration;
/**
* Whether we should tell datanodes what to cache in replies to
@@ -1300,6 +1305,29 @@ public void refreshNodes(final Configuration conf) throws IOException {
}
}

+  /**
+   * Refresh the number of excluded racks, i.e. racks in which every node is
+   * either in DECOMMISSION_INPROGRESS or DECOMMISSIONED state.
+   */
+  public void refreshDecommissionRacks() {
+    List<DatanodeDescriptor> copy;
+    synchronized (this) {
+      copy = new ArrayList<>(datanodeMap.values());
+    }
+    Set<String> decommissionRacks = new HashSet<>();
+    for (DatanodeDescriptor dn : copy) {
+      if (dn.isDecommissionInProgress() || dn.isDecommissioned()) {
+        decommissionRacks.add(dn.getNetworkLocation());
+      }
+    }
+    for (DatanodeDescriptor dn : copy) {
+      if (!dn.isDecommissionInProgress() && !dn.isDecommissioned()) {
+        decommissionRacks.remove(dn.getNetworkLocation());
+      }
+    }
+    this.numOfExcludedRacks = decommissionRacks.size();
+  }

/** Reread include/exclude files. */
private void refreshHostsReader(Configuration conf) throws IOException {
// Reread the conf to get dfs.hosts and dfs.hosts.exclude filenames.
@@ -2060,6 +2088,12 @@ public double getInServiceXceiverAverage() {
public Map<StorageType, StorageTypeStats> getStorageTypeStats() {
return heartbeatManager.getStorageTypeStats();
}

+      @Override
+      public int getNumOfExcludedRacks() {
+        return numOfExcludedRacks;
+      }

};
}

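The refreshDecommissionRacks() method added above works in two passes: it first collects the rack of every decommissioning or decommissioned node, then removes any rack that still has an in-service node, so only racks that cannot host new replicas remain. A standalone sketch of the same idea with plain types (the Node record, countExcludedRacks and the sample data are hypothetical, not HDFS classes):

import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ExcludedRackCounter {
  // Minimal stand-in for a datanode: its rack and whether it is leaving service.
  record Node(String rack, boolean leavingService) {}

  static int countExcludedRacks(List<Node> nodes) {
    Set<String> excluded = new HashSet<>();
    // Pass 1: racks containing at least one decommissioning/decommissioned node.
    for (Node n : nodes) {
      if (n.leavingService()) {
        excluded.add(n.rack());
      }
    }
    // Pass 2: a rack with any in-service node can still host replicas, so drop it.
    for (Node n : nodes) {
      if (!n.leavingService()) {
        excluded.remove(n.rack());
      }
    }
    return excluded.size();
  }

  public static void main(String[] args) {
    List<Node> nodes = List.of(
        new Node("/rack1", true), new Node("/rack1", true),   // fully excluded
        new Node("/rack2", true), new Node("/rack2", false)); // still usable
    System.out.println(countExcludedRacks(nodes)); // prints 1
  }
}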
@@ -23,6 +23,7 @@
import org.apache.hadoop.net.Node;

import java.util.ArrayList;
+import java.util.Arrays;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
@@ -130,19 +131,32 @@ void addTaskToDatanode(NumberReplicas numberReplicas) {
assert targets.length > 0;
BlockInfoStriped stripedBlk = (BlockInfoStriped) getBlock();

-    if (hasNotEnoughRack()) {
-      // if we already have all the internal blocks, but not enough racks,
-      // we only need to replicate one internal block to a new rack
-      int sourceIndex = chooseSource4SimpleReplication();
-      createReplicationWork(sourceIndex, targets[0]);
-    } else if ((numberReplicas.decommissioning() > 0 ||
-        numberReplicas.liveEnteringMaintenanceReplicas() > 0) &&
-        hasAllInternalBlocks()) {
-      List<Integer> leavingServiceSources = findLeavingServiceSources();
-      // decommissioningSources.size() should be >= targets.length
-      final int num = Math.min(leavingServiceSources.size(), targets.length);
-      for (int i = 0; i < num; i++) {
-        createReplicationWork(leavingServiceSources.get(i), targets[i]);
+    if (hasAllInternalBlocks()) {
+      // Take care of decommission first, especially when there are not
+      // enough racks.
+      if (numberReplicas.decommissioning() > 0 ||
+          numberReplicas.liveEnteringMaintenanceReplicas() > 0) {
+        List<Integer> leavingServiceSources = findLeavingServiceSources();
+        // decommissioningSources.size() should be >= targets.length
+        final int num = Math.min(leavingServiceSources.size(), targets.length);
+        for (int i = 0; i < num; i++) {
+          createReplicationWork(leavingServiceSources.get(i), targets[i]);
+        }
+      } else if (hasNotEnoughRack()) {
+        // If we already have all the internal blocks but not enough racks,
+        // we only need to replicate one internal block to a new rack.
+        int sourceIndex = chooseSource4SimpleReplication();
+        createReplicationWork(sourceIndex, targets[0]);
+      } else {
+        // This branch should not be reached; dump the related information
+        // in case of a bug.
+        LOG.warn("Unknown reconstruction work"
+            + ", block: " + getBlock()
+            + ", srcNodes: " + Arrays.toString(getSrcNodes())
+            + ", liveReplicaStorages: " + getLiveReplicaStorages()
+            + ", additionalReplRequired: " + getAdditionalReplRequired()
+            + ", targets: " + Arrays.toString(targets)
+            + ", notEnoughRack: " + hasNotEnoughRack()
+            + ", liveBlockIndicies: " + Arrays.toString(liveBlockIndicies)
+            + ", liveBusyBlockIndicies: " + Arrays.toString(liveBusyBlockIndicies));
+      }
}
} else {
targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
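The restructuring above changes the decision order once all internal blocks are present: replicas on decommissioning or entering-maintenance nodes are copied away first, the "not enough racks" case is handled only afterwards, and the final else is a defensive log for a state that should not occur. A condensed sketch of that ordering (the enum and the helper are hypothetical, meant only to show the branching, not the ErasureCodingWork API):

public class ReconstructionOrderSketch {
  enum Action {
    COPY_LEAVING_SOURCES, COPY_ONE_TO_NEW_RACK, RECONSTRUCT_MISSING, WARN_UNKNOWN
  }

  static Action choose(boolean hasAllInternalBlocks,
                       boolean hasLeavingServiceReplicas,
                       boolean notEnoughRack) {
    if (!hasAllInternalBlocks) {
      return Action.RECONSTRUCT_MISSING;     // outer else branch in the diff
    }
    if (hasLeavingServiceReplicas) {
      return Action.COPY_LEAVING_SOURCES;    // decommission is handled first now
    }
    if (notEnoughRack) {
      return Action.COPY_ONE_TO_NEW_RACK;    // then the rack-spread case
    }
    return Action.WARN_UNKNOWN;              // defensive branch, should not happen
  }

  public static void main(String[] args) {
    // All blocks present, a decommissioning replica exists and racks are short:
    // the decommissioning source wins over the not-enough-rack path.
    System.out.println(choose(true, true, true)); // COPY_LEAVING_SOURCES
  }
}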
@@ -66,4 +66,10 @@ public interface FSClusterStats {
* @return storage statistics per storage type.
*/
Map<StorageType, StorageTypeStats> getStorageTypeStats();

+  /**
+   * Number of racks in which every node is excluded, i.e. either
+   * decommissioning or decommissioned.
+   * @return the number of excluded racks.
+   */
+  int getNumOfExcludedRacks();
}
@@ -544,6 +544,7 @@ public void run() {
} catch (Exception e) {
LOG.error("Exception while checking heartbeat", e);
}
+        blockManager.getDatanodeManager().refreshDecommissionRacks();
try {
Thread.sleep(5000); // 5 seconds
} catch (InterruptedException ignored) {
@@ -221,8 +221,8 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
this.out = out;
this.totalDatanodes = totalDatanodes;
this.remoteAddress = remoteAddress;
-    this.bpPolicies = new BlockPlacementPolicies(conf, null,
-        networktopology,
+    this.bpPolicies = new BlockPlacementPolicies(conf,
+        this.blockManager.getDatanodeManager().getFSClusterStats(), networktopology,
namenode.getNamesystem().getBlockManager().getDatanodeManager()
.getHost2DatanodeMap());
this.staleInterval =
@@ -0,0 +1,155 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY;

/**
* Test erasure coding when there are not enough racks.
*/
public class TestErasureCodingNotEnoughRacks {
public static final Logger LOG =
LoggerFactory.getLogger(TestErasureCodingNotEnoughRacks.class);

private MiniDFSCluster cluster;
private ErasureCodingPolicy ecPolicy;
private DistributedFileSystem dfs;
private int blockSize;
private final int fileSize = 10 * 1024 * 1024; // 10 MiB.

@Before
public void setup() throws Exception {
ecPolicy = StripedFileTestUtil.getDefaultECPolicy(); // RS_6_3
blockSize = ecPolicy.getCellSize() * 2;
Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setBoolean(DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY, false);
cluster = DFSTestUtil.setupCluster(conf, 12, 6, 0);
dfs = cluster.getFileSystem();
dfs.setErasureCodingPolicy(new Path("/"), ecPolicy.getName());
}

@Test(timeout = 30000)
public void testBasic() throws Exception {
Path ecFile = new Path("/foo1");
writeFile(ecFile);

StripedFileTestUtil.waitBlockGroupsReported(dfs, ecFile.toString());
checkFile(ecFile);
}

@Test(timeout = 30000)
public void testDecommissionOneNode() throws Exception {
Path ecFile = new Path("/foo2");
writeFile(ecFile);

BlockManager blockManager = cluster.getNameNode().getNamesystem().getBlockManager();
DatanodeManager datanodeManager = blockManager.getDatanodeManager();

Map<String, List<DatanodeDescriptor>> racksMap = getRacksMap(datanodeManager.getDatanodes());
DatanodeDescriptor node = racksMap.values().iterator().next().get(0);

datanodeManager.getDatanodeAdminManager().startDecommission(node);
waitNodeDecommissioned(node);

checkFile(ecFile);
}

@Test(timeout = 30000)
public void testDecommissionOneRack() throws Exception {
Path ecFile = new Path("/foo3");
writeFile(ecFile);

BlockManager blockManager = cluster.getNameNode().getNamesystem().getBlockManager();
DatanodeManager datanodeManager = blockManager.getDatanodeManager();

Map<String, List<DatanodeDescriptor>> racksMap = getRacksMap(datanodeManager.getDatanodes());
List<DatanodeDescriptor> nodes = racksMap.values().iterator().next();

for (DatanodeDescriptor node : nodes) {
datanodeManager.getDatanodeAdminManager().startDecommission(node);
}

for (DatanodeDescriptor node : nodes) {
waitNodeDecommissioned(node);
}

checkFile(ecFile);
}

@After
public void teardown() throws Exception {
if (cluster != null) {
cluster.shutdown();
}
}

private Map<String, List<DatanodeDescriptor>> getRacksMap(Collection<DatanodeDescriptor> nodes) {
Map<String, List<DatanodeDescriptor>> racksMap = new HashMap<>();
for (DatanodeDescriptor dn : nodes) {
racksMap.computeIfAbsent(dn.getNetworkLocation(), k -> new ArrayList<>()).add(dn);
}
return racksMap;
}

private void writeFile(Path ecFile) throws IOException {
byte[] bytes = StripedFileTestUtil.generateBytes(fileSize);
DFSTestUtil.writeFile(dfs, ecFile, new String(bytes));
}

private void checkFile(Path ecFile) throws Exception {
StripedFileTestUtil.checkData(dfs, ecFile, fileSize, new ArrayList<>(),
null, ecPolicy.getNumDataUnits() * blockSize);
}

private void waitNodeDecommissioned(DatanodeInfo node) {
DatanodeInfo.AdminStates state = DatanodeInfo.AdminStates.DECOMMISSIONED;
boolean done = (state == node.getAdminState());
while (!done) {
LOG.info("Waiting for node " + node + " to change state to "
+ state + " current state: " + node.getAdminState());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
// nothing
}
done = (state == node.getAdminState());
}
}

}