Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -596,8 +596,6 @@ class SparkContext(config: SparkConf) extends Logging {
// The metrics system for Driver need to be set spark.app.id to app ID.
// So it should start after we get app ID from the task scheduler and set spark.app.id.
_env.metricsSystem.start(_conf.get(METRICS_STATIC_SOURCES_ENABLED))
// Attach the driver metrics servlet handler to the web ui after the metrics system is started.
_env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))

_eventLogger =
if (isEventLogEnabled) {
Expand Down Expand Up @@ -639,6 +637,11 @@ class SparkContext(config: SparkConf) extends Logging {
postEnvironmentUpdate()
postApplicationStart()

// After application started, attach handlers to started server and start handler.
_ui.foreach(_.attachAllHandler())
Copy link
Member

@gengliangwang gengliangwang Jul 27, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw, why do we need to attach all the handlers again? And there are so many "attachAllHandler" in the code changes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw, why do we need to attach all the handlers again? And there are so many "attachAllHandler" in the code changes.

That me clarify the process.
Before this change. We add handlers to SparkUI but not start it when init SparkUI since we not start jetty server.
Then it call bind(), in this method we start jetty server and attach all handler to server.
this cause a problem that when we call bind(), we expose all servlet API to user. But application not fully started yet.
Now in this pr, I split the behavior of start jetty server and attach handlers to server.
We need to bind address(start jetty server) first before start AM since we need driver url address to bind with spark proxy server.

Then after application fully started, we attach all handlers to server(means expose UI url to user).

For this comment #33457 (comment)
I add a initHandler to handle all request between starting jetty server and fully start application to show the hint message.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I split the behavior of start jetty server and attach handlers to server

I add some logging and find that attachHandler is still called multiple times before attachAllHandler

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I split the behavior of start jetty server and attach handlers to server

I add some logging and find that attachHandler is still called multiple times before attachAllHandler

Yes, but when we call attachHandler() the serverInfo is none, so the handler is not attached to the server.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to avoid calling attachAllHandler in so many places?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to avoid calling attachAllHandler in so many places?

Add a new method call it bindAndAttachAllHandler() then replace other place to call this one? only call bind() and attachAllHandler in SparkContext?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will that work for history server and master/worker UI? If yes let's try to avoid changing them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will that work for history server and master/worker UI? If yes let's try to avoid changing them.

How about current?

Copy link
Member

@gengliangwang gengliangwang Jul 28, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean, will history server and master/worker UI show the same page(spark is starting up...) on starting up?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean, will history server and master/worker UI show the same page(spark is starting up...) on starting up?

With current code, won't impact history server/master/worker web UI. and history server/master/worker web UI doesn't have such problem.

// Attach the driver metrics servlet handler to the web ui after the metrics system is started.
_env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))

// Post init
_taskScheduler.postStartHook()
if (isLocal) {
Expand Down
13 changes: 13 additions & 0 deletions core/src/main/scala/org/apache/spark/TestUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.io.Source
import scala.reflect.{classTag, ClassTag}
import scala.sys.process.{Process, ProcessLogger}
import scala.util.Try
Expand Down Expand Up @@ -319,6 +320,18 @@ private[spark] object TestUtils {
}
}

/**
* Returns the response message from an HTTP(S) URL.
*/
def httpResponseMessage(
url: URL,
method: String = "GET",
headers: Seq[(String, String)] = Nil): String = {
withHttpConnection(url, method, headers = headers) { connection =>
Source.fromInputStream(connection.getInputStream, "utf-8").getLines().mkString("\n")
}
}

def withHttpConnection[T](
url: URL,
method: String = "GET",
Expand Down
40 changes: 40 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/SparkUI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
package org.apache.spark.ui

import java.util.Date
import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}

import org.eclipse.jetty.servlet.ServletContextHandler

