Skip to content

Commit aaa2dee

Browse files
committed
HDFS-17292. Show the number of times the slowPeerCollectorDaemon thread has collected SlowNodes.
1 parent 62cc673 commit aaa2dee

File tree

5 files changed

+110
-2
lines changed

5 files changed

+110
-2
lines changed

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -889,6 +889,11 @@ public float getReconstructionQueuesInitProgress() {
889889
return 0;
890890
}
891891

892+
@Override
893+
public String getCollectSlowNodesIpAddrFrequencyMap() {
894+
return "N/A";
895+
}
896+
892897
private Router getRouter() throws IOException {
893898
if (this.router == null) {
894899
throw new IOException("Router is not initialized");

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
import java.net.InetSocketAddress;
7070
import java.net.UnknownHostException;
7171
import java.util.*;
72+
import java.util.concurrent.ConcurrentHashMap;
7273
import java.util.concurrent.ThreadLocalRandom;
7374
import java.util.concurrent.TimeUnit;
7475
import java.util.function.Consumer;
@@ -214,6 +215,8 @@ public class DatanodeManager {
214215
private final long slowPeerCollectionInterval;
215216
private volatile int maxSlowPeerReportNodes;
216217

218+
/**
 * Counts, per datanode IP address, how many times getSlowPeersUuidSet() has resolved
 * that address as a slow node. Cleared in stopSlowPeerCollector(), and entries whose
 * address host2DatanodeMap can no longer resolve are removed during collection.
 */
private final Map<String, Integer> collectSlowNodesIpAddrFrequencyMap = new ConcurrentHashMap<>();
219+
217220
@Nullable
218221
private final SlowDiskTracker slowDiskTracker;
219222

@@ -421,6 +424,7 @@ public void stopSlowPeerCollector() {
421424
} finally {
422425
slowPeerCollectorDaemon = null;
423426
slowNodesUuidSet.clear();
427+
collectSlowNodesIpAddrFrequencyMap.clear();
424428
}
425429
}
426430

@@ -2193,8 +2197,23 @@ public Set<String> getSlowPeersUuidSet() {
21932197
Preconditions.checkNotNull(slowPeerTracker, "slowPeerTracker should not be un-assigned");
21942198
slowNodes = slowPeerTracker.getSlowNodes(maxSlowPeerReportNodes);
21952199
List<DatanodeDescriptor> datanodeDescriptors = getDnDescriptorsFromIpAddr(slowNodes);
2196-
datanodeDescriptors.forEach(
2197-
datanodeDescriptor -> slowPeersUuidSet.add(datanodeDescriptor.getDatanodeUuid()));
2200+
datanodeDescriptors.forEach(datanodeDescriptor -> {
2201+
slowPeersUuidSet.add(datanodeDescriptor.getDatanodeUuid());
2202+
String ipAddr = datanodeDescriptor.getIpAddr();
2203+
if (!collectSlowNodesIpAddrFrequencyMap.containsKey(ipAddr)) {
2204+
collectSlowNodesIpAddrFrequencyMap.put(ipAddr, 1);
2205+
} else {
2206+
collectSlowNodesIpAddrFrequencyMap.put(ipAddr,
2207+
collectSlowNodesIpAddrFrequencyMap.get(ipAddr) + 1);
2208+
}
2209+
});
2210+
// Clean up nodes that are not in the host2DatanodeMap
2211+
for (String ipAddr : collectSlowNodesIpAddrFrequencyMap.keySet()) {
2212+
DatanodeDescriptor datanodeByHost = host2DatanodeMap.getDatanodeByHost(ipAddr);
2213+
if (datanodeByHost == null) {
2214+
collectSlowNodesIpAddrFrequencyMap.remove(ipAddr);
2215+
}
2216+
}
21982217
return slowPeersUuidSet;
21992218
}
22002219

@@ -2222,6 +2241,10 @@ public static Set<String> getSlowNodesUuidSet() {
22222241
return slowNodesUuidSet;
22232242
}
22242243

2244+
public Map<String, Integer> getCollectSlowNodesIpAddrFrequencyMap() {
2245+
return collectSlowNodesIpAddrFrequencyMap;
2246+
}
2247+
22252248
/**
22262249
* Use only for testing.
22272250
*/

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4920,6 +4920,14 @@ public float getReconstructionQueuesInitProgress() {
49204920
return blockManager.getReconstructionQueuesInitProgress();
49214921
}
49224922

4923+
@Override // FSNamesystemMBean
4924+
@Metric
4925+
public String getCollectSlowNodesIpAddrFrequencyMap() {
4926+
Map<String, Integer> recordSlowNodesIpAddr =
4927+
getBlockManager().getDatanodeManager().getCollectSlowNodesIpAddrFrequencyMap();
4928+
return JSON.toString(recordSlowNodesIpAddr);
4929+
}
4930+
49234931
/**
49244932
* Returns the length of the wait Queue for the FSNameSystemLock.
49254933
*

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,4 +268,13 @@ public interface FSNamesystemMBean {
268268
* @return Returns values between 0 and 1 for the progress.
269269
*/
270270
float getReconstructionQueuesInitProgress();
271+
272+
273+
/**
274+
* Returns a flat JSON object that maps the IP address of each collected slow node
* to the number of times it has been collected,
275+
* e.g. {"1.1.1.1":4,"2.2.2.2":3}
276+
*
277+
* @return JSON string.
278+
*/
279+
String getCollectSlowNodesIpAddrFrequencyMap();
271280
}

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyExcludeSlowNodes.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,22 +21,31 @@
2121
import org.apache.hadoop.hdfs.DFSConfigKeys;
2222
import org.apache.hadoop.hdfs.DFSTestUtil;
2323
import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
24+
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
2425
import org.apache.hadoop.hdfs.server.namenode.NameNode;
2526
import org.apache.hadoop.hdfs.server.protocol.OutlierMetrics;
2627

2728
import org.apache.hadoop.test.GenericTestUtils;
29+
import org.eclipse.jetty.util.ajax.JSON;
2830
import org.junit.Assert;
2931
import org.junit.Test;
3032
import org.junit.runner.RunWith;
3133
import org.junit.runners.Parameterized;
3234

35+
import java.lang.management.ManagementFactory;
3336
import java.util.ArrayList;
3437
import java.util.Arrays;
38+
import java.util.HashMap;
39+
import java.util.Map;
3540
import java.util.Set;
3641

42+
import javax.management.MBeanServer;
43+
import javax.management.ObjectName;
44+
3745
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY;
3846
import static org.junit.Assert.assertEquals;
3947
import static org.junit.Assert.assertFalse;
48+
import static org.junit.Assert.assertNotEquals;
4049
import static org.junit.Assert.assertTrue;
4150

4251
@RunWith(Parameterized.class)
@@ -176,4 +185,58 @@ public void testSlowPeerTrackerEnabledClearSlowNodes() throws Exception {
176185
}
177186
}
178187

