From 8a3756bba5e0600e7baba24f56e2a728214f8666 Mon Sep 17 00:00:00 2001 From: huangzhaobo Date: Wed, 20 Dec 2023 09:24:04 +0800 Subject: [PATCH 1/3] HDFS-17292. Show the number of times the slowPeerCollectorDaemon thread has collected SlowNodes. --- .../metrics/NamenodeBeanMetrics.java | 5 ++ .../blockmanagement/DatanodeManager.java | 27 +++++- .../hdfs/server/namenode/FSNamesystem.java | 8 ++ .../namenode/metrics/FSNamesystemMBean.java | 8 ++ .../namenode/TestFSNamesystemMBean.java | 82 +++++++++++++++++++ 5 files changed, 128 insertions(+), 2 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java index d16543e2d867f..04ce650096629 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java @@ -889,6 +889,11 @@ public float getReconstructionQueuesInitProgress() { return 0; } + @Override + public String getCollectSlowNodesIpAddrFrequencyMap() { + return "N/A"; + } + private Router getRouter() throws IOException { if (this.router == null) { throw new IOException("Router is not initialized"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 1d2ed74640806..cfb67219afc42 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -69,6 +69,7 @@ import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -214,6 +215,8 @@ public class DatanodeManager { private volatile long slowPeerCollectionInterval; private volatile int maxSlowPeerReportNodes; + private final Map collectSlowNodesIpAddrFrequencyMap = new ConcurrentHashMap<>(); + @Nullable private final SlowDiskTracker slowDiskTracker; @@ -421,6 +424,7 @@ private void stopSlowPeerCollector() { } finally { slowPeerCollectorDaemon = null; slowNodesUuidSet.clear(); + collectSlowNodesIpAddrFrequencyMap.clear(); } } @@ -2204,8 +2208,19 @@ public Set getSlowPeersUuidSet() { Preconditions.checkNotNull(slowPeerTracker, "slowPeerTracker should not be un-assigned"); slowNodes = slowPeerTracker.getSlowNodes(maxSlowPeerReportNodes); List datanodeDescriptors = getDnDescriptorsFromIpAddr(slowNodes); - datanodeDescriptors.forEach( - datanodeDescriptor -> slowPeersUuidSet.add(datanodeDescriptor.getDatanodeUuid())); + datanodeDescriptors.forEach(datanodeDescriptor -> { + slowPeersUuidSet.add(datanodeDescriptor.getDatanodeUuid()); + String ipAddr = datanodeDescriptor.getIpAddr(); + collectSlowNodesIpAddrFrequencyMap.putIfAbsent(ipAddr, 1); + collectSlowNodesIpAddrFrequencyMap.computeIfPresent(ipAddr, (k, v) -> v + 1); + }); + // Clean up nodes that are not in the host2DatanodeMap + for (String ipAddr : collectSlowNodesIpAddrFrequencyMap.keySet()) { + DatanodeDescriptor datanodeByHost = host2DatanodeMap.getDatanodeByHost(ipAddr); + if (datanodeByHost == null) { + collectSlowNodesIpAddrFrequencyMap.remove(ipAddr); + } + } return slowPeersUuidSet; } @@ -2233,6 +2248,14 @@ public static Set getSlowNodesUuidSet() { return slowNodesUuidSet; } + /** + * Return the map of the frequency of collecting slow datanodes. + * @return map type + */ + public Map getCollectSlowNodesIpAddrFrequencyMap() { + return collectSlowNodesIpAddrFrequencyMap; + } + /** * Use only for testing. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index c14526b6a11b3..52b1c6e89f010 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -4920,6 +4920,14 @@ public float getReconstructionQueuesInitProgress() { return blockManager.getReconstructionQueuesInitProgress(); } + @Override // FSNamesystemMBean + @Metric + public String getCollectSlowNodesIpAddrFrequencyMap() { + Map recordSlowNodesIpAddr = + getBlockManager().getDatanodeManager().getCollectSlowNodesIpAddrFrequencyMap(); + return JSON.toString(recordSlowNodesIpAddr); + } + /** * Returns the length of the wait Queue for the FSNameSystemLock. * diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java index 746cae7113a64..261156a8827d8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java @@ -268,4 +268,12 @@ public interface FSNamesystemMBean { * @return Returns values between 0 and 1 for the progress. */ float getReconstructionQueuesInitProgress(); + + /** + * Returns a nested JSON object listing the collect slowNodesIpAddr frequency Map, + * e.g. {"1.1.1.1":4,"2.2.2.2":3} + * + * @return JSON string. + */ + String getCollectSlowNodesIpAddrFrequencyMap(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystemMBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystemMBean.java index 2353dd975a708..1c8a3cf2baa69 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystemMBean.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystemMBean.java @@ -17,26 +17,41 @@ */ package org.apache.hadoop.hdfs.server.namenode; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertNotNull; +import java.io.IOException; import java.lang.management.ManagementFactory; +import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Supplier; import javax.management.MBeanAttributeInfo; import javax.management.MBeanInfo; import javax.management.MBeanServer; import javax.management.ObjectName; +import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap; + import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.protocol.OutlierMetrics; import org.apache.hadoop.metrics2.impl.ConfigBuilder; import org.apache.hadoop.metrics2.impl.TestMetricsConfig; import org.apache.hadoop.test.GenericTestUtils; @@ -269,4 +284,71 @@ public void testReconstructionQueuesInitProgressMetrics() throws Exception { assertEquals(1.0, reconstructionQueuesInitProgress, 0); } } + + @Test + public void testCollectSlowNodesIpAddrFrequencyMetrics() throws Exception { + Configuration conf = new Configuration(); + conf.setBoolean(DFS_DATANODE_PEER_STATS_ENABLED_KEY, true); + conf.set(DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY, "1000"); + conf.set(DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY, "1"); + conf.set(DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY, "1"); + conf.setStrings(DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY, "1s"); + try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build()) { + cluster.waitActive(); + DistributedFileSystem fs = cluster.getFileSystem(); + FSNamesystem fsNamesystem = cluster.getNameNode().getNamesystem(); + + assertEquals("{}", fsNamesystem.getCollectSlowNodesIpAddrFrequencyMap()); + MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); + ObjectName mxBeanName = new ObjectName("Hadoop:service=NameNode,name=FSNamesystemState"); + String ipAddrFrequency = + (String) mBeanServer.getAttribute(mxBeanName, "CollectSlowNodesIpAddrFrequencyMap"); + assertEquals("{}", ipAddrFrequency); + + List dataNodes = cluster.getDataNodes(); + assertEquals(2, dataNodes.size()); + + DatanodeManager dnManager = fsNamesystem.getBlockManager().getDatanodeManager(); + + dnManager.addSlowPeers(dataNodes.get(1).getDatanodeUuid()); + dataNodes.get(0).getPeerMetrics().setTestOutliers(ImmutableMap.of( + dataNodes.get(1).getDatanodeHostname() + ":" + dataNodes.get(1).getIpcPort(), + new OutlierMetrics(1.0, 2.0, 3.0, 4.0))); + + GenericTestUtils.waitFor(() -> { + try { + DatanodeInfo[] slowNodeInfo = fs.getSlowDatanodeStats(); + return slowNodeInfo.length == 1; + } catch (IOException e) { + return false; + } + }, 1000, 10000, "Slow nodes could not be detected"); + + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + String ipAddrFrequency; + DatanodeInfo[] slowNodes; + try { + ipAddrFrequency = + (String) mBeanServer.getAttribute(mxBeanName, "CollectSlowNodesIpAddrFrequencyMap"); + slowNodes = fs.getSlowDatanodeStats(); + } catch (Exception e) { + throw new RuntimeException(e); + } + Map ipAddrFrequencyMap = (HashMap) JSON.parse(ipAddrFrequency); + for (Map.Entry entry : ipAddrFrequencyMap.entrySet()) { + boolean condition1 = slowNodes[0].getIpAddr().equals(entry.getKey()); + boolean condition2 = entry.getValue() > 1; + return condition1 && condition2; + } + return false; + } + }, 100, 10000); + + cluster.getNameNode().reconfigureProperty(DFS_DATANODE_PEER_STATS_ENABLED_KEY, "false"); + assertEquals("{}", dnManager.getCollectSlowNodesIpAddrFrequencyMap().toString()); + } + } + } From 141ce0b6eb9c83f6a5c1f3a0c87397f01123e7aa Mon Sep 17 00:00:00 2001 From: huangzhaobo Date: Tue, 2 Jan 2024 08:01:00 +0800 Subject: [PATCH 2/3] Modify UT --- .../hadoop/hdfs/server/namenode/TestFSNamesystemMBean.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystemMBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystemMBean.java index 1c8a3cf2baa69..792a851064445 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystemMBean.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystemMBean.java @@ -308,9 +308,6 @@ public void testCollectSlowNodesIpAddrFrequencyMetrics() throws Exception { List dataNodes = cluster.getDataNodes(); assertEquals(2, dataNodes.size()); - DatanodeManager dnManager = fsNamesystem.getBlockManager().getDatanodeManager(); - - dnManager.addSlowPeers(dataNodes.get(1).getDatanodeUuid()); dataNodes.get(0).getPeerMetrics().setTestOutliers(ImmutableMap.of( dataNodes.get(1).getDatanodeHostname() + ":" + dataNodes.get(1).getIpcPort(), new OutlierMetrics(1.0, 2.0, 3.0, 4.0))); @@ -347,6 +344,7 @@ public Boolean get() { }, 100, 10000); cluster.getNameNode().reconfigureProperty(DFS_DATANODE_PEER_STATS_ENABLED_KEY, "false"); + DatanodeManager dnManager = fsNamesystem.getBlockManager().getDatanodeManager(); assertEquals("{}", dnManager.getCollectSlowNodesIpAddrFrequencyMap().toString()); } } From 4d5d50144572e8eb3b6e356bde203d89f2fbe486 Mon Sep 17 00:00:00 2001 From: huangzhaobo Date: Sun, 28 Jan 2024 12:19:40 +0800 Subject: [PATCH 3/3] Optimize API --- .../metrics/NamenodeBeanMetrics.java | 2 +- .../blockmanagement/DatanodeManager.java | 17 ++++--- .../hdfs/server/namenode/FSNamesystem.java | 8 ++-- .../namenode/metrics/FSNamesystemMBean.java | 6 ++- .../namenode/TestFSNamesystemMBean.java | 47 +++++++++---------- 5 files changed, 38 insertions(+), 42 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java index 04ce650096629..dd6660abf65fe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java @@ -890,7 +890,7 @@ public float getReconstructionQueuesInitProgress() { } @Override - public String getCollectSlowNodesIpAddrFrequencyMap() { + public String getCollectSlowNodesIpAddrCounts() { return "N/A"; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index cfb67219afc42..7c8f097ba3c92 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -26,6 +26,7 @@ import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList; import org.apache.hadoop.thirdparty.com.google.common.net.InetAddresses; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.AtomicLongMap; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.HadoopIllegalArgumentException; @@ -69,7 +70,6 @@ import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -215,7 +215,7 @@ public class DatanodeManager { private volatile long slowPeerCollectionInterval; private volatile int maxSlowPeerReportNodes; - private final Map collectSlowNodesIpAddrFrequencyMap = new ConcurrentHashMap<>(); + private final AtomicLongMap collectSlowNodesIpAddrCounts = AtomicLongMap.create(); @Nullable private final SlowDiskTracker slowDiskTracker; @@ -424,7 +424,7 @@ private void stopSlowPeerCollector() { } finally { slowPeerCollectorDaemon = null; slowNodesUuidSet.clear(); - collectSlowNodesIpAddrFrequencyMap.clear(); + collectSlowNodesIpAddrCounts.clear(); } } @@ -2211,14 +2211,13 @@ public Set getSlowPeersUuidSet() { datanodeDescriptors.forEach(datanodeDescriptor -> { slowPeersUuidSet.add(datanodeDescriptor.getDatanodeUuid()); String ipAddr = datanodeDescriptor.getIpAddr(); - collectSlowNodesIpAddrFrequencyMap.putIfAbsent(ipAddr, 1); - collectSlowNodesIpAddrFrequencyMap.computeIfPresent(ipAddr, (k, v) -> v + 1); + collectSlowNodesIpAddrCounts.incrementAndGet(ipAddr); }); // Clean up nodes that are not in the host2DatanodeMap - for (String ipAddr : collectSlowNodesIpAddrFrequencyMap.keySet()) { + for (String ipAddr : collectSlowNodesIpAddrCounts.asMap().keySet()) { DatanodeDescriptor datanodeByHost = host2DatanodeMap.getDatanodeByHost(ipAddr); if (datanodeByHost == null) { - collectSlowNodesIpAddrFrequencyMap.remove(ipAddr); + collectSlowNodesIpAddrCounts.decrementAndGet(ipAddr); } } return slowPeersUuidSet; @@ -2252,8 +2251,8 @@ public static Set getSlowNodesUuidSet() { * Return the map of the frequency of collecting slow datanodes. * @return map type */ - public Map getCollectSlowNodesIpAddrFrequencyMap() { - return collectSlowNodesIpAddrFrequencyMap; + public Map getCollectSlowNodesIpAddrCounts() { + return collectSlowNodesIpAddrCounts.asMap(); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 52b1c6e89f010..ec43aee3c80e1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -4922,10 +4922,10 @@ public float getReconstructionQueuesInitProgress() { @Override // FSNamesystemMBean @Metric - public String getCollectSlowNodesIpAddrFrequencyMap() { - Map recordSlowNodesIpAddr = - getBlockManager().getDatanodeManager().getCollectSlowNodesIpAddrFrequencyMap(); - return JSON.toString(recordSlowNodesIpAddr); + public String getCollectSlowNodesIpAddrCounts() { + Map recordSlowNodesIpCounts = + getBlockManager().getDatanodeManager().getCollectSlowNodesIpAddrCounts(); + return JSON.toString(recordSlowNodesIpCounts); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java index 261156a8827d8..9f7609c992d14 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hdfs.server.namenode.metrics; +import java.util.Map; + import org.apache.hadoop.classification.InterfaceAudience; /** @@ -270,10 +272,10 @@ public interface FSNamesystemMBean { float getReconstructionQueuesInitProgress(); /** - * Returns a nested JSON object listing the collect slowNodesIpAddr frequency Map, + * Returns a nested JSON object listing the collect slowNodesIpAddr counts Map, * e.g. {"1.1.1.1":4,"2.2.2.2":3} * * @return JSON string. */ - String getCollectSlowNodesIpAddrFrequencyMap(); + String getCollectSlowNodesIpAddrCounts(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystemMBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystemMBean.java index 792a851064445..51ce89c79f4b6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystemMBean.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystemMBean.java @@ -33,7 +33,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.function.Supplier; import javax.management.MBeanAttributeInfo; import javax.management.MBeanInfo; @@ -286,7 +285,7 @@ public void testReconstructionQueuesInitProgressMetrics() throws Exception { } @Test - public void testCollectSlowNodesIpAddrFrequencyMetrics() throws Exception { + public void testCollectSlowNodesIpAddrCountsMetrics() throws Exception { Configuration conf = new Configuration(); conf.setBoolean(DFS_DATANODE_PEER_STATS_ENABLED_KEY, true); conf.set(DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY, "1000"); @@ -298,12 +297,12 @@ public void testCollectSlowNodesIpAddrFrequencyMetrics() throws Exception { DistributedFileSystem fs = cluster.getFileSystem(); FSNamesystem fsNamesystem = cluster.getNameNode().getNamesystem(); - assertEquals("{}", fsNamesystem.getCollectSlowNodesIpAddrFrequencyMap()); + assertEquals("{}", fsNamesystem.getCollectSlowNodesIpAddrCounts()); MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); ObjectName mxBeanName = new ObjectName("Hadoop:service=NameNode,name=FSNamesystemState"); - String ipAddrFrequency = - (String) mBeanServer.getAttribute(mxBeanName, "CollectSlowNodesIpAddrFrequencyMap"); - assertEquals("{}", ipAddrFrequency); + String ipAddrCounts = + (String) mBeanServer.getAttribute(mxBeanName, "CollectSlowNodesIpAddrCounts"); + assertEquals("{}", ipAddrCounts); List dataNodes = cluster.getDataNodes(); assertEquals(2, dataNodes.size()); @@ -321,31 +320,27 @@ public void testCollectSlowNodesIpAddrFrequencyMetrics() throws Exception { } }, 1000, 10000, "Slow nodes could not be detected"); - GenericTestUtils.waitFor(new Supplier() { - @Override - public Boolean get() { - String ipAddrFrequency; - DatanodeInfo[] slowNodes; - try { - ipAddrFrequency = - (String) mBeanServer.getAttribute(mxBeanName, "CollectSlowNodesIpAddrFrequencyMap"); - slowNodes = fs.getSlowDatanodeStats(); - } catch (Exception e) { - throw new RuntimeException(e); - } - Map ipAddrFrequencyMap = (HashMap) JSON.parse(ipAddrFrequency); - for (Map.Entry entry : ipAddrFrequencyMap.entrySet()) { - boolean condition1 = slowNodes[0].getIpAddr().equals(entry.getKey()); - boolean condition2 = entry.getValue() > 1; - return condition1 && condition2; - } - return false; + GenericTestUtils.waitFor(() -> { + String tmp; + DatanodeInfo[] slowNodes; + try { + tmp = (String) mBeanServer.getAttribute(mxBeanName, "CollectSlowNodesIpAddrCounts"); + slowNodes = fs.getSlowDatanodeStats(); + } catch (Exception e) { + throw new RuntimeException(e); + } + Map ipAddrFrequencyMap = (HashMap) JSON.parse(tmp); + for (Map.Entry entry : ipAddrFrequencyMap.entrySet()) { + boolean condition1 = slowNodes[0].getIpAddr().equals(entry.getKey()); + boolean condition2 = entry.getValue() > 1; + return condition1 && condition2; } + return false; }, 100, 10000); cluster.getNameNode().reconfigureProperty(DFS_DATANODE_PEER_STATS_ENABLED_KEY, "false"); DatanodeManager dnManager = fsNamesystem.getBlockManager().getDatanodeManager(); - assertEquals("{}", dnManager.getCollectSlowNodesIpAddrFrequencyMap().toString()); + assertEquals("{}", dnManager.getCollectSlowNodesIpAddrCounts().toString()); } }