import org.apache.spark.{SecurityManager, SparkConf, SparkContext}
import org.apache.spark.internal.Logging
Expand Down Expand Up @@ -54,6 +57,25 @@ private[spark] class SparkUI private (

private var streamingJobProgressListener: Option[SparkListener] = None

private val initHandler: ServletContextHandler = {
val servlet = new HttpServlet() {
override def doGet(req: HttpServletRequest, res: HttpServletResponse): Unit = {
res.setContentType("text/html;charset=utf-8")
res.getWriter.write("Spark is starting up. Please wait a while until it's ready.")
}
}
createServletHandler("/", servlet, basePath)
}

/**
* Attach all existing handlers to ServerInfo.
*/
def attachAllHandler(): Unit = {
serverInfo.foreach { server =>
server.removeHandler(initHandler)
handlers.foreach(server.addHandler(_, securityManager))
}
}
/** Initialize all components of the server. */
def initialize(): Unit = {
val jobsTab = new JobsTab(this, store)
Expand Down Expand Up @@ -96,6 +118,24 @@ private[spark] class SparkUI private (
appId = id
}

/**
* To start SparUI, Spark starts Jetty Server first to bind address.
* After the Spark application is fully started, call [attachAllHandlers]
* to start all existing handlers.
*/
override def bind(): Unit = {
assert(serverInfo.isEmpty, s"Attempted to bind $className more than once!")
try {
val server = initServer()
server.addHandler(initHandler, securityManager)
serverInfo = Some(server)
} catch {
case e: Exception =>
logError(s"Failed to bind $className", e)
System.exit(1)
}
}

/** Stop the server behind this web interface. Only valid after bind(). */
override def stop(): Unit = {
super.stop()
Expand Down
13 changes: 9 additions & 4 deletions core/src/main/scala/org/apache/spark/ui/WebUI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ private[spark] abstract class WebUI(
protected var serverInfo: Option[ServerInfo] = None
protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(
conf.get(DRIVER_HOST_ADDRESS))
private val className = Utils.getFormattedClassName(this)
protected val className = Utils.getFormattedClassName(this)

def getBasePath: String = basePath
def getTabs: Seq[WebUITab] = tabs.toSeq
Expand Down Expand Up @@ -139,15 +139,20 @@ private[spark] abstract class WebUI(
/** A hook to initialize components of the UI */
def initialize(): Unit

def initServer(): ServerInfo = {
val host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse("0.0.0.0")
val server = startJettyServer(host, port, sslOptions, conf, name, poolSize)
logInfo(s"Bound $className to $host, and started at $webUrl")
server
}

/** Binds to the HTTP server behind this web interface. */
def bind(): Unit = {
assert(serverInfo.isEmpty, s"Attempted to bind $className more than once!")
try {
val host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse("0.0.0.0")
val server = startJettyServer(host, port, sslOptions, conf, name, poolSize)
val server = initServer()
handlers.foreach(server.addHandler(_, securityManager))
serverInfo = Some(server)
logInfo(s"Bound $className to $host, and started at $webUrl")
} catch {
case e: Exception =>
logError(s"Failed to bind $className", e)
Expand Down
22 changes: 22 additions & 0 deletions core/src/test/scala/org/apache/spark/ui/UISuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,28 @@ class UISuite extends SparkFunSuite {
}
}

test("SPARK-36237: Attach and start handler after application started in UI ") {
def newSparkContextWithoutUI(): SparkContext = {
val conf = new SparkConf()
.setMaster("local")
.setAppName("test")
.set(UI.UI_ENABLED, false)
new SparkContext(conf)
}

withSpark(newSparkContextWithoutUI()) { sc =>
assert(sc.ui.isEmpty)
val sparkUI = SparkUI.create(Some(sc), sc.statusStore, sc.conf, sc.env.securityManager,
sc.appName, "", sc.startTime)
sparkUI.bind()
assert(TestUtils.httpResponseMessage(new URL(sparkUI.webUrl + "/jobs"))
=== "Spark is starting up. Please wait a while until it's ready.")
sparkUI.attachAllHandler()
assert(TestUtils.httpResponseMessage(new URL(sparkUI.webUrl + "/jobs")).contains(sc.appName))
sparkUI.stop()
}
}

/**
* Create a new context handler for the given path, with a single servlet that responds to
* requests in `$path/root`.
Expand Down