From 5d9434bbb2fa650e987bc2e68d183aea691f9ac5 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Wed, 22 Mar 2017 19:59:38 -0700 Subject: [PATCH 1/4] [SPARK-18364][YARN] Expose metrics for YarnShuffleService Registers the shuffle server's metrics with the Hadoop Node Manager's DefaultMetricsSystem. Test metric collector gets right converted calls camel-case shuffleService Pass scalastyle Reformat and organize imports With import order specified at http://spark.apache.org/contributing.html --- .../shuffle/ExternalShuffleBlockHandler.java | 5 +- .../network/yarn/YarnShuffleService.java | 39 ++++-- .../yarn/YarnShuffleServiceMetrics.java | 123 ++++++++++++++++++ .../yarn/YarnShuffleServiceMetricsSuite.scala | 75 +++++++++++ 4 files changed, 231 insertions(+), 11 deletions(-) create mode 100644 common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java create mode 100644 resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java index 098fa7974b87b..1457ff613b60c 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java @@ -173,7 +173,8 @@ private void checkAuth(TransportClient client, String appId) { /** * A simple class to wrap all shuffle service wrapper metrics */ - private class ShuffleMetrics implements MetricSet { + @VisibleForTesting + public class ShuffleMetrics implements MetricSet { private final Map allMetrics; // Time latency for open block request in ms private final Timer openBlockRequestLatencyMillis = new Timer(); @@ -182,7 +183,7 @@ private class ShuffleMetrics implements MetricSet { // Block transfer rate in byte per second private final Meter blockTransferRateBytes = new Meter(); - private ShuffleMetrics() { + public ShuffleMetrics() { allMetrics = new HashMap<>(); allMetrics.put("openBlockRequestLatencyMillis", openBlockRequestLatencyMillis); allMetrics.put("registerExecutorRequestLatencyMillis", registerExecutorRequestLatencyMillis); diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index d8b2ed6b5dc7b..15c850725d4da 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -19,8 +19,10 @@ import java.io.File; import java.io.IOException; -import java.nio.charset.StandardCharsets; +import java.lang.reflect.Method; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.util.List; import java.util.Map; @@ -35,12 +37,13 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.impl.MetricsSystemImpl; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.server.api.*; -import org.apache.spark.network.util.LevelDBProvider; import org.iq80.leveldb.DB; import org.iq80.leveldb.DBIterator; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,18 +53,19 @@ import org.apache.spark.network.server.TransportServer; import org.apache.spark.network.server.TransportServerBootstrap; import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler; +import org.apache.spark.network.util.LevelDBProvider; import org.apache.spark.network.util.TransportConf; import org.apache.spark.network.yarn.util.HadoopConfigProvider; /** * An external shuffle service used by Spark on Yarn. - * + *

* This is intended to be a long-running auxiliary service that runs in the NodeManager process. * A Spark application may connect to this service by setting `spark.shuffle.service.enabled`. * The application also automatically derives the service port through `spark.shuffle.service.port` * specified in the Yarn configuration. This is so that both the clients and the server agree on * the same port to communicate on. - * + *