188+
/**
189+
* Dependent on the SlowNode related config, therefore placing
190+
* 'testCollectSlowNodesIpAddrFrequencyMetrics' unit test in the
191+
* TestReplicationPolicyExcludeSlowNodes class.
192+
* <br>
193+
* Test metrics associated with CollectSlowNodesIpAddrFrequency.
194+
*/
195+
@Test
196+
public void testCollectSlowNodesIpAddrFrequencyMetrics() throws Exception {
197+
namenode.getNamesystem().writeLock();
198+
try {
199+
FSNamesystem fsNamesystem = namenode.getNamesystem();
200+
assertEquals("{}", fsNamesystem.getCollectSlowNodesIpAddrFrequencyMap(), "{}");
201+
MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
202+
ObjectName mxBeanName = new ObjectName("Hadoop:service=NameNode,name=FSNamesystemState");
203+
String ipAddrFrequency =
204+
(String) mBeanServer.getAttribute(mxBeanName, "CollectSlowNodesIpAddrFrequencyMap");
205+
assertEquals("{}", ipAddrFrequency, "{}");
206+
207+
// add nodes
208+
for (DatanodeDescriptor dataNode : dataNodes) {
209+
dnManager.addDatanode(dataNode);
210+
}
211+
212+
// mock slow nodes
213+
SlowPeerTracker tracker = dnManager.getSlowPeerTracker();
214+
Assert.assertNotNull(tracker);
215+
OutlierMetrics outlierMetrics = new OutlierMetrics(0.0, 0.0, 0.0, 5.0);
216+
tracker.addReport(dataNodes[0].getInfoAddr(), dataNodes[2].getInfoAddr(), outlierMetrics);
217+
tracker.addReport(dataNodes[1].getInfoAddr(), dataNodes[2].getInfoAddr(), outlierMetrics);
218+
219+
// waiting for slow nodes collector run and collect at least 2 times
220+
Thread.sleep(3000);
221+
assertNotEquals("{}", fsNamesystem.getCollectSlowNodesIpAddrFrequencyMap(), "{}");
222+
223+
// check jmx data
224+
ipAddrFrequency =
225+
(String) mBeanServer.getAttribute(mxBeanName, "CollectSlowNodesIpAddrFrequencyMap");
226+
assertNotEquals("{}", ipAddrFrequency, "{}");
227+
Map<String, Long> ipAddrFrequencyMap = (HashMap) JSON.parse(ipAddrFrequency);
228+
for (Map.Entry<String, Long> entry : ipAddrFrequencyMap.entrySet()) {
229+
assertTrue(dataNodes[0].getIpAddr().equals(entry.getKey())
230+
|| dataNodes[1].getIpAddr().equals(entry.getKey()));
231+
assertTrue(entry.getValue() > 1);
232+
}
233+
234+
// check reconfig
235+
namenode.reconfigureProperty(DFS_DATANODE_PEER_STATS_ENABLED_KEY, "false");
236+
assertEquals("{}", dnManager.getCollectSlowNodesIpAddrFrequencyMap().toString());
237+
} finally {
238+
namenode.getNamesystem().writeUnlock();
239+
}
240+
}
241+
179242
}

0 commit comments

Comments
 (0)