From cf74f36dbefd131e10d3fb0148b5f259077fff86 Mon Sep 17 00:00:00 2001
From: pgandhi
Date: Thu, 20 Sep 2018 10:04:24 -0500
Subject: [PATCH 1/5] [SPARK-18364] : Expose metrics for YarnShuffleService

Added shuffle server metrics for the Spark YARN shuffle service. I have made my changes on top of Andrew Ash's PR and have additionally added two more metrics: numRegisteredConnections, which indicates the number of registered connections to the shuffle service, and numActiveConnections, which indicates the number of active connections to the shuffle service at any given point in time. If these metrics are written to a file, the output looks like this:

1533674653489 default.shuffleService: Hostname=openqe26blue-n9.blue.ygrid.yahoo.com, openBlockRequestLatencyMillis_count=729, openBlockRequestLatencyMillis_rate15=0.7110833548897356, openBlockRequestLatencyMillis_rate5=1.657808981793011, openBlockRequestLatencyMillis_rate1=2.2404486061620474, openBlockRequestLatencyMillis_rateMean=0.9242558551196706, numRegisteredConnections=35, blockTransferRateBytes_count=2635880512, blockTransferRateBytes_rate15=2578547.6094160094, blockTransferRateBytes_rate5=6048721.726302424, blockTransferRateBytes_rate1=8548922.518223226, blockTransferRateBytes_rateMean=3341878.633637769, registeredExecutorsSize=5, registerExecutorRequestLatencyMillis_count=5, registerExecutorRequestLatencyMillis_rate15=0.0027973949328659836, registerExecutorRequestLatencyMillis_rate5=0.0021278007987206426, registerExecutorRequestLatencyMillis_rate1=2.8270296777387467E-6, registerExecutorRequestLatencyMillis_rateMean=0.006339206380043053, numActiveConnections=35
---
 .../spark/network/TransportContext.java       |   9 +-
 .../server/TransportChannelHandler.java       |  16 ++-
 .../spark/network/server/TransportServer.java |   5 +
 .../shuffle/ExternalShuffleBlockHandler.java  |  22 ++-
 .../network/yarn/YarnShuffleService.java      |  20 +++
 .../yarn/YarnShuffleServiceMetrics.java       | 132 ++++++++++++++++++
 .../spark/deploy/ExternalShuffleService.scala |   2 +
 .../yarn/YarnShuffleServiceMetricsSuite.scala |  75 ++++++++++
 8 files changed, 277 insertions(+), 4 deletions(-)
 create mode 100644 common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java
 create mode 100644 resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala

diff --git a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java index ae91bc9cfdd08..8bc3cf949be0f 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java +++ b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.List; +import com.codahale.metrics.Counter; import io.netty.channel.Channel; import io.netty.channel.socket.SocketChannel; import io.netty.handler.timeout.IdleStateHandler; @@ -61,6 +62,8 @@ public class TransportContext { private final TransportConf conf; private final RpcHandler rpcHandler; private final boolean closeIdleConnections; + // Number of registered connections to the shuffle service + private Counter registeredConnections = new Counter(); /** * Force to create MessageEncoder and MessageDecoder so that we can make sure they will be created @@ -170,8 +173,12 @@ private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler TransportRequestHandler requestHandler = new
TransportRequestHandler(channel, client, rpcHandler, conf.maxChunksBeingTransferred()); return new TransportChannelHandler(client, responseHandler, requestHandler, - conf.connectionTimeoutMs(), closeIdleConnections); + conf.connectionTimeoutMs(), closeIdleConnections, this); } public TransportConf getConf() { return conf; } + + public Counter getRegisteredConnections() { + return registeredConnections; + } } diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java index 56782a8327876..be4cfc32382f0 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java @@ -21,6 +21,7 @@ import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; +import org.apache.spark.network.TransportContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,18 +56,21 @@ public class TransportChannelHandler extends ChannelInboundHandlerAdapter { private final TransportRequestHandler requestHandler; private final long requestTimeoutNs; private final boolean closeIdleConnections; + private final TransportContext transportContext; public TransportChannelHandler( TransportClient client, TransportResponseHandler responseHandler, TransportRequestHandler requestHandler, long requestTimeoutMs, - boolean closeIdleConnections) { + boolean closeIdleConnections, + TransportContext transportContext) { this.client = client; this.responseHandler = responseHandler; this.requestHandler = requestHandler; this.requestTimeoutNs = requestTimeoutMs * 1000L * 1000; this.closeIdleConnections = closeIdleConnections; + this.transportContext = transportContext; } public TransportClient getClient() { @@ -161,4 +165,14 @@ public TransportResponseHandler getResponseHandler() { return responseHandler; } + @Override + public void channelRegistered(ChannelHandlerContext ctx) { + transportContext.getRegisteredConnections().inc(); + } + + @Override + public void channelUnregistered(ChannelHandlerContext ctx) { + transportContext.getRegisteredConnections().dec(); + } + } diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java index 9c85ab2f5f06f..eb5f10a5c1a1e 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; +import com.codahale.metrics.Counter; import com.codahale.metrics.MetricSet; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -159,4 +160,8 @@ public void close() { } bootstrap = null; } + + public Counter getRegisteredConnections() { + return context.getRegisteredConnections(); + } } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java index 098fa7974b87b..f183e5244e7e7 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java +++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java @@ -29,6 +29,7 @@ import com.codahale.metrics.Metric; import com.codahale.metrics.MetricSet; import com.codahale.metrics.Timer; +import com.codahale.metrics.Counter; import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -173,7 +174,8 @@ private void checkAuth(TransportClient client, String appId) { /** * A simple class to wrap all shuffle service wrapper metrics */ - private class ShuffleMetrics implements MetricSet { + @VisibleForTesting + public class ShuffleMetrics implements MetricSet { private final Map allMetrics; // Time latency for open block request in ms private final Timer openBlockRequestLatencyMillis = new Timer(); @@ -181,14 +183,20 @@ private class ShuffleMetrics implements MetricSet { private final Timer registerExecutorRequestLatencyMillis = new Timer(); // Block transfer rate in byte per second private final Meter blockTransferRateBytes = new Meter(); + // Number of active connections to the shuffle service + private Counter activeConnections = new Counter(); + // Number of registered connections to the shuffle service + private Counter registeredConnections = new Counter(); - private ShuffleMetrics() { + public ShuffleMetrics() { allMetrics = new HashMap<>(); allMetrics.put("openBlockRequestLatencyMillis", openBlockRequestLatencyMillis); allMetrics.put("registerExecutorRequestLatencyMillis", registerExecutorRequestLatencyMillis); allMetrics.put("blockTransferRateBytes", blockTransferRateBytes); allMetrics.put("registeredExecutorsSize", (Gauge) () -> blockManager.getRegisteredExecutorsSize()); + allMetrics.put("numActiveConnections", activeConnections); + allMetrics.put("numRegisteredConnections", registeredConnections); } @Override @@ -244,4 +252,14 @@ public ManagedBuffer next() { } } + @Override + public void channelActive(TransportClient client) { + metrics.activeConnections.inc(); + } + + @Override + public void channelInactive(TransportClient client) { + metrics.activeConnections.dec(); + } + } diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index d8b2ed6b5dc7b..918c9e4b735c0 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -19,6 +19,7 @@ import java.io.File; import java.io.IOException; +import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; import java.nio.ByteBuffer; import java.util.List; @@ -35,6 +36,9 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.impl.MetricsSystemImpl; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.server.api.*; import org.apache.spark.network.util.LevelDBProvider; @@ -191,6 +195,22 @@ protected void serviceInit(Configuration conf) throws Exception { logger.info("Started YARN shuffle service for Spark on port {}. " + "Authentication is {}. 
Registered executor file is {}", port, authEnabledString, registeredExecutorFile); + try { + blockHandler.getAllMetrics().getMetrics().put("numRegisteredConnections", + shuffleServer.getRegisteredConnections()); + YarnShuffleServiceMetrics serviceMetrics = + new YarnShuffleServiceMetrics(blockHandler.getAllMetrics()); + MetricsSystemImpl metricsSystem = (MetricsSystemImpl) DefaultMetricsSystem.instance(); + Method registerSourceMethod = metricsSystem.getClass().getDeclaredMethod("registerSource", + String.class, String.class, MetricsSource.class); + registerSourceMethod.setAccessible(true); + registerSourceMethod.invoke(metricsSystem, "shuffleService", "Metrics on the Spark " + + "Shuffle Service", serviceMetrics); + logger.info("Registered metrics with Hadoop's DefaultMetricsSystem"); + } catch (Exception e) { + logger.warn("Unable to register Spark Shuffle Service metrics with Node Manager; " + + "proceeding without metrics", e); + } } catch (Exception e) { if (stopOnFailure) { throw e; diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java new file mode 100644 index 0000000000000..83fb45fc700b0 --- /dev/null +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.yarn; + +import com.codahale.metrics.*; +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; + +import java.util.Map; + +/** + * Modeled off of YARN's NodeManagerMetrics. + */ +public class YarnShuffleServiceMetrics implements MetricsSource { + + private final MetricSet metricSet; + + public YarnShuffleServiceMetrics(MetricSet metricSet) { + this.metricSet = metricSet; + } + + /** + * Get metrics from the source + * + * @param collector to contain the resulting metrics snapshot + * @param all if true, return all metrics even if unchanged. 
+ */ + @Override + public void getMetrics(MetricsCollector collector, boolean all) { + MetricsRecordBuilder metricsRecordBuilder = collector.addRecord("shuffleService"); + + for (Map.Entry entry : metricSet.getMetrics().entrySet()) { + collectMetric(metricsRecordBuilder, entry.getKey(), entry.getValue()); + } + } + + @VisibleForTesting + public static void collectMetric(MetricsRecordBuilder metricsRecordBuilder, String name, + Metric metric) { + + // The metric types used in ExternalShuffleBlockHandler.ShuffleMetrics + if (metric instanceof Timer) { + Timer t = (Timer) metric; + metricsRecordBuilder + .addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of timer " + name), + t.getCount()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of timer " + name), + t.getFifteenMinuteRate()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of timer " + name), + t.getFiveMinuteRate()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of timer " + name), + t.getOneMinuteRate()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of timer " + name), + t.getMeanRate()); + } else if (metric instanceof Meter) { + Meter m = (Meter) metric; + metricsRecordBuilder + .addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of meter " + name), + m.getCount()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of meter " + name), + m.getFifteenMinuteRate()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of meter " + name), + m.getFiveMinuteRate()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of meter " + name), + m.getOneMinuteRate()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of meter " + name), + m.getMeanRate()); + } else if (metric instanceof Gauge) { + Gauge m = (Gauge) metric; + Object gaugeValue = m.getValue(); + if (gaugeValue instanceof Integer) { + Integer intValue = (Integer) gaugeValue; + metricsRecordBuilder + .addGauge(new ShuffleServiceMetricsInfo(name, "Integer value of " + + "gauge " + name), intValue.intValue()); + } + } else if (metric instanceof Counter) { + Counter c = (Counter) metric; + long counterValue = c.getCount(); + metricsRecordBuilder + .addGauge(new ShuffleServiceMetricsInfo(name, "Number of " + + "connections to shuffle service " + name), counterValue); + } + } + + private static class ShuffleServiceMetricsInfo implements MetricsInfo { + + private final String name; + private final String description; + + ShuffleServiceMetricsInfo(String name, String description) { + this.name = name; + this.description = description; + } + + @Override + public String name() { + return name; + } + + @Override + public String description() { + return description; + } + } +} \ No newline at end of file diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala index f6b3c37f0fe72..03e3abb3ce569 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala @@ -84,6 +84,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana server = transportContext.createServer(port, bootstraps.asJava) shuffleServiceSource.registerMetricSet(server.getAllMetrics) + blockHandler.getAllMetrics.getMetrics.put("numRegisteredConnections", 
+ server.getRegisteredConnections) shuffleServiceSource.registerMetricSet(blockHandler.getAllMetrics) masterMetricsSystem.registerSource(shuffleServiceSource) masterMetricsSystem.start() diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala new file mode 100644 index 0000000000000..17afa3c53c56c --- /dev/null +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.yarn + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.metrics2.MetricsRecordBuilder + +import org.mockito.Matchers._ +import org.mockito.Mockito.{mock, times, verify, when} + +import org.scalatest.Matchers + +import org.apache.spark.SparkFunSuite +import org.apache.spark.network.server.OneForOneStreamManager +import org.apache.spark.network.shuffle.{ExternalShuffleBlockHandler, ExternalShuffleBlockResolver} + +class YarnShuffleServiceMetricsSuite extends SparkFunSuite with Matchers { + + val streamManager = mock(classOf[OneForOneStreamManager]) + val blockResolver = mock(classOf[ExternalShuffleBlockResolver]) + when(blockResolver.getRegisteredExecutorsSize).thenReturn(42) + + val metrics = new ExternalShuffleBlockHandler(streamManager, blockResolver).getAllMetrics + + test("metrics named as expected") { + val allMetrics = Set( + "openBlockRequestLatencyMillis", "registerExecutorRequestLatencyMillis", + "blockTransferRateBytes", "registeredExecutorsSize", "numActiveConnections", "numRegisteredConnections") + + metrics.getMetrics.keySet().asScala should be (allMetrics) + } + + // these three metrics have the same effect on the collector + for (testname <- Seq("openBlockRequestLatencyMillis", + "registerExecutorRequestLatencyMillis", + "blockTransferRateBytes")) { + test(s"$testname - collector receives correct types") { + val builder = mock(classOf[MetricsRecordBuilder]) + when(builder.addCounter(any(), anyLong())).thenReturn(builder) + when(builder.addGauge(any(), anyDouble())).thenReturn(builder) + YarnShuffleServiceMetrics.collectMetric(builder, testname, + metrics.getMetrics.get(testname)) + + verify(builder).addCounter(anyObject(), anyLong()) + verify(builder, times(4)).addGauge(anyObject(), anyDouble()) + } + } + + // this metric writes only one gauge to the collector + test("registeredExecutorsSize - collector receives correct types") { + val builder = mock(classOf[MetricsRecordBuilder]) + when(builder.addCounter(any(), anyLong())).thenReturn(builder) + when(builder.addGauge(any(), anyDouble())).thenReturn(builder) + 
YarnShuffleServiceMetrics.collectMetric(builder, "registeredExecutorsSize", + metrics.getMetrics.get("registeredExecutorsSize")) + + verify(builder).addGauge(anyObject(), anyInt()) + } +} \ No newline at end of file From 1ac18d9a8d8269eb73bac1b09e45215a20fa53c5 Mon Sep 17 00:00:00 2001 From: pgandhi Date: Thu, 20 Sep 2018 16:11:01 -0500 Subject: [PATCH 2/5] [SPARK-18364] : Fixing Scalastyle Tests --- .../network/yarn/YarnShuffleServiceMetricsSuite.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala index 17afa3c53c56c..0410b200a3506 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala @@ -20,10 +20,8 @@ package org.apache.spark.network.yarn import scala.collection.JavaConverters._ import org.apache.hadoop.metrics2.MetricsRecordBuilder - import org.mockito.Matchers._ import org.mockito.Mockito.{mock, times, verify, when} - import org.scalatest.Matchers import org.apache.spark.SparkFunSuite @@ -41,7 +39,8 @@ class YarnShuffleServiceMetricsSuite extends SparkFunSuite with Matchers { test("metrics named as expected") { val allMetrics = Set( "openBlockRequestLatencyMillis", "registerExecutorRequestLatencyMillis", - "blockTransferRateBytes", "registeredExecutorsSize", "numActiveConnections", "numRegisteredConnections") + "blockTransferRateBytes", "registeredExecutorsSize", "numActiveConnections", + "numRegisteredConnections") metrics.getMetrics.keySet().asScala should be (allMetrics) } @@ -72,4 +71,4 @@ class YarnShuffleServiceMetricsSuite extends SparkFunSuite with Matchers { verify(builder).addGauge(anyObject(), anyInt()) } -} \ No newline at end of file +} From 70472a255e5da3ea4522959e26f5c403641e1ce6 Mon Sep 17 00:00:00 2001 From: pgandhi Date: Fri, 5 Oct 2018 16:11:43 -0500 Subject: [PATCH 3/5] [SPARK-18364] : Fixing Code to pass Unit tests --- .../network/yarn/YarnShuffleService.java | 23 ++++++++++--------- .../yarn/YarnShuffleServiceMetrics.java | 1 - 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index 1a56f565d7780..b73d2378fae1f 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -171,17 +171,6 @@ protected void serviceInit(Configuration conf) throws Exception { TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf)); blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile); - // register metrics on the block handler into the Node Manager's metrics system. 
- blockHandler.getAllMetrics().getMetrics().put("numRegisteredConnections", - shuffleServer.getRegisteredConnections()); - YarnShuffleServiceMetrics serviceMetrics = - new YarnShuffleServiceMetrics(blockHandler.getAllMetrics()); - - MetricsSystemImpl metricsSystem = (MetricsSystemImpl) DefaultMetricsSystem.instance(); - metricsSystem.register( - "sparkShuffleService", "Metrics on the Spark Shuffle Service", serviceMetrics); - logger.info("Registered metrics with Hadoop's DefaultMetricsSystem"); - // If authentication is enabled, set up the shuffle server to use a // special RPC handler that filters out unauthenticated fetch requests List bootstraps = Lists.newArrayList(); @@ -202,6 +191,18 @@ protected void serviceInit(Configuration conf) throws Exception { port = shuffleServer.getPort(); boundPort = port; String authEnabledString = authEnabled ? "enabled" : "not enabled"; + + // register metrics on the block handler into the Node Manager's metrics system. + blockHandler.getAllMetrics().getMetrics().put("numRegisteredConnections", + shuffleServer.getRegisteredConnections()); + YarnShuffleServiceMetrics serviceMetrics = + new YarnShuffleServiceMetrics(blockHandler.getAllMetrics()); + + MetricsSystemImpl metricsSystem = (MetricsSystemImpl) DefaultMetricsSystem.instance(); + metricsSystem.register( + "sparkShuffleService", "Metrics on the Spark Shuffle Service", serviceMetrics); + logger.info("Registered metrics with Hadoop's DefaultMetricsSystem"); + logger.info("Started YARN shuffle service for Spark on port {}. " + "Authentication is {}. Registered executor file is {}", port, authEnabledString, registeredExecutorFile); diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java index ee35c8da5cbc2..f73d3422f1bd8 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java @@ -113,7 +113,6 @@ public static void collectMetric( metricsRecordBuilder.addGauge(new ShuffleServiceMetricsInfo(name, "Number of " + "connections to shuffle service " + name), counterValue); } - } private static MetricsInfo getShuffleServiceMetricsInfo(String name) { From 3c5ef99dc7271a8862df825460c301fc07f5db7d Mon Sep 17 00:00:00 2001 From: pgandhi Date: Thu, 20 Dec 2018 11:06:39 -0600 Subject: [PATCH 4/5] [SPARK-18364] : Addressing Reviews December 20, 2018 Calling superclass methods from overridden methods and fixing indentation.
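For context on the superclass calls: Netty only delivers channelRegistered/channelUnregistered events to later handlers in the pipeline when each handler forwards them, and ChannelInboundHandlerAdapter's default implementations do exactly that. A minimal sketch of the pattern, using a hypothetical ConnectionCountingHandler rather than Spark's TransportChannelHandler:

import com.codahale.metrics.Counter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

// Counts connections as channels are registered/unregistered with an event loop,
// while still forwarding each event so later handlers continue to see it.
public class ConnectionCountingHandler extends ChannelInboundHandlerAdapter {
  private final Counter registeredConnections;

  public ConnectionCountingHandler(Counter registeredConnections) {
    this.registeredConnections = registeredConnections;
  }

  @Override
  public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    registeredConnections.inc();
    // The superclass implementation calls ctx.fireChannelRegistered();
    // dropping this call would swallow the event for downstream handlers.
    super.channelRegistered(ctx);
  }

  @Override
  public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
    registeredConnections.dec();
    super.channelUnregistered(ctx);
  }
}

The same call-through pattern is applied in this commit to ExternalShuffleBlockHandler.channelActive and channelInactive, which delegate to RpcHandler.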
--- .../spark/network/server/TransportChannelHandler.java | 6 ++++-- .../spark/network/shuffle/ExternalShuffleBlockHandler.java | 2 ++ .../org/apache/spark/network/yarn/YarnShuffleService.java | 6 +++--- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java index f2efb92783204..ca81099c4d5cb 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java @@ -181,13 +181,15 @@ public TransportResponseHandler getResponseHandler() { } @Override - public void channelRegistered(ChannelHandlerContext ctx) { + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { transportContext.getRegisteredConnections().inc(); + super.channelRegistered(ctx); } @Override - public void channelUnregistered(ChannelHandlerContext ctx) { + public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { transportContext.getRegisteredConnections().dec(); + super.channelUnregistered(ctx); } } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java index f183e5244e7e7..788a845c57755 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java @@ -255,11 +255,13 @@ public ManagedBuffer next() { @Override public void channelActive(TransportClient client) { metrics.activeConnections.inc(); + super.channelActive(client); } @Override public void channelInactive(TransportClient client) { metrics.activeConnections.dec(); + super.channelInactive(client); } } diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index b73d2378fae1f..55c028c224a4b 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -194,13 +194,13 @@ protected void serviceInit(Configuration conf) throws Exception { // register metrics on the block handler into the Node Manager's metrics system. blockHandler.getAllMetrics().getMetrics().put("numRegisteredConnections", - shuffleServer.getRegisteredConnections()); + shuffleServer.getRegisteredConnections()); YarnShuffleServiceMetrics serviceMetrics = - new YarnShuffleServiceMetrics(blockHandler.getAllMetrics()); + new YarnShuffleServiceMetrics(blockHandler.getAllMetrics()); MetricsSystemImpl metricsSystem = (MetricsSystemImpl) DefaultMetricsSystem.instance(); metricsSystem.register( - "sparkShuffleService", "Metrics on the Spark Shuffle Service", serviceMetrics); + "sparkShuffleService", "Metrics on the Spark Shuffle Service", serviceMetrics); logger.info("Registered metrics with Hadoop's DefaultMetricsSystem"); logger.info("Started YARN shuffle service for Spark on port {}. 
" + From e2414ca2789f4e26969b262306a7c04f43a5f0c8 Mon Sep 17 00:00:00 2001 From: pgandhi Date: Fri, 21 Dec 2018 07:59:53 -0600 Subject: [PATCH 5/5] [SPARK-25642] : Fixing indentation --- .../apache/spark/network/yarn/YarnShuffleServiceMetrics.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java index f73d3422f1bd8..501237407e9b2 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java @@ -111,7 +111,7 @@ public static void collectMetric( Counter c = (Counter) metric; long counterValue = c.getCount(); metricsRecordBuilder.addGauge(new ShuffleServiceMetricsInfo(name, "Number of " + - "connections to shuffle service " + name), counterValue); + "connections to shuffle service " + name), counterValue); } }
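A note on the Dropwizard side of this series: the Timer and Meter added to ShuffleMetrics in the first patch are meant to be driven from the request path in the usual Dropwizard way. A rough sketch of that usage, with a hypothetical handleOpenBlocks method standing in for the shuffle service's real message handling:

import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;

public class OpenBlocksInstrumentationExample {
  private final Timer openBlockRequestLatencyMillis = new Timer();
  private final Meter blockTransferRateBytes = new Meter();

  // Hypothetical request handler: measure how long serving an "open blocks"
  // request takes and record how many bytes it will cause to be transferred.
  public void handleOpenBlocks(long totalBlockSizeInBytes) {
    final Timer.Context responseDelayContext = openBlockRequestLatencyMillis.time();
    try {
      // ... resolve the requested blocks and register the stream here ...
      blockTransferRateBytes.mark(totalBlockSizeInBytes);
    } finally {
      responseDelayContext.stop();
    }
  }
}

Each stop() call records one timed event, which is what produces the openBlockRequestLatencyMillis count and rate values shown in the sample output of the first commit message.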
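And a note on how those Dropwizard metrics reach Hadoop's metrics2 system, which is the job of YarnShuffleServiceMetrics: a MetricsSource snapshots each Timer, Meter, Counter, and Gauge into a MetricsRecordBuilder every time the metrics system polls it. Below is a condensed, self-contained sketch of that bridge; the class name, the metric names, and the use of Interns.info are illustrative rather than the patch's own code:

import java.util.Map;
import java.util.concurrent.TimeUnit;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.Interns;

public class DropwizardToMetrics2Bridge implements MetricsSource {
  private final MetricRegistry registry = new MetricRegistry();
  private final Timer openBlockRequestLatencyMillis = registry.timer("openBlockRequestLatencyMillis");
  private final Meter blockTransferRateBytes = registry.meter("blockTransferRateBytes");
  private final Counter numActiveConnections = registry.counter("numActiveConnections");

  // Called by the metrics2 system on every poll; translate the current
  // Dropwizard values into counters and gauges on one "shuffleService" record.
  @Override
  public void getMetrics(MetricsCollector collector, boolean all) {
    MetricsRecordBuilder record = collector.addRecord("shuffleService");
    for (Map.Entry<String, Metric> entry : registry.getMetrics().entrySet()) {
      String name = entry.getKey();
      Metric metric = entry.getValue();
      if (metric instanceof Timer) {
        Timer t = (Timer) metric;
        record.addCounter(Interns.info(name + "_count", "timer count"), t.getCount())
            .addGauge(Interns.info(name + "_rate1", "1 minute rate"), t.getOneMinuteRate());
      } else if (metric instanceof Meter) {
        Meter m = (Meter) metric;
        record.addCounter(Interns.info(name + "_count", "meter count"), m.getCount())
            .addGauge(Interns.info(name + "_rateMean", "mean rate"), m.getMeanRate());
      } else if (metric instanceof Counter) {
        record.addGauge(Interns.info(name, "current value"), ((Counter) metric).getCount());
      }
    }
  }

  public static void main(String[] args) {
    DropwizardToMetrics2Bridge source = new DropwizardToMetrics2Bridge();
    // Simulate some shuffle activity on the Dropwizard side.
    source.openBlockRequestLatencyMillis.update(12, TimeUnit.MILLISECONDS);
    source.blockTransferRateBytes.mark(4096);
    source.numActiveConnections.inc();
    // Register with a metrics2 system; whatever sinks that system has
    // configured will then publish the "shuffleService" record.
    DefaultMetricsSystem.initialize("example");
    DefaultMetricsSystem.instance().register("shuffleService",
        "Metrics on the Spark Shuffle Service", source);
  }
}

In the YARN shuffle service the registration happens inside the NodeManager's JVM, so the record is published by whatever sinks the NodeManager's hadoop-metrics2.properties configures (for example a file sink such as org.apache.hadoop.metrics2.sink.FileSink), which is presumably how the sample output in the first commit message was captured.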