diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
index 271d1fead5546..a30009f233d3d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
@@ -55,6 +55,8 @@ public final class RouterMetrics {
   private MutableGaugeInt numAppAttemptsFailedRetrieved;
   @Metric("# of getClusterMetrics failed to be retrieved")
   private MutableGaugeInt numGetClusterMetricsFailedRetrieved;
+  @Metric("# of getClusterNodes failed to be retrieved")
+  private MutableGaugeInt numGetClusterNodesFailedRetrieved;
 
   // Aggregate metrics are shared, and don't have to be looked up per call
   @Metric("Total number of successful Submitted apps and latency(ms)")
@@ -74,7 +76,8 @@ public final class RouterMetrics {
   @Metric("Total number of successful Retrieved getClusterMetrics and "
       + "latency(ms)")
   private MutableRate totalSucceededGetClusterMetricsRetrieved;
-
+  @Metric("Total number of successful Retrieved getClusterNodes and latency(ms)")
+  private MutableRate totalSucceededGetClusterNodesRetrieved;
 
   /**
    * Provide quantile counters for all latencies.
@@ -86,6 +89,7 @@ public final class RouterMetrics {
   private MutableQuantiles getApplicationsReportLatency;
   private MutableQuantiles getApplicationAttemptReportLatency;
   private MutableQuantiles getClusterMetricsLatency;
+  private MutableQuantiles getClusterNodesLatency;
 
   private static volatile RouterMetrics INSTANCE = null;
   private static MetricsRegistry registry;
@@ -112,6 +116,10 @@ private RouterMetrics() {
     getClusterMetricsLatency =
         registry.newQuantiles("getClusterMetricsLatency",
             "latency of get cluster metrics", "ops", "latency", 10);
+
+    getClusterNodesLatency =
+        registry.newQuantiles("getClusterNodesLatency",
+            "latency of get cluster nodes", "ops", "latency", 10);
   }
 
   public static RouterMetrics getMetrics() {
@@ -168,6 +176,11 @@ public long getNumSucceededGetClusterMetricsRetrieved(){
     return totalSucceededGetClusterMetricsRetrieved.lastStat().numSamples();
   }
 
+  @VisibleForTesting
+  public long getNumSucceededGetClusterNodesRetrieved(){
+    return totalSucceededGetClusterNodesRetrieved.lastStat().numSamples();
+  }
+
   @VisibleForTesting
   public double getLatencySucceededAppsCreated() {
     return totalSucceededAppsCreated.lastStat().mean();
   }
@@ -203,6 +216,11 @@ public double getLatencySucceededGetClusterMetricsRetrieved() {
     return totalSucceededGetClusterMetricsRetrieved.lastStat().mean();
   }
 
+  @VisibleForTesting
+  public double getLatencySucceededGetClusterNodesRetrieved() {
+    return totalSucceededGetClusterNodesRetrieved.lastStat().mean();
+  }
+
   @VisibleForTesting
   public int getAppsFailedCreated() {
     return numAppsFailedCreated.value();
   }
@@ -238,6 +256,11 @@ public int getClusterMetricsFailedRetrieved() {
     return numGetClusterMetricsFailedRetrieved.value();
   }
 
+  @VisibleForTesting
+  public int getClusterNodesFailedRetrieved() {
+    return numGetClusterNodesFailedRetrieved.value();
+  }
+
   public void succeededAppsCreated(long duration) {
     totalSucceededAppsCreated.add(duration);
     getNewApplicationLatency.add(duration);
   }
@@ -273,6 +296,11 @@ public void succeededGetClusterMetricsRetrieved(long duration) {
     getClusterMetricsLatency.add(duration);
   }
 
+  public void succeededGetClusterNodesRetrieved(long duration) {
+    totalSucceededGetClusterNodesRetrieved.add(duration);
+    getClusterNodesLatency.add(duration);
+  }
+
   public void incrAppsFailedCreated() {
     numAppsFailedCreated.incr();
   }
@@ -301,4 +329,7 @@ public void incrGetClusterMetricsFailedRetrieved() {
     numGetClusterMetricsFailedRetrieved.incr();
   }
 
+  public void incrClusterNodesFailedRetrieved() {
+    numGetClusterNodesFailedRetrieved.incr();
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
index de80a35f49b78..6b6b19aac85ee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.router.clientrm;
 
+import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.IOException;
 import java.lang.reflect.Method;
@@ -791,7 +792,30 @@ Map invokeConcurrent(Collection clusterIds,
   @Override
   public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
       throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+    if (request == null) {
+      routerMetrics.incrClusterNodesFailedRetrieved();
+      RouterServerUtil.logAndThrowException("Missing getClusterNodes request.", null);
+    }
+    long startTime = clock.getTime();
+    Map<SubClusterId, SubClusterInfo> subClusters =
+        federationFacade.getSubClusters(true);
+    Map<SubClusterId, GetClusterNodesResponse> clusterNodes = Maps.newHashMap();
+    for (SubClusterId subClusterId : subClusters.keySet()) {
+      ApplicationClientProtocol client;
+      try {
+        client = getClientRMProxyForSubCluster(subClusterId);
+        GetClusterNodesResponse response = client.getClusterNodes(request);
+        clusterNodes.put(subClusterId, response);
+      } catch (Exception ex) {
+        routerMetrics.incrClusterNodesFailedRetrieved();
+        LOG.error("Unable to get cluster nodes due to exception.", ex);
+        throw ex;
+      }
+    }
+    long stopTime = clock.getTime();
+    routerMetrics.succeededGetClusterNodesRetrieved(stopTime - startTime);
+    // Merge the NodesResponse
+    return RouterYarnClientUtils.mergeClusterNodesResponse(clusterNodes.values());
   }
 
   @Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java
index 934636b104df6..dfe20cfda1f3c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java
@@ -20,14 +20,19 @@
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.List;
+import java.util.ArrayList;
 
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
+import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 /**
@@ -194,4 +199,23 @@ private static boolean mergeUamToReport(String appName,
     return !(appName.startsWith(UnmanagedApplicationManager.APP_NAME)
         || appName.startsWith(PARTIAL_REPORT));
   }
+
+  /**
+   * Merges a list of GetClusterNodesResponse.
+   *
+   * @param responses a list of GetClusterNodesResponse to merge.
+   * @return the merged GetClusterNodesResponse.
+   */
+  public static GetClusterNodesResponse mergeClusterNodesResponse(
+      Collection<GetClusterNodesResponse> responses) {
+    GetClusterNodesResponse clusterNodesResponse = Records.newRecord(GetClusterNodesResponse.class);
+    List<NodeReport> nodeReports = new ArrayList<>();
+    for (GetClusterNodesResponse response : responses) {
+      if (response != null && response.getNodeReports() != null) {
+        nodeReports.addAll(response.getNodeReports());
+      }
+    }
+    clusterNodesResponse.setNodeReports(nodeReports);
+    return clusterNodesResponse;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
index 4d4838a560b48..df8c194208aa2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
@@ -38,6 +38,8 @@ public class TestRouterMetrics {
   private static RouterMetrics metrics = RouterMetrics.getMetrics();
 
+  private static final Double ASSERT_DOUBLE_DELTA = 0.01;
+
   @BeforeClass
   public static void init() {
@@ -346,6 +348,11 @@ public void getClusterMetrics() {
       LOG.info("Mocked: failed getClusterMetrics call");
       metrics.incrGetClusterMetricsFailedRetrieved();
     }
+
+    public void getClusterNodes() {
+      LOG.info("Mocked: failed getClusterNodes call");
+      metrics.incrClusterNodesFailedRetrieved();
+    }
   }
 
   // Records successes for all calls
@@ -392,5 +399,30 @@ public void getClusterMetrics(long duration){
           duration);
       metrics.succeededGetClusterMetricsRetrieved(duration);
     }
+
+    public void getClusterNodes(long duration) {
+      LOG.info("Mocked: successful getClusterNodes call with duration {}", duration);
+      metrics.succeededGetClusterNodesRetrieved(duration);
+    }
+  }
+
+  @Test
+  public void testSucceededGetClusterNodes() {
+    long totalGoodBefore = metrics.getNumSucceededGetClusterNodesRetrieved();
+    goodSubCluster.getClusterNodes(150);
+    Assert.assertEquals(totalGoodBefore + 1, metrics.getNumSucceededGetClusterNodesRetrieved());
+    Assert.assertEquals(150, metrics.getLatencySucceededGetClusterNodesRetrieved(),
+        ASSERT_DOUBLE_DELTA);
+    goodSubCluster.getClusterNodes(300);
+    Assert.assertEquals(totalGoodBefore + 2, metrics.getNumSucceededGetClusterNodesRetrieved());
+    Assert.assertEquals(225, metrics.getLatencySucceededGetClusterNodesRetrieved(),
+        ASSERT_DOUBLE_DELTA);
+  }
+
+  @Test
+  public void testGetClusterNodesFailed() {
+    long totalBadBefore = metrics.getClusterNodesFailedRetrieved();
+    badSubCluster.getClusterNodes();
+    Assert.assertEquals(totalBadBefore + 1, metrics.getClusterNodesFailedRetrieved());
   }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
index 74d10a44c52b5..7409a1cff169f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
@@ -44,6 +44,8 @@
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -641,4 +643,16 @@ public void testGetApplicationsApplicationStateNotExists() throws Exception{
     Assert.assertNotNull(responseGet);
     Assert.assertTrue(responseGet.getApplicationList().isEmpty());
   }
+
+  @Test
+  public void testGetClusterNodesRequest() throws Exception {
+    LOG.info("Test FederationClientInterceptor : Get Cluster Nodes request");
+    // null request
+    LambdaTestUtils.intercept(YarnException.class, "Missing getClusterNodes request.",
+        () -> interceptor.getClusterNodes(null));
+    // normal request.
+    GetClusterNodesResponse response =
+        interceptor.getClusterNodes(GetClusterNodesRequest.newInstance());
+    Assert.assertEquals(subClusters.size(), response.getNodeReports().size());
+  }
 }
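
Outside the patch itself, the sketch below illustrates the expected merge behaviour: the node reports of every sub-cluster response are simply concatenated into one GetClusterNodesResponse, which is why the test asserts that the merged report count equals the number of sub-clusters. The class name MergeClusterNodesExample and the caller-supplied NodeReport arguments are assumptions for illustration; only APIs that appear in the diff are used.

package org.apache.hadoop.yarn.server.router.clientrm;

import java.util.Arrays;
import java.util.Collections;

import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.util.Records;

public final class MergeClusterNodesExample {

  private MergeClusterNodesExample() {
  }

  /**
   * Builds one response per sub-cluster and merges them, mirroring what the
   * interceptor does after fanning the request out to each sub-cluster RM.
   */
  public static GetClusterNodesResponse mergeTwoSubClusters(
      NodeReport reportFromSc1, NodeReport reportFromSc2) {
    // One GetClusterNodesResponse per sub-cluster, each carrying one node report.
    GetClusterNodesResponse sc1Response = Records.newRecord(GetClusterNodesResponse.class);
    sc1Response.setNodeReports(Collections.singletonList(reportFromSc1));

    GetClusterNodesResponse sc2Response = Records.newRecord(GetClusterNodesResponse.class);
    sc2Response.setNodeReports(Collections.singletonList(reportFromSc2));

    // The merged response contains both node reports (size 2 here), matching
    // the subClusters.size() assertion in testGetClusterNodesRequest.
    return RouterYarnClientUtils.mergeClusterNodesResponse(
        Arrays.asList(sc1Response, sc2Response));
  }
}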