Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
package org.apache.spark.deploy.master.ui

import javax.servlet.http.HttpServletRequest

import org.eclipse.jetty.server.Server
import org.eclipse.jetty.servlet.ServletContextHandler
import org.eclipse.jetty.server.handler.ContextHandler

import org.apache.spark.Logging
import org.apache.spark.deploy.master.Master
import org.apache.spark.ui.JettyUtils
import org.apache.spark.ui.{JettyUtils, SparkUI}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.{AkkaUtils, Utils}

Expand Down Expand Up @@ -60,8 +61,8 @@ class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
val metricsHandlers = master.masterMetricsSystem.getServletHandlers ++
master.applicationMetricsSystem.getServletHandlers

val handlers = metricsHandlers ++ Seq[ServletContextHandler](
createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static/*"),
val handlers = metricsHandlers ++ Seq[ContextHandler](
createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static"),
createServletHandler("/app/json",
createServlet((request: HttpServletRequest) => applicationPage.renderJson(request),
master.securityMgr)),
Expand All @@ -79,5 +80,5 @@ class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
}

private[spark] object MasterWebUI {
val STATIC_RESOURCE_DIR = "org/apache/spark/ui"
val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ package org.apache.spark.deploy.worker.ui

import java.io.File
import javax.servlet.http.HttpServletRequest

import org.eclipse.jetty.server.Server
import org.eclipse.jetty.servlet.ServletContextHandler
import org.eclipse.jetty.server.handler.ContextHandler

import org.apache.spark.Logging
import org.apache.spark.deploy.worker.Worker
import org.apache.spark.ui.{JettyUtils, UIUtils}
import org.apache.spark.ui.{JettyUtils, SparkUI, UIUtils}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.{AkkaUtils, Utils}

Expand All @@ -46,8 +47,8 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I

val metricsHandlers = worker.metricsSystem.getServletHandlers

val handlers = metricsHandlers ++ Seq[ServletContextHandler](
createStaticHandler(WorkerWebUI.STATIC_RESOURCE_BASE, "/static/*"),
val handlers = metricsHandlers ++ Seq[ContextHandler](
createStaticHandler(WorkerWebUI.STATIC_RESOURCE_BASE, "/static"),
createServletHandler("/log", createServlet((request: HttpServletRequest) => log(request),
worker.securityMgr)),
createServletHandler("/logPage", createServlet((request: HttpServletRequest) => logPage
Expand Down Expand Up @@ -202,6 +203,6 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
}

private[spark] object WorkerWebUI {
val STATIC_RESOURCE_BASE = "org/apache/spark/ui"
val STATIC_RESOURCE_BASE = SparkUI.STATIC_RESOURCE_DIR
val DEFAULT_PORT="8081"
}
47 changes: 23 additions & 24 deletions core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ import org.json4s.JValue
import org.json4s.jackson.JsonMethods.{pretty, render}

import org.eclipse.jetty.server.{NetworkConnector, Server}
import org.eclipse.jetty.server.handler.HandlerList
import org.eclipse.jetty.servlet.{DefaultServlet, FilterHolder, ServletContextHandler, ServletHolder}
import org.eclipse.jetty.server.handler.{ContextHandler, ContextHandlerCollection, ResourceHandler}
import org.eclipse.jetty.servlet.{FilterHolder, ServletContextHandler, ServletHolder}
import org.eclipse.jetty.util.thread.QueuedThreadPool

import org.apache.spark.{Logging, SecurityManager, SparkConf}
Expand All @@ -44,7 +44,8 @@ private[spark] object JettyUtils extends Logging {

type Responder[T] = HttpServletRequest => T

class ServletParams[T <% AnyRef](val responder: Responder[T],
class ServletParams[T <% AnyRef](
val responder: Responder[T],
val contentType: String,
val extractFn: T => String = (in: Any) => in.toString) {}

Expand All @@ -58,7 +59,8 @@ private[spark] object JettyUtils extends Logging {
implicit def textResponderToServlet(responder: Responder[String]): ServletParams[String] =
new ServletParams(responder, "text/plain")

def createServlet[T <% AnyRef](servletParams: ServletParams[T],
def createServlet[T <% AnyRef](
servletParams: ServletParams[T],
securityMgr: SecurityManager): HttpServlet = {
new HttpServlet {
override def doGet(request: HttpServletRequest, response: HttpServletResponse) {
Expand Down Expand Up @@ -89,32 +91,27 @@ private[spark] object JettyUtils extends Logging {
/** Creates a handler that always redirects the user to a given path */
def createRedirectHandler(newPath: String, path: String): ServletContextHandler = {
val servlet = new HttpServlet {
override def doGet(request: HttpServletRequest,
response: HttpServletResponse) {
override def doGet(request: HttpServletRequest, response: HttpServletResponse) {
// make sure we don't end up with // in the middle
val newUri = new URL(new URL(request.getRequestURL.toString), newPath).toURI
response.sendRedirect(newUri.toString)
}
}
val contextHandler = new ServletContextHandler()
val holder = new ServletHolder(servlet)
contextHandler.setContextPath(path)
contextHandler.addServlet(holder, "/")
contextHandler
createServletHandler(path, servlet)
}

/** Creates a handler for serving files from a static directory */
def createStaticHandler(resourceBase: String, path: String): ServletContextHandler = {
val contextHandler = new ServletContextHandler()
val staticHandler = new DefaultServlet
val holder = new ServletHolder(staticHandler)
def createStaticHandler(resourceBase: String, path: String): ContextHandler = {
val resourceHandler = new ResourceHandler
Option(getClass.getClassLoader.getResource(resourceBase)) match {
case Some(res) =>
holder.setInitParameter("resourceBase", res.toString)
resourceHandler.setResourceBase(res.toString)
case None =>
throw new Exception("Could not find resource path for Web UI: " + resourceBase)
}
contextHandler.addServlet(holder, path)
val contextHandler = new ContextHandler
contextHandler.setContextPath(path)
contextHandler.setHandler(resourceHandler)
contextHandler
}

Expand All @@ -133,7 +130,7 @@ private[spark] object JettyUtils extends Logging {
if (!param.isEmpty) {
val parts = param.split("=")
if (parts.length == 2) holder.setInitParameter(parts(0), parts(1))
}
}
}
val enumDispatcher = java.util.EnumSet.of(DispatcherType.ASYNC, DispatcherType.ERROR,
DispatcherType.FORWARD, DispatcherType.INCLUDE, DispatcherType.REQUEST)
Expand All @@ -152,12 +149,15 @@ private[spark] object JettyUtils extends Logging {
def startJettyServer(
hostName: String,
port: Int,
handlers: Seq[ServletContextHandler],
handlers: Seq[ContextHandler],
conf: SparkConf): (Server, Int) = {

addFilters(handlers, conf)
val handlerList = new HandlerList
handlerList.setHandlers(handlers.toArray)
// Add security filters
val servletContextHandlers = handlers.collect { case h: ServletContextHandler => h }
addFilters(servletContextHandlers, conf)

val handlerCollection = new ContextHandlerCollection
handlerCollection.setHandlers(handlers.toArray)

@tailrec
def connect(currentPort: Int): (Server, Int) = {
Expand All @@ -166,8 +166,7 @@ private[spark] object JettyUtils extends Logging {
// constructor. But fortunately the pool allocated by Jetty is always a QueuedThreadPool.
val pool = server.getThreadPool.asInstanceOf[QueuedThreadPool]
pool.setDaemon(true)

server.setHandler(handlerList)
server.setHandler(handlerCollection)

Try {
server.start()
Expand Down
10 changes: 4 additions & 6 deletions core/src/main/scala/org/apache/spark/ui/SparkUI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@

package org.apache.spark.ui

import javax.servlet.http.HttpServletRequest

import org.eclipse.jetty.server.Server
import org.eclipse.jetty.servlet.ServletContextHandler
import org.eclipse.jetty.server.handler.ContextHandler

import org.apache.spark.{Logging, SparkContext, SparkEnv}
import org.apache.spark.ui.JettyUtils._
Expand All @@ -37,8 +35,8 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
var boundPort: Option[Int] = None
var server: Option[Server] = None

val handlers = Seq[ServletContextHandler] (
createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static/*"),
val handlers = Seq[ContextHandler] (
createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"),
createRedirectHandler("/stages", "/")
)
val storage = new BlockManagerUI(sc)
Expand Down Expand Up @@ -86,5 +84,5 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {

private[spark] object SparkUI {
val DEFAULT_PORT = "4040"
val STATIC_RESOURCE_DIR = "org/apache/spark/ui"
val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static"
}