From 7832d40572cf0303e07a9a686b62a675a5007295 Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Wed, 21 Jul 2021 14:30:44 +0800 Subject: [PATCH 01/17] [SPARK-36237][SQL] We should attach and start handler after application started --- .../src/main/scala/org/apache/spark/SparkContext.scala | 7 +++++-- core/src/main/scala/org/apache/spark/ui/WebUI.scala | 10 +++++++++- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index d11fa554ca8c..f9f50e3b8837 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -596,8 +596,6 @@ class SparkContext(config: SparkConf) extends Logging { // The metrics system for Driver need to be set spark.app.id to app ID. // So it should start after we get app ID from the task scheduler and set spark.app.id. _env.metricsSystem.start(_conf.get(METRICS_STATIC_SOURCES_ENABLED)) - // Attach the driver metrics servlet handler to the web ui after the metrics system is started. - _env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler))) _eventLogger = if (isEventLogEnabled) { @@ -639,6 +637,11 @@ class SparkContext(config: SparkConf) extends Logging { postEnvironmentUpdate() postApplicationStart() + // After application started, attach handlers to started server and start handler. + _ui.foreach(_.attachAllHandler()) + // Attach the driver metrics servlet handler to the web ui after the metrics system is started. + _env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler))) + // Post init _taskScheduler.postStartHook() if (isLocal) { diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index d826686382f1..95408212d6c7 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -136,6 +136,15 @@ private[spark] abstract class WebUI( attachHandler(JettyUtils.createStaticHandler(resourceBase, path)) } + /** + * Attach all existed handler to ServerInfo. + */ + def attachAllHandler(): Unit = { + serverInfo.foreach { server => + handlers.foreach(server.addHandler(_, securityManager)) + } + } + /** A hook to initialize components of the UI */ def initialize(): Unit @@ -145,7 +154,6 @@ private[spark] abstract class WebUI( try { val host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse("0.0.0.0") val server = startJettyServer(host, port, sslOptions, conf, name, poolSize) - handlers.foreach(server.addHandler(_, securityManager)) serverInfo = Some(server) logInfo(s"Bound $className to $host, and started at $webUrl") } catch { From 4fb422ca81ca7bb70e2644f93d584d7d4bc2979f Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Wed, 21 Jul 2021 18:03:13 +0800 Subject: [PATCH 02/17] update --- .../scala/org/apache/spark/deploy/history/HistoryServer.scala | 1 + core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 1 + core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 1 + .../org/apache/spark/deploy/history/HistoryServerSuite.scala | 2 ++ .../spark/deploy/history/RealBrowserUIHistoryServerSuite.scala | 1 + .../org/apache/spark/deploy/master/ui/MasterWebUISuite.scala | 1 + 6 files changed, 7 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index bea3f9ec84a4..478f55c44181 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -308,6 +308,7 @@ object HistoryServer extends Logging { val server = new HistoryServer(conf, provider, securityManager, port) server.bind() + server.attachAllHandler() provider.start() ShutdownHookManager.addShutdownHook { () => server.stop() } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index c964e343ca6c..4438d9965bf8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -143,6 +143,7 @@ private[deploy] class Master( logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}") webUi = new MasterWebUI(this, webUiPort) webUi.bind() + webUi.attachAllHandler() masterWebUiUrl = webUi.webUrl if (reverseProxy) { val uiReverseProxyUrl = conf.get(UI_REVERSE_PROXY_URL).map(_.stripSuffix("/")) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 05e8e5a6b676..90bd63499691 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -234,6 +234,7 @@ private[deploy] class Worker( setupWorkerResources() webUi = new WorkerWebUI(this, workDir, webUiPort) webUi.bind() + webUi.attachAllHandler() workerWebUiUrl = s"${webUi.scheme}$publicAddress:${webUi.boundPort}" registerWithMaster() diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala index 734ad7b6e3bd..8d68191aa12b 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala @@ -93,6 +93,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers server = new HistoryServer(conf, provider, securityManager, 18080) server.initialize() server.bind() + server.attachAllHandler() provider.start() port = server.boundPort } @@ -412,6 +413,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers server = new HistoryServer(myConf, provider, securityManager, 0) server.initialize() server.bind() + server.attachAllHandler() provider.start() val port = server.boundPort val metrics = server.cacheMetrics diff --git a/core/src/test/scala/org/apache/spark/deploy/history/RealBrowserUIHistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/RealBrowserUIHistoryServerSuite.scala index 4a5c34f86753..892b272f3719 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/RealBrowserUIHistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/RealBrowserUIHistoryServerSuite.scala @@ -89,6 +89,7 @@ abstract class RealBrowserUIHistoryServerSuite(val driverProp: String) server = new HistoryServer(conf, provider, securityManager, 18080) server.initialize() server.bind() + server.attachAllHandler() provider.start() port = server.boundPort } diff --git a/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala index be83ec12f92f..ad919adf8c19 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala @@ -50,6 +50,7 @@ class MasterWebUISuite extends SparkFunSuite with BeforeAndAfterAll { override def beforeAll(): Unit = { super.beforeAll() masterWebUI.bind() + masterWebUI.attachAllHandler() } override def afterAll(): Unit = { From dba26cd5bd1aaacb01e08cfcfef9f02ffe96d018 Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Tue, 27 Jul 2021 16:43:01 +0800 Subject: [PATCH 03/17] Update WebUI.scala --- core/src/main/scala/org/apache/spark/ui/WebUI.scala | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index 95408212d6c7..f75d73b4dd7c 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -19,7 +19,7 @@ package org.apache.spark.ui import java.util.EnumSet import javax.servlet.DispatcherType -import javax.servlet.http.{HttpServlet, HttpServletRequest} +import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse} import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap @@ -51,6 +51,14 @@ private[spark] abstract class WebUI( extends Logging { protected val tabs = ArrayBuffer[WebUITab]() + protected val initHandler: ServletContextHandler = { + val servlet = new HttpServlet() { + override def doGet(req: HttpServletRequest, res: HttpServletResponse): Unit = { + res.getWriter.write("Spark application is starting, please wait for start up.") + } + } + createServletHandler("/", servlet, basePath) + } protected val handlers = ArrayBuffer[ServletContextHandler]() protected val pageToHandlers = new HashMap[WebUIPage, ArrayBuffer[ServletContextHandler]] protected var serverInfo: Option[ServerInfo] = None @@ -141,6 +149,7 @@ private[spark] abstract class WebUI( */ def attachAllHandler(): Unit = { serverInfo.foreach { server => + server.removeHandler(initHandler) handlers.foreach(server.addHandler(_, securityManager)) } } @@ -154,6 +163,7 @@ private[spark] abstract class WebUI( try { val host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse("0.0.0.0") val server = startJettyServer(host, port, sslOptions, conf, name, poolSize) + server.addHandler(initHandler, securityManager) serverInfo = Some(server) logInfo(s"Bound $className to $host, and started at $webUrl") } catch { From 694a8fd105709fc8e9c95ac12933556faf2f4fac Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Tue, 27 Jul 2021 20:04:57 +0800 Subject: [PATCH 04/17] Update WebUI.scala --- core/src/main/scala/org/apache/spark/ui/WebUI.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index f75d73b4dd7c..82a2eb7900c9 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -54,6 +54,7 @@ private[spark] abstract class WebUI( protected val initHandler: ServletContextHandler = { val servlet = new HttpServlet() { override def doGet(req: HttpServletRequest, res: HttpServletResponse): Unit = { + res.setContentType("text/html;charset=utf-8") res.getWriter.write("Spark application is starting, please wait for start up.") } } From 340d8d0a48c2b70f76d05f3f095bb187ce3b6df6 Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Tue, 27 Jul 2021 20:11:55 +0800 Subject: [PATCH 05/17] Update WebUI.scala --- core/src/main/scala/org/apache/spark/ui/WebUI.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index 82a2eb7900c9..cf16889842a6 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -55,7 +55,7 @@ private[spark] abstract class WebUI( val servlet = new HttpServlet() { override def doGet(req: HttpServletRequest, res: HttpServletResponse): Unit = { res.setContentType("text/html;charset=utf-8") - res.getWriter.write("Spark application is starting, please wait for start up.") + res.getWriter.write("Spark is starting up. Please wait a while until it's ready.") } } createServletHandler("/", servlet, basePath) From 5299874488ab633cae68d908d8b3c93dfd0d3eb2 Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Wed, 28 Jul 2021 00:23:45 +0800 Subject: [PATCH 06/17] add UT --- .../main/scala/org/apache/spark/TestUtils.scala | 13 +++++++++++++ .../scala/org/apache/spark/ui/UISuite.scala | 17 +++++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala index dcbb9baa2092..24e5534b83ae 100644 --- a/core/src/main/scala/org/apache/spark/TestUtils.scala +++ b/core/src/main/scala/org/apache/spark/TestUtils.scala @@ -34,6 +34,7 @@ import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider} import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer +import scala.io.Source import scala.reflect.{classTag, ClassTag} import scala.sys.process.{Process, ProcessLogger} import scala.util.Try @@ -319,6 +320,18 @@ private[spark] object TestUtils { } } + /** + * Returns the response message from an HTTP(S) URL. + */ + def httpResponseMessage( + url: URL, + method: String = "GET", + headers: Seq[(String, String)] = Nil): String = { + withHttpConnection(url, method, headers = headers) { connection => + Source.fromInputStream(connection.getInputStream, "utf-8").getLines().mkString("\n") + } + } + def withHttpConnection[T]( url: URL, method: String = "GET", diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala index 1a7c8dad5ce7..0c202e11406e 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala @@ -424,6 +424,23 @@ class UISuite extends SparkFunSuite { } } + test("SPARK-36237: Attach and start handler after application started in UI ") { + val conf = new SparkConf() + .setMaster("local") + .setAppName("test") + .set(UI.UI_ENABLED, false) + val sc = new SparkContext(conf) + assert(sc.ui.isEmpty) + val sparkUI = SparkUI.create(Some(sc), sc.statusStore, sc.conf, sc.env.securityManager, + sc.appName, "", sc.startTime) + sparkUI.bind() + assert(TestUtils.httpResponseMessage(new URL(sparkUI.webUrl + "/jobs")) + === "Spark is starting up. Please wait a while until it's ready.") + sparkUI.attachAllHandler() + assert(TestUtils.httpResponseMessage(new URL(sparkUI.webUrl + "/jobs")).contains(sc.appName)) + sc.stop() + } + /** * Create a new context handler for the given path, with a single servlet that responds to * requests in `$path/root`. From 8665253fb6858c695c7d626892efe2bcc53dcec7 Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Wed, 28 Jul 2021 00:28:15 +0800 Subject: [PATCH 07/17] follow comment --- .../spark/deploy/history/HistoryServer.scala | 1 - .../scala/org/apache/spark/ui/SparkUI.scala | 36 +++++++++++++++++++ .../scala/org/apache/spark/ui/WebUI.scala | 23 ++---------- .../deploy/history/HistoryServerSuite.scala | 2 -- .../RealBrowserUIHistoryServerSuite.scala | 1 - .../deploy/master/ui/MasterWebUISuite.scala | 1 - 6 files changed, 38 insertions(+), 26 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 478f55c44181..bea3f9ec84a4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -308,7 +308,6 @@ object HistoryServer extends Logging { val server = new HistoryServer(conf, provider, securityManager, port) server.bind() - server.attachAllHandler() provider.start() ShutdownHookManager.addShutdownHook { () => server.stop() } diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index b1769a8a9c9e..fedfcc9a5a41 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -19,6 +19,7 @@ package org.apache.spark.ui import java.util.Date +import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse} import org.apache.spark.{SecurityManager, SparkConf, SparkContext} import org.apache.spark.internal.Logging import org.apache.spark.internal.config.UI._ @@ -30,6 +31,7 @@ import org.apache.spark.ui.env.EnvironmentTab import org.apache.spark.ui.exec.ExecutorsTab import org.apache.spark.ui.jobs.{JobsTab, StagesTab} import org.apache.spark.ui.storage.StorageTab +import org.eclipse.jetty.servlet.ServletContextHandler /** * Top level user interface for a Spark application. @@ -54,6 +56,25 @@ private[spark] class SparkUI private ( private var streamingJobProgressListener: Option[SparkListener] = None + protected val initHandler: ServletContextHandler = { + val servlet = new HttpServlet() { + override def doGet(req: HttpServletRequest, res: HttpServletResponse): Unit = { + res.setContentType("text/html;charset=utf-8") + res.getWriter.write("Spark is starting up. Please wait a while until it's ready.") + } + } + createServletHandler("/", servlet, basePath) + } + + /** + * Attach all existed handler to ServerInfo. + */ + def attachAllHandler(): Unit = { + serverInfo.foreach { server => + server.removeHandler(initHandler) + handlers.foreach(server.addHandler(_, securityManager)) + } + } /** Initialize all components of the server. */ def initialize(): Unit = { val jobsTab = new JobsTab(this, store) @@ -96,6 +117,21 @@ private[spark] class SparkUI private ( appId = id } + override def bind(): Unit = { + assert(serverInfo.isEmpty, s"Attempted to bind $className more than once!") + try { + val host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse("0.0.0.0") + val server = startJettyServer(host, port, sslOptions, conf, name, poolSize) + server.addHandler(initHandler, securityManager) + serverInfo = Some(server) + logInfo(s"Bound $className to $host, and started at $webUrl") + } catch { + case e: Exception => + logError(s"Failed to bind $className", e) + System.exit(1) + } + } + /** Stop the server behind this web interface. Only valid after bind(). */ override def stop(): Unit = { super.stop() diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index cf16889842a6..d826686382f1 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -19,7 +19,7 @@ package org.apache.spark.ui import java.util.EnumSet import javax.servlet.DispatcherType -import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse} +import javax.servlet.http.{HttpServlet, HttpServletRequest} import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap @@ -51,15 +51,6 @@ private[spark] abstract class WebUI( extends Logging { protected val tabs = ArrayBuffer[WebUITab]() - protected val initHandler: ServletContextHandler = { - val servlet = new HttpServlet() { - override def doGet(req: HttpServletRequest, res: HttpServletResponse): Unit = { - res.setContentType("text/html;charset=utf-8") - res.getWriter.write("Spark is starting up. Please wait a while until it's ready.") - } - } - createServletHandler("/", servlet, basePath) - } protected val handlers = ArrayBuffer[ServletContextHandler]() protected val pageToHandlers = new HashMap[WebUIPage, ArrayBuffer[ServletContextHandler]] protected var serverInfo: Option[ServerInfo] = None @@ -145,16 +136,6 @@ private[spark] abstract class WebUI( attachHandler(JettyUtils.createStaticHandler(resourceBase, path)) } - /** - * Attach all existed handler to ServerInfo. - */ - def attachAllHandler(): Unit = { - serverInfo.foreach { server => - server.removeHandler(initHandler) - handlers.foreach(server.addHandler(_, securityManager)) - } - } - /** A hook to initialize components of the UI */ def initialize(): Unit @@ -164,7 +145,7 @@ private[spark] abstract class WebUI( try { val host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse("0.0.0.0") val server = startJettyServer(host, port, sslOptions, conf, name, poolSize) - server.addHandler(initHandler, securityManager) + handlers.foreach(server.addHandler(_, securityManager)) serverInfo = Some(server) logInfo(s"Bound $className to $host, and started at $webUrl") } catch { diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala index 8d68191aa12b..734ad7b6e3bd 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala @@ -93,7 +93,6 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers server = new HistoryServer(conf, provider, securityManager, 18080) server.initialize() server.bind() - server.attachAllHandler() provider.start() port = server.boundPort } @@ -413,7 +412,6 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers server = new HistoryServer(myConf, provider, securityManager, 0) server.initialize() server.bind() - server.attachAllHandler() provider.start() val port = server.boundPort val metrics = server.cacheMetrics diff --git a/core/src/test/scala/org/apache/spark/deploy/history/RealBrowserUIHistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/RealBrowserUIHistoryServerSuite.scala index 892b272f3719..4a5c34f86753 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/RealBrowserUIHistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/RealBrowserUIHistoryServerSuite.scala @@ -89,7 +89,6 @@ abstract class RealBrowserUIHistoryServerSuite(val driverProp: String) server = new HistoryServer(conf, provider, securityManager, 18080) server.initialize() server.bind() - server.attachAllHandler() provider.start() port = server.boundPort } diff --git a/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala index ad919adf8c19..be83ec12f92f 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala @@ -50,7 +50,6 @@ class MasterWebUISuite extends SparkFunSuite with BeforeAndAfterAll { override def beforeAll(): Unit = { super.beforeAll() masterWebUI.bind() - masterWebUI.attachAllHandler() } override def afterAll(): Unit = { From 4ce656ce7ac23d6899a99025ef1f28abbe9a0559 Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Wed, 28 Jul 2021 00:34:40 +0800 Subject: [PATCH 08/17] update --- core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 1 - core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 1 - core/src/main/scala/org/apache/spark/ui/SparkUI.scala | 2 +- core/src/main/scala/org/apache/spark/ui/WebUI.scala | 2 +- 4 files changed, 2 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 4438d9965bf8..c964e343ca6c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -143,7 +143,6 @@ private[deploy] class Master( logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}") webUi = new MasterWebUI(this, webUiPort) webUi.bind() - webUi.attachAllHandler() masterWebUiUrl = webUi.webUrl if (reverseProxy) { val uiReverseProxyUrl = conf.get(UI_REVERSE_PROXY_URL).map(_.stripSuffix("/")) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 90bd63499691..05e8e5a6b676 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -234,7 +234,6 @@ private[deploy] class Worker( setupWorkerResources() webUi = new WorkerWebUI(this, workDir, webUiPort) webUi.bind() - webUi.attachAllHandler() workerWebUiUrl = s"${webUi.scheme}$publicAddress:${webUi.boundPort}" registerWithMaster() diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index fedfcc9a5a41..d6e4e57d5f3c 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -121,7 +121,7 @@ private[spark] class SparkUI private ( assert(serverInfo.isEmpty, s"Attempted to bind $className more than once!") try { val host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse("0.0.0.0") - val server = startJettyServer(host, port, sslOptions, conf, name, poolSize) + val server = startJettyServer(host, SparkUI.getUIPort(conf), sslOptions, conf, "SparkUI", 200) server.addHandler(initHandler, securityManager) serverInfo = Some(server) logInfo(s"Bound $className to $host, and started at $webUrl") diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index d826686382f1..27da00a46206 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -56,7 +56,7 @@ private[spark] abstract class WebUI( protected var serverInfo: Option[ServerInfo] = None protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse( conf.get(DRIVER_HOST_ADDRESS)) - private val className = Utils.getFormattedClassName(this) + protected val className = Utils.getFormattedClassName(this) def getBasePath: String = basePath def getTabs: Seq[WebUITab] = tabs.toSeq From d2662fb4caf36bd7090fb8aab4921a602b3804b6 Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Wed, 28 Jul 2021 00:39:12 +0800 Subject: [PATCH 09/17] update --- core/src/main/scala/org/apache/spark/ui/SparkUI.scala | 4 +--- core/src/main/scala/org/apache/spark/ui/WebUI.scala | 11 ++++++++--- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index d6e4e57d5f3c..9ac3c52bbe66 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -120,11 +120,9 @@ private[spark] class SparkUI private ( override def bind(): Unit = { assert(serverInfo.isEmpty, s"Attempted to bind $className more than once!") try { - val host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse("0.0.0.0") - val server = startJettyServer(host, SparkUI.getUIPort(conf), sslOptions, conf, "SparkUI", 200) + val server = initServer() server.addHandler(initHandler, securityManager) serverInfo = Some(server) - logInfo(s"Bound $className to $host, and started at $webUrl") } catch { case e: Exception => logError(s"Failed to bind $className", e) diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index 27da00a46206..80723c34eb4d 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -139,15 +139,20 @@ private[spark] abstract class WebUI( /** A hook to initialize components of the UI */ def initialize(): Unit + def initServer(): ServerInfo = { + val host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse("0.0.0.0") + val server = startJettyServer(host, port, sslOptions, conf, name, poolSize) + logInfo(s"Bound $className to $host, and started at $webUrl") + server + } + /** Binds to the HTTP server behind this web interface. */ def bind(): Unit = { assert(serverInfo.isEmpty, s"Attempted to bind $className more than once!") try { - val host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse("0.0.0.0") - val server = startJettyServer(host, port, sslOptions, conf, name, poolSize) + val server = initServer() handlers.foreach(server.addHandler(_, securityManager)) serverInfo = Some(server) - logInfo(s"Bound $className to $host, and started at $webUrl") } catch { case e: Exception => logError(s"Failed to bind $className", e) From 7903412a0a2f2d2d865b35a389e7aa60b9dbd5ae Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Wed, 28 Jul 2021 00:39:44 +0800 Subject: [PATCH 10/17] Update SparkUI.scala --- core/src/main/scala/org/apache/spark/ui/SparkUI.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 9ac3c52bbe66..18e63d864e4d 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -18,8 +18,10 @@ package org.apache.spark.ui import java.util.Date - import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse} + +import org.eclipse.jetty.servlet.ServletContextHandler + import org.apache.spark.{SecurityManager, SparkConf, SparkContext} import org.apache.spark.internal.Logging import org.apache.spark.internal.config.UI._ @@ -31,7 +33,7 @@ import org.apache.spark.ui.env.EnvironmentTab import org.apache.spark.ui.exec.ExecutorsTab import org.apache.spark.ui.jobs.{JobsTab, StagesTab} import org.apache.spark.ui.storage.StorageTab -import org.eclipse.jetty.servlet.ServletContextHandler + /** * Top level user interface for a Spark application. From b575cab813b2cdc5f7cb102cc1b1e9a2f5184956 Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Wed, 28 Jul 2021 00:41:08 +0800 Subject: [PATCH 11/17] update --- core/src/main/scala/org/apache/spark/ui/SparkUI.scala | 3 +-- core/src/main/scala/org/apache/spark/ui/WebUI.scala | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 18e63d864e4d..9dfa5d383a36 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -34,7 +34,6 @@ import org.apache.spark.ui.exec.ExecutorsTab import org.apache.spark.ui.jobs.{JobsTab, StagesTab} import org.apache.spark.ui.storage.StorageTab - /** * Top level user interface for a Spark application. */ @@ -58,7 +57,7 @@ private[spark] class SparkUI private ( private var streamingJobProgressListener: Option[SparkListener] = None - protected val initHandler: ServletContextHandler = { + private val initHandler: ServletContextHandler = { val servlet = new HttpServlet() { override def doGet(req: HttpServletRequest, res: HttpServletResponse): Unit = { res.setContentType("text/html;charset=utf-8") diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index 80723c34eb4d..1098ee63ba35 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -56,7 +56,7 @@ private[spark] abstract class WebUI( protected var serverInfo: Option[ServerInfo] = None protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse( conf.get(DRIVER_HOST_ADDRESS)) - protected val className = Utils.getFormattedClassName(this) + private val className = Utils.getFormattedClassName(this) def getBasePath: String = basePath def getTabs: Seq[WebUITab] = tabs.toSeq From d5652d74e0f8eec6a9ca48c667b81c076e0bac89 Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Wed, 28 Jul 2021 00:42:11 +0800 Subject: [PATCH 12/17] Update WebUI.scala --- core/src/main/scala/org/apache/spark/ui/WebUI.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index 1098ee63ba35..80723c34eb4d 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -56,7 +56,7 @@ private[spark] abstract class WebUI( protected var serverInfo: Option[ServerInfo] = None protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse( conf.get(DRIVER_HOST_ADDRESS)) - private val className = Utils.getFormattedClassName(this) + protected val className = Utils.getFormattedClassName(this) def getBasePath: String = basePath def getTabs: Seq[WebUITab] = tabs.toSeq From 677a3390e6541d4e31b78042bd6b0e9e6a48c0c9 Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Wed, 28 Jul 2021 00:44:33 +0800 Subject: [PATCH 13/17] Update UISuite.scala --- core/src/test/scala/org/apache/spark/ui/UISuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala index 0c202e11406e..ca765c7e92b5 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala @@ -438,6 +438,7 @@ class UISuite extends SparkFunSuite { === "Spark is starting up. Please wait a while until it's ready.") sparkUI.attachAllHandler() assert(TestUtils.httpResponseMessage(new URL(sparkUI.webUrl + "/jobs")).contains(sc.appName)) + sparkUI.stop() sc.stop() } From b16531f5c344e3c8ee85002da83de61c08af0eeb Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Wed, 28 Jul 2021 00:47:18 +0800 Subject: [PATCH 14/17] Update SparkUI.scala --- core/src/main/scala/org/apache/spark/ui/SparkUI.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 9dfa5d383a36..818cda37aa90 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -118,6 +118,10 @@ private[spark] class SparkUI private ( appId = id } + /** + * When start SparUI, Spark start Jetty Server first to bind address then after + * Spark application fully started, call [attachAllHandlers] to start all handler. + */ override def bind(): Unit = { assert(serverInfo.isEmpty, s"Attempted to bind $className more than once!") try { From 15aec1a612e40d4b899657760c2dc305f747cadc Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Wed, 28 Jul 2021 22:34:35 +0800 Subject: [PATCH 15/17] Update SparkUI.scala --- core/src/main/scala/org/apache/spark/ui/SparkUI.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 818cda37aa90..b0103390be7e 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -68,7 +68,7 @@ private[spark] class SparkUI private ( } /** - * Attach all existed handler to ServerInfo. + * Attach all existing handlers to ServerInfo. */ def attachAllHandler(): Unit = { serverInfo.foreach { server => @@ -119,8 +119,8 @@ private[spark] class SparkUI private ( } /** - * When start SparUI, Spark start Jetty Server first to bind address then after - * Spark application fully started, call [attachAllHandlers] to start all handler. + * To start SparUI, Spark starts Jetty Server first to bind address. + * After the Spark application is fully started, call [attachAllHandlers] to start all handlers. */ override def bind(): Unit = { assert(serverInfo.isEmpty, s"Attempted to bind $className more than once!") From c5148551c9d18b2b68a082dfdfdda165e319ece6 Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Wed, 28 Jul 2021 22:36:16 +0800 Subject: [PATCH 16/17] Update SparkUI.scala --- core/src/main/scala/org/apache/spark/ui/SparkUI.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index b0103390be7e..db1f8bc1a2ff 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -120,7 +120,8 @@ private[spark] class SparkUI private ( /** * To start SparUI, Spark starts Jetty Server first to bind address. - * After the Spark application is fully started, call [attachAllHandlers] to start all handlers. + * After the Spark application is fully started, call [attachAllHandlers] + * to start all existing handlers. */ override def bind(): Unit = { assert(serverInfo.isEmpty, s"Attempted to bind $className more than once!") From 0c9c3ce97871ae1d081c3e3cdc32034b1a1e66fd Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Fri, 30 Jul 2021 01:19:35 +0800 Subject: [PATCH 17/17] Update UISuite.scala --- .../scala/org/apache/spark/ui/UISuite.scala | 34 +++++++++++-------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala index ca765c7e92b5..90136dd06237 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala @@ -425,21 +425,25 @@ class UISuite extends SparkFunSuite { } test("SPARK-36237: Attach and start handler after application started in UI ") { - val conf = new SparkConf() - .setMaster("local") - .setAppName("test") - .set(UI.UI_ENABLED, false) - val sc = new SparkContext(conf) - assert(sc.ui.isEmpty) - val sparkUI = SparkUI.create(Some(sc), sc.statusStore, sc.conf, sc.env.securityManager, - sc.appName, "", sc.startTime) - sparkUI.bind() - assert(TestUtils.httpResponseMessage(new URL(sparkUI.webUrl + "/jobs")) - === "Spark is starting up. Please wait a while until it's ready.") - sparkUI.attachAllHandler() - assert(TestUtils.httpResponseMessage(new URL(sparkUI.webUrl + "/jobs")).contains(sc.appName)) - sparkUI.stop() - sc.stop() + def newSparkContextWithoutUI(): SparkContext = { + val conf = new SparkConf() + .setMaster("local") + .setAppName("test") + .set(UI.UI_ENABLED, false) + new SparkContext(conf) + } + + withSpark(newSparkContextWithoutUI()) { sc => + assert(sc.ui.isEmpty) + val sparkUI = SparkUI.create(Some(sc), sc.statusStore, sc.conf, sc.env.securityManager, + sc.appName, "", sc.startTime) + sparkUI.bind() + assert(TestUtils.httpResponseMessage(new URL(sparkUI.webUrl + "/jobs")) + === "Spark is starting up. Please wait a while until it's ready.") + sparkUI.attachAllHandler() + assert(TestUtils.httpResponseMessage(new URL(sparkUI.webUrl + "/jobs")).contains(sc.appName)) + sparkUI.stop() + } } /**