* The service also optionally supports authentication. This ensures that executors from one * application cannot read the shuffle files written by those from another. This feature can be * enabled by setting `spark.authenticate` in the Yarn configuration before starting the NM. @@ -96,7 +100,7 @@ public class YarnShuffleService extends AuxiliaryService { private static final ObjectMapper mapper = new ObjectMapper(); private static final String APP_CREDS_KEY_PREFIX = "AppCreds"; private static final LevelDBProvider.StoreVersion CURRENT_VERSION = new LevelDBProvider - .StoreVersion(1, 0); + .StoreVersion(1, 0); // just for integration tests that want to look at this file -- in general not sensible as // a static @@ -168,6 +172,23 @@ protected void serviceInit(Configuration conf) throws Exception { TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf)); blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile); + // register metrics on the block handler into the Node Manager's metrics system. + try { + YarnShuffleServiceMetrics serviceMetrics = new YarnShuffleServiceMetrics( + blockHandler.getAllMetrics()); + MetricsSystemImpl metricsSystem = (MetricsSystemImpl) DefaultMetricsSystem.instance(); + + Method registerSourceMethod = metricsSystem.getClass().getDeclaredMethod("registerSource", + String.class, String.class, MetricsSource.class); + registerSourceMethod.setAccessible(true); + registerSourceMethod.invoke(metricsSystem, "shuffleService", "Metrics on the Spark " + + "Shuffle Service", serviceMetrics); + logger.info("Registered metrics with Hadoop's DefaultMetricsSystem"); + } catch (Exception e) { + logger.warn("Unable to register Spark Shuffle Service metrics with Node Manager; " + + "proceeding without metrics", e); + } + // If authentication is enabled, set up the shuffle server to use a // special RPC handler that filters out unauthenticated fetch requests List bootstraps = Lists.newArrayList(); @@ -189,7 +210,7 @@ protected void serviceInit(Configuration conf) throws Exception { boundPort = port; String authEnabledString = authEnabled ? "enabled" : "not enabled"; logger.info("Started YARN shuffle service for Spark on port {}. " + - "Authentication is {}. Registered executor file is {}", port, authEnabledString, + "Authentication is {}. Registered executor file is {}", port, authEnabledString, registeredExecutorFile); } catch (Exception e) { if (stopOnFailure) { @@ -409,8 +430,8 @@ public int hashCode() { @Override public String toString() { return Objects.toStringHelper(this) - .add("appId", appId) - .toString(); + .add("appId", appId) + .toString(); } } diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java new file mode 100644 index 0000000000000..86cb07ae711ac --- /dev/null +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.yarn; + +import com.codahale.metrics.*; +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; + +import java.util.Map; + +/** + * Modeled off of YARN's NodeManagerMetrics. + */ +public class YarnShuffleServiceMetrics implements MetricsSource { + + private final MetricSet metricSet; + + public YarnShuffleServiceMetrics(MetricSet metricSet) { + this.metricSet = metricSet; + } + + /** + * Get metrics from the source + * + * @param collector to contain the resulting metrics snapshot + * @param all if true, return all metrics even if unchanged. + */ + @Override + public void getMetrics(MetricsCollector collector, boolean all) { + MetricsRecordBuilder metricsRecordBuilder = collector.addRecord("shuffleService"); + + for (Map.Entry entry : metricSet.getMetrics().entrySet()) { + collectMetric(metricsRecordBuilder, entry.getKey(), entry.getValue()); + } + } + + @VisibleForTesting + public static void collectMetric(MetricsRecordBuilder metricsRecordBuilder, String name, Metric metric) { + + // The metric types used in ExternalShuffleBlockHandler.ShuffleMetrics + if (metric instanceof Timer) { + Timer t = (Timer) metric; + metricsRecordBuilder + .addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of timer " + name), + t.getCount()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of timer " + name), + t.getFifteenMinuteRate()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of timer " + name), + t.getFiveMinuteRate()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of timer " + name), + t.getOneMinuteRate()) + .addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of timer " + name), + t.getMeanRate()); + } else if (metric instanceof Meter) { + Meter m = (Meter) metric; + metricsRecordBuilder + .addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of meter " + name), + m.getCount()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of meter " + name), + m.getFifteenMinuteRate()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of meter " + name), + m.getFiveMinuteRate()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of meter " + name), + m.getOneMinuteRate()) + .addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of meter " + name), + m.getMeanRate()); + } else if (metric instanceof Gauge) { + Gauge m = (Gauge) metric; + Object gaugeValue = m.getValue(); + if (gaugeValue instanceof Integer) { + Integer intValue = (Integer) gaugeValue; + metricsRecordBuilder + .addGauge(new ShuffleServiceMetricsInfo(name, "Integer value of " + + "gauge " + name), intValue.intValue()); + } + } + } + + private static class ShuffleServiceMetricsInfo implements MetricsInfo { + + private final String name; + private final String description; + + ShuffleServiceMetricsInfo(String name, String description) { + this.name = name; + this.description = description; + } + + @Override + public String name() { + return name; + } + + @Override + public String description() { + return description; + } + } +} diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala new file mode 100644 index 0000000000000..183545c94f329 --- /dev/null +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.network.yarn + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.metrics2.MetricsRecordBuilder +import org.mockito.Matchers._ +import org.mockito.Mockito.{mock, times, verify, when} +import org.scalatest.Matchers + +import org.apache.spark.SparkFunSuite +import org.apache.spark.network.server.OneForOneStreamManager +import org.apache.spark.network.shuffle.{ExternalShuffleBlockHandler, ExternalShuffleBlockResolver} + +class YarnShuffleServiceMetricsSuite extends SparkFunSuite with Matchers { + + val streamManager = mock(classOf[OneForOneStreamManager]) + val blockResolver = mock(classOf[ExternalShuffleBlockResolver]) + when(blockResolver.getRegisteredExecutorsSize).thenReturn(42) + + val metrics = new ExternalShuffleBlockHandler(streamManager, blockResolver).getAllMetrics + + test("metrics named as expected") { + val allMetrics = Set( + "openBlockRequestLatencyMillis", "registerExecutorRequestLatencyMillis", + "blockTransferRateBytes", "registeredExecutorsSize") + + metrics.getMetrics.keySet().asScala should be (allMetrics) + } + + // these three metrics have the same effect on the collector + for (testname <- Seq("openBlockRequestLatencyMillis", + "registerExecutorRequestLatencyMillis", + "blockTransferRateBytes")) { + test(s"$testname - collector receives correct types") { + val builder = mock(classOf[MetricsRecordBuilder]) + when(builder.addCounter(any(), anyLong())).thenReturn(builder) + when(builder.addGauge(any(), anyDouble())).thenReturn(builder) + + YarnShuffleServiceMetrics.collectMetric(builder, testname, + metrics.getMetrics.get(testname)) + + verify(builder).addCounter(anyObject(), anyLong()) + verify(builder, times(4)).addGauge(anyObject(), anyDouble()) + } + } + + // this metric writes only one gauge to the collector + test("registeredExecutorsSize - collector receives correct types") { + val builder = mock(classOf[MetricsRecordBuilder]) + when(builder.addCounter(any(), anyLong())).thenReturn(builder) + when(builder.addGauge(any(), anyDouble())).thenReturn(builder) + + YarnShuffleServiceMetrics.collectMetric(builder, "registeredExecutorsSize", + metrics.getMetrics.get("registeredExecutorsSize")) + + // only one + verify(builder).addGauge(anyObject(), anyInt()) + } +} From 6c96397536af57a8bbe8dd2529547427f643512b Mon Sep 17 00:00:00 2001 From: "marek.simunek" Date: Wed, 19 Sep 2018 17:17:53 +0200 Subject: [PATCH 2/4] [SPARK-18364][YARN] YarnShuffleService metrics correction --- .../shuffle/ExternalShuffleBlockHandler.java | 5 +-- .../network/yarn/YarnShuffleService.java | 42 +++++++------------ .../yarn/YarnShuffleServiceMetrics.java | 38 ++++++++++------- 3 files changed, 42 insertions(+), 43 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java index 1457ff613b60c..098fa7974b87b 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java @@ -173,8 +173,7 @@ private void checkAuth(TransportClient client, String appId) { /** * A simple class to wrap all shuffle service wrapper metrics */ - @VisibleForTesting - public class ShuffleMetrics implements MetricSet { + private class ShuffleMetrics implements MetricSet { private final Map allMetrics; // Time latency for open block request in ms private final Timer openBlockRequestLatencyMillis = new Timer(); @@ -183,7 +182,7 @@ public class ShuffleMetrics implements MetricSet { // Block transfer rate in byte per second private final Meter blockTransferRateBytes = new Meter(); - public ShuffleMetrics() { + private ShuffleMetrics() { allMetrics = new HashMap<>(); allMetrics.put("openBlockRequestLatencyMillis", openBlockRequestLatencyMillis); allMetrics.put("registerExecutorRequestLatencyMillis", registerExecutorRequestLatencyMillis); diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index 15c850725d4da..392bdcb19ace7 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -19,10 +19,8 @@ import java.io.File; import java.io.IOException; -import java.lang.reflect.Method; -import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.nio.file.Files; +import java.nio.ByteBuffer; import java.util.List; import java.util.Map; @@ -37,13 +35,14 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.metrics2.MetricsSource; import org.apache.hadoop.metrics2.impl.MetricsSystemImpl; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.server.api.*; +import org.apache.spark.network.util.LevelDBProvider; import org.iq80.leveldb.DB; import org.iq80.leveldb.DBIterator; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,19 +52,18 @@ import org.apache.spark.network.server.TransportServer; import org.apache.spark.network.server.TransportServerBootstrap; import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler; -import org.apache.spark.network.util.LevelDBProvider; import org.apache.spark.network.util.TransportConf; import org.apache.spark.network.yarn.util.HadoopConfigProvider; /** * An external shuffle service used by Spark on Yarn. - *

+ * * This is intended to be a long-running auxiliary service that runs in the NodeManager process. * A Spark application may connect to this service by setting `spark.shuffle.service.enabled`. * The application also automatically derives the service port through `spark.shuffle.service.port` * specified in the Yarn configuration. This is so that both the clients and the server agree on * the same port to communicate on. - *

+ * * The service also optionally supports authentication. This ensures that executors from one * application cannot read the shuffle files written by those from another. This feature can be * enabled by setting `spark.authenticate` in the Yarn configuration before starting the NM. @@ -100,7 +98,7 @@ public class YarnShuffleService extends AuxiliaryService { private static final ObjectMapper mapper = new ObjectMapper(); private static final String APP_CREDS_KEY_PREFIX = "AppCreds"; private static final LevelDBProvider.StoreVersion CURRENT_VERSION = new LevelDBProvider - .StoreVersion(1, 0); + .StoreVersion(1, 0); // just for integration tests that want to look at this file -- in general not sensible as // a static @@ -173,21 +171,13 @@ protected void serviceInit(Configuration conf) throws Exception { blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile); // register metrics on the block handler into the Node Manager's metrics system. - try { - YarnShuffleServiceMetrics serviceMetrics = new YarnShuffleServiceMetrics( - blockHandler.getAllMetrics()); - MetricsSystemImpl metricsSystem = (MetricsSystemImpl) DefaultMetricsSystem.instance(); - - Method registerSourceMethod = metricsSystem.getClass().getDeclaredMethod("registerSource", - String.class, String.class, MetricsSource.class); - registerSourceMethod.setAccessible(true); - registerSourceMethod.invoke(metricsSystem, "shuffleService", "Metrics on the Spark " + - "Shuffle Service", serviceMetrics); - logger.info("Registered metrics with Hadoop's DefaultMetricsSystem"); - } catch (Exception e) { - logger.warn("Unable to register Spark Shuffle Service metrics with Node Manager; " + - "proceeding without metrics", e); - } + YarnShuffleServiceMetrics serviceMetrics = + new YarnShuffleServiceMetrics(blockHandler.getAllMetrics()); + + MetricsSystemImpl metricsSystem = (MetricsSystemImpl) DefaultMetricsSystem.instance(); + metricsSystem.register( + "shuffleService", "Metrics on the Spark Shuffle Service", serviceMetrics); + logger.info("Registered metrics with Hadoop's DefaultMetricsSystem"); // If authentication is enabled, set up the shuffle server to use a // special RPC handler that filters out unauthenticated fetch requests @@ -210,7 +200,7 @@ protected void serviceInit(Configuration conf) throws Exception { boundPort = port; String authEnabledString = authEnabled ? "enabled" : "not enabled"; logger.info("Started YARN shuffle service for Spark on port {}. " + - "Authentication is {}. Registered executor file is {}", port, authEnabledString, + "Authentication is {}. Registered executor file is {}", port, authEnabledString, registeredExecutorFile); } catch (Exception e) { if (stopOnFailure) { @@ -430,8 +420,8 @@ public int hashCode() { @Override public String toString() { return Objects.toStringHelper(this) - .add("appId", appId) - .toString(); + .add("appId", appId) + .toString(); } } diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java index 86cb07ae711ac..f95217713a813 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java @@ -18,22 +18,23 @@ package org.apache.spark.network.yarn; import com.codahale.metrics.*; -import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.metrics2.MetricsCollector; import org.apache.hadoop.metrics2.MetricsInfo; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler; import java.util.Map; /** - * Modeled off of YARN's NodeManagerMetrics. + * Forward {@link ExternalShuffleBlockHandler.ShuffleMetrics} to hadoop metrics system. + * NodeManager by default exposes JMX endpoint where can be collected. */ -public class YarnShuffleServiceMetrics implements MetricsSource { +class YarnShuffleServiceMetrics implements MetricsSource { private final MetricSet metricSet; - public YarnShuffleServiceMetrics(MetricSet metricSet) { + YarnShuffleServiceMetrics(MetricSet metricSet) { this.metricSet = metricSet; } @@ -52,10 +53,10 @@ public void getMetrics(MetricsCollector collector, boolean all) { } } - @VisibleForTesting - public static void collectMetric(MetricsRecordBuilder metricsRecordBuilder, String name, Metric metric) { - - // The metric types used in ExternalShuffleBlockHandler.ShuffleMetrics + /** + * The metric types used in {@link ExternalShuffleBlockHandler.ShuffleMetrics} + */ + private static void collectMetric(MetricsRecordBuilder metricsRecordBuilder, String name, Metric metric) { if (metric instanceof Timer) { Timer t = (Timer) metric; metricsRecordBuilder @@ -89,17 +90,26 @@ public static void collectMetric(MetricsRecordBuilder metricsRecordBuilder, Stri .addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of meter " + name), m.getMeanRate()); } else if (metric instanceof Gauge) { - Gauge m = (Gauge) metric; - Object gaugeValue = m.getValue(); + final Object gaugeValue = ((Gauge) metric).getValue(); if (gaugeValue instanceof Integer) { - Integer intValue = (Integer) gaugeValue; - metricsRecordBuilder - .addGauge(new ShuffleServiceMetricsInfo(name, "Integer value of " + - "gauge " + name), intValue.intValue()); + metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfo(name), (Integer) gaugeValue); + } else if (gaugeValue instanceof Long) { + metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfo(name), (Long) gaugeValue); + } else if (gaugeValue instanceof Float) { + metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfo(name), (Float) gaugeValue); + } else if (gaugeValue instanceof Double) { + metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfo(name), (Double) gaugeValue); + } else { + throw new IllegalStateException( + "Not supported class type of metric[" + name + "] for value " + gaugeValue); } } } + private static MetricsInfo getShuffleServiceMetricsInfo(String name) { + return new ShuffleServiceMetricsInfo(name, "Value of gauge " + name); + } + private static class ShuffleServiceMetricsInfo implements MetricsInfo { private final String name; From 764404a89eacc58898a3fff2e49b00ac2c1a01b3 Mon Sep 17 00:00:00 2001 From: "marek.simunek" Date: Mon, 24 Sep 2018 12:11:12 +0200 Subject: [PATCH 3/4] [SPARK-18364][YARN] YarnShuffleService metrics tests fix --- .../org/apache/spark/network/yarn/YarnShuffleService.java | 4 ++-- .../apache/spark/network/yarn/YarnShuffleServiceMetrics.java | 2 +- .../spark/network/yarn/YarnShuffleServiceMetricsSuite.scala | 2 -- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index 392bdcb19ace7..baa0af0a82569 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -172,11 +172,11 @@ protected void serviceInit(Configuration conf) throws Exception { // register metrics on the block handler into the Node Manager's metrics system. YarnShuffleServiceMetrics serviceMetrics = - new YarnShuffleServiceMetrics(blockHandler.getAllMetrics()); + new YarnShuffleServiceMetrics(blockHandler.getAllMetrics()); MetricsSystemImpl metricsSystem = (MetricsSystemImpl) DefaultMetricsSystem.instance(); metricsSystem.register( - "shuffleService", "Metrics on the Spark Shuffle Service", serviceMetrics); + "shuffleService", "Metrics on the Spark Shuffle Service", serviceMetrics); logger.info("Registered metrics with Hadoop's DefaultMetricsSystem"); // If authentication is enabled, set up the shuffle server to use a diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java index f95217713a813..022b3f0d91cb6 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java @@ -56,7 +56,7 @@ public void getMetrics(MetricsCollector collector, boolean all) { /** * The metric types used in {@link ExternalShuffleBlockHandler.ShuffleMetrics} */ - private static void collectMetric(MetricsRecordBuilder metricsRecordBuilder, String name, Metric metric) { + public static void collectMetric(MetricsRecordBuilder metricsRecordBuilder, String name, Metric metric) { if (metric instanceof Timer) { Timer t = (Timer) metric; metricsRecordBuilder diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala index 183545c94f329..40b92282a3b8f 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala @@ -63,8 +63,6 @@ class YarnShuffleServiceMetricsSuite extends SparkFunSuite with Matchers { // this metric writes only one gauge to the collector test("registeredExecutorsSize - collector receives correct types") { val builder = mock(classOf[MetricsRecordBuilder]) - when(builder.addCounter(any(), anyLong())).thenReturn(builder) - when(builder.addGauge(any(), anyDouble())).thenReturn(builder) YarnShuffleServiceMetrics.collectMetric(builder, "registeredExecutorsSize", metrics.getMetrics.get("registeredExecutorsSize")) From e259b6f899ab44e38309b34c58134772c2e591bf Mon Sep 17 00:00:00 2001 From: "marek.simunek" Date: Tue, 25 Sep 2018 12:19:31 +0200 Subject: [PATCH 4/4] [SPARK-18364][YARN] YarnShuffleService code style corrections --- .../spark/network/yarn/YarnShuffleService.java | 2 +- .../yarn/YarnShuffleServiceMetrics.java | 18 +++++++++++------- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index baa0af0a82569..72ae1a1295236 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -176,7 +176,7 @@ protected void serviceInit(Configuration conf) throws Exception { MetricsSystemImpl metricsSystem = (MetricsSystemImpl) DefaultMetricsSystem.instance(); metricsSystem.register( - "shuffleService", "Metrics on the Spark Shuffle Service", serviceMetrics); + "sparkShuffleService", "Metrics on the Spark Shuffle Service", serviceMetrics); logger.info("Registered metrics with Hadoop's DefaultMetricsSystem"); // If authentication is enabled, set up the shuffle server to use a diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java index 022b3f0d91cb6..3e4d479b862b3 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java @@ -17,17 +17,17 @@ package org.apache.spark.network.yarn; +import java.util.Map; + import com.codahale.metrics.*; import org.apache.hadoop.metrics2.MetricsCollector; import org.apache.hadoop.metrics2.MetricsInfo; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.metrics2.MetricsSource; -import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler; - -import java.util.Map; /** - * Forward {@link ExternalShuffleBlockHandler.ShuffleMetrics} to hadoop metrics system. + * Forward {@link org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.ShuffleMetrics} + * to hadoop metrics system. * NodeManager by default exposes JMX endpoint where can be collected. */ class YarnShuffleServiceMetrics implements MetricsSource { @@ -46,7 +46,7 @@ class YarnShuffleServiceMetrics implements MetricsSource { */ @Override public void getMetrics(MetricsCollector collector, boolean all) { - MetricsRecordBuilder metricsRecordBuilder = collector.addRecord("shuffleService"); + MetricsRecordBuilder metricsRecordBuilder = collector.addRecord("sparkShuffleService"); for (Map.Entry entry : metricSet.getMetrics().entrySet()) { collectMetric(metricsRecordBuilder, entry.getKey(), entry.getValue()); @@ -54,9 +54,13 @@ public void getMetrics(MetricsCollector collector, boolean all) { } /** - * The metric types used in {@link ExternalShuffleBlockHandler.ShuffleMetrics} + * The metric types used in + * {@link org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.ShuffleMetrics}. + * Visible for testing. */ - public static void collectMetric(MetricsRecordBuilder metricsRecordBuilder, String name, Metric metric) { + public static void collectMetric( + MetricsRecordBuilder metricsRecordBuilder, String name, Metric metric) { + if (metric instanceof Timer) { Timer t = (Timer) metric; metricsRecordBuilder