Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -889,6 +889,11 @@ public float getReconstructionQueuesInitProgress() {
return 0;
}

/**
 * Reports the slow-node IP address counts for this bean.
 *
 * <p>This implementation does not compute any counts; it always answers with
 * the fixed placeholder string.
 *
 * @return the literal placeholder {@code "N/A"}.
 */
@Override
public String getCollectSlowNodesIpAddrCounts() {
  final String notAvailable = "N/A";
  return notAvailable;
}

private Router getRouter() throws IOException {
if (this.router == null) {
throw new IOException("Router is not initialized");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hadoop.thirdparty.com.google.common.net.InetAddresses;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.AtomicLongMap;

import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.HadoopIllegalArgumentException;
Expand Down Expand Up @@ -214,6 +215,8 @@ public class DatanodeManager {
private volatile long slowPeerCollectionInterval;
private volatile int maxSlowPeerReportNodes;

// Counter keyed by datanode IP address. It is incremented for each datanode
// descriptor resolved while slow peers are collected, and cleared together
// with slowNodesUuidSet when the slow peer collector is stopped.
private final AtomicLongMap<String> collectSlowNodesIpAddrCounts = AtomicLongMap.create();

@Nullable
private final SlowDiskTracker slowDiskTracker;

Expand Down Expand Up @@ -421,6 +424,7 @@ private void stopSlowPeerCollector() {
} finally {
slowPeerCollectorDaemon = null;
slowNodesUuidSet.clear();
collectSlowNodesIpAddrCounts.clear();
}
}

Expand Down Expand Up @@ -2204,8 +2208,18 @@ public Set<String> getSlowPeersUuidSet() {
Preconditions.checkNotNull(slowPeerTracker, "slowPeerTracker should not be un-assigned");
slowNodes = slowPeerTracker.getSlowNodes(maxSlowPeerReportNodes);
List<DatanodeDescriptor> datanodeDescriptors = getDnDescriptorsFromIpAddr(slowNodes);
datanodeDescriptors.forEach(
datanodeDescriptor -> slowPeersUuidSet.add(datanodeDescriptor.getDatanodeUuid()));
datanodeDescriptors.forEach(datanodeDescriptor -> {
slowPeersUuidSet.add(datanodeDescriptor.getDatanodeUuid());
String ipAddr = datanodeDescriptor.getIpAddr();
collectSlowNodesIpAddrCounts.incrementAndGet(ipAddr);
});
// Clean up nodes that are not in the host2DatanodeMap
for (String ipAddr : collectSlowNodesIpAddrCounts.asMap().keySet()) {
DatanodeDescriptor datanodeByHost = host2DatanodeMap.getDatanodeByHost(ipAddr);
if (datanodeByHost == null) {
collectSlowNodesIpAddrCounts.decrementAndGet(ipAddr);
}
}
return slowPeersUuidSet;
}

Expand Down Expand Up @@ -2233,6 +2247,14 @@ public static Set<String> getSlowNodesUuidSet() {
return slowNodesUuidSet;
}

/**
 * Exposes how often each slow datanode has been collected.
 *
 * @return the {@code Map} view of {@code collectSlowNodesIpAddrCounts}, i.e.
 *         whatever {@code AtomicLongMap#asMap()} hands back for the counter.
 */
public Map<String, Long> getCollectSlowNodesIpAddrCounts() {
  final Map<String, Long> countsByIpAddr = collectSlowNodesIpAddrCounts.asMap();
  return countsByIpAddr;
}

/**
* Use only for testing.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4920,6 +4920,14 @@ public float getReconstructionQueuesInitProgress() {
return blockManager.getReconstructionQueuesInitProgress();
}

/**
 * Serializes the DatanodeManager's slow-node IP address counts to JSON.
 *
 * @return the JSON text produced by {@code JSON.toString} for the counts map
 *         obtained from the block manager's DatanodeManager.
 */
@Override // FSNamesystemMBean
@Metric
public String getCollectSlowNodesIpAddrCounts() {
  // Fetch the counts from the DatanodeManager and render them in one step.
  return JSON.toString(
      getBlockManager().getDatanodeManager().getCollectSlowNodesIpAddrCounts());
}

/**
* Returns the length of the wait Queue for the FSNameSystemLock.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.namenode.metrics;

import java.util.Map;

import org.apache.hadoop.classification.InterfaceAudience;

/**
Expand Down Expand Up @@ -268,4 +270,12 @@ public interface FSNamesystemMBean {
* @return Returns values between 0 and 1 for the progress.
*/
float getReconstructionQueuesInitProgress();

/**
 * Returns a JSON object that maps the IP address of each collected slow node
 * to the number of times it has been counted,
 * e.g. {"1.1.1.1":4,"2.2.2.2":3}
 *
 * @return JSON string.
 */
String getCollectSlowNodesIpAddrCounts();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,20 @@
*/
package org.apache.hadoop.hdfs.server.namenode;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNotNull;

import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

Expand All @@ -31,12 +39,18 @@
import javax.management.MBeanServer;
import javax.management.ObjectName;

import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.protocol.OutlierMetrics;
import org.apache.hadoop.metrics2.impl.ConfigBuilder;
import org.apache.hadoop.metrics2.impl.TestMetricsConfig;
import org.apache.hadoop.test.GenericTestUtils;
Expand Down Expand Up @@ -269,4 +283,65 @@ public void testReconstructionQueuesInitProgressMetrics() throws Exception {
assertEquals(1.0, reconstructionQueuesInitProgress, 0);
}
}

/**
 * Checks the CollectSlowNodesIpAddrCounts attribute end to end:
 * <ol>
 *   <li>it is an empty JSON object before any slow node is reported, both via
 *       FSNamesystem and via the FSNamesystemState MBean;</li>
 *   <li>once one datanode reports the other as an outlier, the count for that
 *       slow node's IP address grows beyond 1;</li>
 *   <li>after peer stats are disabled through reconfiguration, the
 *       DatanodeManager's counts map is empty again.</li>
 * </ol>
 */
@Test
public void testCollectSlowNodesIpAddrCountsMetrics() throws Exception {
  Configuration conf = new Configuration();
  conf.setBoolean(DFS_DATANODE_PEER_STATS_ENABLED_KEY, true);
  conf.set(DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY, "1000");
  conf.set(DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY, "1");
  conf.set(DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY, "1");
  conf.setStrings(DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY, "1s");
  try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build()) {
    cluster.waitActive();
    DistributedFileSystem fs = cluster.getFileSystem();
    FSNamesystem fsNamesystem = cluster.getNameNode().getNamesystem();

    // Nothing has been reported yet, so both access paths must show an empty object.
    assertEquals("{}", fsNamesystem.getCollectSlowNodesIpAddrCounts());
    MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
    ObjectName mxBeanName = new ObjectName("Hadoop:service=NameNode,name=FSNamesystemState");
    String ipAddrCounts =
        (String) mBeanServer.getAttribute(mxBeanName, "CollectSlowNodesIpAddrCounts");
    assertEquals("{}", ipAddrCounts);

    List<DataNode> dataNodes = cluster.getDataNodes();
    assertEquals(2, dataNodes.size());

    // Make datanode 0 report datanode 1 as an outlier.
    dataNodes.get(0).getPeerMetrics().setTestOutliers(ImmutableMap.of(
        dataNodes.get(1).getDatanodeHostname() + ":" + dataNodes.get(1).getIpcPort(),
        new OutlierMetrics(1.0, 2.0, 3.0, 4.0)));

    GenericTestUtils.waitFor(() -> {
      try {
        DatanodeInfo[] slowNodeInfo = fs.getSlowDatanodeStats();
        return slowNodeInfo.length == 1;
      } catch (IOException e) {
        // Treat a failed query as "not yet"; waitFor polls again until the timeout.
        return false;
      }
    }, 1000, 10000, "Slow nodes could not be detected");

    GenericTestUtils.waitFor(() -> {
      String tmp;
      DatanodeInfo[] slowNodes;
      try {
        tmp = (String) mBeanServer.getAttribute(mxBeanName, "CollectSlowNodesIpAddrCounts");
        slowNodes = fs.getSlowDatanodeStats();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
      // Keep polling instead of failing with an index error if the slow node
      // list is momentarily empty.
      if (slowNodes.length == 0) {
        return false;
      }
      Object parsed = JSON.parse(tmp);
      if (!(parsed instanceof Map)) {
        return false;
      }
      // Look the slow node up by its IP address rather than inspecting whichever
      // entry happens to be iterated first, and avoid a raw-typed cast.
      Object count = ((Map<?, ?>) parsed).get(slowNodes[0].getIpAddr());
      return count instanceof Number && ((Number) count).longValue() > 1;
    }, 100, 10000);

    // Disabling peer stats is expected to reset the collected counts.
    cluster.getNameNode().reconfigureProperty(DFS_DATANODE_PEER_STATS_ENABLED_KEY, "false");
    DatanodeManager dnManager = fsNamesystem.getBlockManager().getDatanodeManager();
    assertEquals("{}", dnManager.getCollectSlowNodesIpAddrCounts().toString());
  }
}

}