From 31f548ccda0f71863fd931af8c1a2544f1a2ee35 Mon Sep 17 00:00:00 2001 From: Weiyi Kong Date: Tue, 27 Oct 2020 00:53:45 +0800 Subject: [PATCH 1/2] Add AppLiveStatusPlugin for live application --- .../scala/org/apache/spark/SparkContext.scala | 24 ++++++++++++- .../spark/internal/config/package.scala | 6 ++++ .../spark/status/AppLiveStatusPlugin.scala | 34 +++++++++++++++++++ 3 files changed, 63 insertions(+), 1 deletion(-) create mode 100644 core/src/main/scala/org/apache/spark/status/AppLiveStatusPlugin.scala diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 501e865c4105..6047ebe65477 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -61,7 +61,7 @@ import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend import org.apache.spark.scheduler.local.LocalSchedulerBackend import org.apache.spark.shuffle.ShuffleDataIOUtils import org.apache.spark.shuffle.api.ShuffleDriverComponents -import org.apache.spark.status.{AppStatusSource, AppStatusStore} +import org.apache.spark.status.{AppLiveStatusPlugin, AppStatusSource, AppStatusStore, ElementTrackingStore} import org.apache.spark.status.api.v1.ThreadStackTrace import org.apache.spark.storage._ import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump @@ -617,6 +617,7 @@ class SparkContext(config: SparkConf) extends Logging { } _executorAllocationManager.foreach(_.start()) + loadLiveStatusPlugins() setupAndStartListenerBus() postEnvironmentUpdate() postApplicationStart() @@ -2441,6 +2442,27 @@ class SparkContext(config: SparkConf) extends Logging { /** Register a new RDD, returning its RDD ID */ private[spark] def newRddId(): Int = nextRddId.getAndIncrement() + private def loadLiveStatusPlugins(): Unit = { + try { + conf.get(APP_LIVE_STATUS_PLUGINS).foreach { classNames => + val plugins = Utils.loadExtensions(classOf[AppLiveStatusPlugin], classNames, conf) + plugins.foreach { plugin => + val pluginName = plugin.getClass.getName + try { + plugin.createListeners(conf, _statusStore.store.asInstanceOf[ElementTrackingStore]) + .foreach(listenerBus.addToSharedQueue(_)) + } catch { + case e: Exception => + logWarning(s"Exception when registering live app status plugin ${pluginName}", e) + } + } + } + } catch { + case e: Exception => + logWarning(s"Exception when registering live app status plugins", e) + } + } + /** * Registers listeners specified in spark.extraListeners, then starts the listener bus. * This should be called after all internal listeners have been registered with the listener bus diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 9a7039a9cfe9..6c13dde1463b 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1221,6 +1221,12 @@ package object config { .toSequence .createWithDefault(Nil) + private[spark] val APP_LIVE_STATUS_PLUGINS = ConfigBuilder("spark.appLiveStatusPlugins") + .doc("Class names of AppLiveStatusPlugin to add to SparkContext during initialization") + .stringConf + .toSequence + .createOptional + private[spark] val EXTRA_LISTENERS = ConfigBuilder("spark.extraListeners") .doc("Class names of listeners to add to SparkContext during initialization.") .version("1.3.0") diff --git a/core/src/main/scala/org/apache/spark/status/AppLiveStatusPlugin.scala b/core/src/main/scala/org/apache/spark/status/AppLiveStatusPlugin.scala new file mode 100644 index 000000000000..07299510fb71 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/status/AppLiveStatusPlugin.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status + +import org.apache.spark.SparkConf +import org.apache.spark.scheduler.SparkListener + +/** + * An interface for creating live app listeners defined in other modules. + */ +private[spark] trait AppLiveStatusPlugin { + /** + * Creates listeners to collect data about the running application and populate the given store. + * + * @param conf The Spark configuration. + * @param store The store where to keep application data. + */ + def createListeners(conf: SparkConf, store: ElementTrackingStore): Seq[SparkListener] +} From 8ee60f4917d8f96f4aefedfb37fedec29ba1435e Mon Sep 17 00:00:00 2001 From: Weiyi Kong Date: Tue, 27 Oct 2020 11:46:13 +0800 Subject: [PATCH 2/2] Add UT for AppLiveStatusPlugin --- .../status/AppLiveStatusPluginSuite.scala | 81 +++++++++++++++++++ 1 file changed, 81 insertions(+) create mode 100644 core/src/test/scala/org/apache/spark/status/AppLiveStatusPluginSuite.scala diff --git a/core/src/test/scala/org/apache/spark/status/AppLiveStatusPluginSuite.scala b/core/src/test/scala/org/apache/spark/status/AppLiveStatusPluginSuite.scala new file mode 100644 index 000000000000..1a7cea25030e --- /dev/null +++ b/core/src/test/scala/org/apache/spark/status/AppLiveStatusPluginSuite.scala @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.status + +import scala.collection.JavaConverters._ + +import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.internal.config.APP_LIVE_STATUS_PLUGINS +import org.apache.spark.scheduler.SparkListener + +class AppLiveStatusPluginSuite extends SparkFunSuite with LocalSparkContext { + test("SPARK-33249: Should be able to add listeners") { + val plugins = Seq( + classOf[SomePluginA], + classOf[SomePluginB] + ) + val conf = new SparkConf() + .setMaster("local") + .setAppName("test") + .set(APP_LIVE_STATUS_PLUGINS, plugins.map(_.getName)) + sc = new SparkContext(conf) + + assert(sc.listenerBus.listeners.asScala.count(_.isInstanceOf[SomeListenerA]) == 1) + assert(sc.listenerBus.listeners.asScala.count(_.isInstanceOf[SomeListenerB]) == 1) + assert(sc.listenerBus.listeners.asScala.count(_.isInstanceOf[SomeListenerC]) == 1) + } + + test("SPARK-33249: Only load plugins in conf") { + val plugins = Seq( + classOf[SomePluginB], + classOf[SomePluginC] + ) + val conf = new SparkConf() + .setMaster("local") + .setAppName("test") + .set(APP_LIVE_STATUS_PLUGINS, plugins.map(_.getName)) + sc = new SparkContext(conf) + + assert(sc.listenerBus.listeners.asScala.count(_.isInstanceOf[SomeListenerC]) == 1) + assert(sc.listenerBus.listeners.asScala.count(_.isInstanceOf[SomeListenerD]) == 0) + } +} + +private class SomePluginA extends AppLiveStatusPlugin { + override def createListeners(conf: SparkConf, store: ElementTrackingStore): Seq[SparkListener] = { + Seq(new SomeListenerA(), new SomeListenerB()) + } +} + +private class SomePluginB extends AppLiveStatusPlugin { + override def createListeners(conf: SparkConf, store: ElementTrackingStore): Seq[SparkListener] = { + Seq(new SomeListenerC()) + } +} + +private class SomePluginC extends AppLiveStatusPlugin { + throw new UnsupportedOperationException("do not init") + + override def createListeners(conf: SparkConf, store: ElementTrackingStore): Seq[SparkListener] = { + Seq(new SomeListenerD()) + } +} + +private class SomeListenerA extends SparkListener +private class SomeListenerB extends SparkListener +private class SomeListenerC extends SparkListener +private class SomeListenerD extends SparkListener