Skip to content

Commit cf2e9da

Browse files
BryanCutler authored and Andrew Or committed
[SPARK-12299][CORE] Remove history serving functionality from Master
Remove history server functionality from the standalone Master. Previously, the Master process rebuilt a SparkUI once an application completed, which sometimes caused problems, such as OOM, when the application event log was large (see SPARK-6270). Keeping this functionality out of the Master will help to simplify the process and increase stability. Testing for this change included running the core unit tests and manually running an application on a standalone cluster to verify that it completed successfully and that the Master UI functioned correctly. Also added 2 unit tests to verify that killing an application and a driver from MasterWebUI makes the correct request to the Master. Author: Bryan Cutler <[email protected]> Closes #10991 from BryanCutler/remove-history-master-SPARK-12299.
1 parent 0c00391 commit cf2e9da

File tree

11 files changed

+86
-316
lines changed

11 files changed

+86
-316
lines changed

core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ private[spark] class ApplicationInfo(
4141
@transient var coresGranted: Int = _
4242
@transient var endTime: Long = _
4343
@transient var appSource: ApplicationSource = _
44-
@transient @volatile var appUIUrlAtHistoryServer: Option[String] = None
4544

4645
// A cap on the number of executors this application can have at any given time.
4746
// By default, this is infinite. Only after the first allocation request is issued by the
@@ -66,7 +65,6 @@ private[spark] class ApplicationInfo(
6665
nextExecutorId = 0
6766
removedExecutors = new ArrayBuffer[ExecutorDesc]
6867
executorLimit = desc.initialExecutorLimit.getOrElse(Integer.MAX_VALUE)
69-
appUIUrlAtHistoryServer = None
7068
}
7169

7270
private def newExecutorId(useID: Option[Int] = None): Int = {
@@ -136,11 +134,4 @@ private[spark] class ApplicationInfo(
136134
System.currentTimeMillis() - startTime
137135
}
138136
}
139-
140-
/**
141-
* Returns the original application UI url unless there is its address at history server
142-
* is defined
143-
*/
144-
def curAppUIUrl: String = appUIUrlAtHistoryServer.getOrElse(desc.appUiUrl)
145-
146137
}

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 1 addition & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -17,35 +17,25 @@
1717

1818
package org.apache.spark.deploy.master
1919

20-
import java.io.FileNotFoundException
21-
import java.net.URLEncoder
2220
import java.text.SimpleDateFormat
2321
import java.util.Date
24-
import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit}
22+
import java.util.concurrent.{ScheduledFuture, TimeUnit}
2523

2624
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
27-
import scala.concurrent.{ExecutionContext, Future}
28-
import scala.concurrent.duration.Duration
29-
import scala.language.postfixOps
3025
import scala.util.Random
3126

32-
import org.apache.hadoop.fs.Path
33-
3427
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
3528
import org.apache.spark.deploy.{ApplicationDescription, DriverDescription,
3629
ExecutorState, SparkHadoopUtil}
3730
import org.apache.spark.deploy.DeployMessages._
38-
import org.apache.spark.deploy.history.HistoryServer
3931
import org.apache.spark.deploy.master.DriverState.DriverState
4032
import org.apache.spark.deploy.master.MasterMessages._
4133
import org.apache.spark.deploy.master.ui.MasterWebUI
4234
import org.apache.spark.deploy.rest.StandaloneRestServer
4335
import org.apache.spark.internal.Logging
4436
import org.apache.spark.metrics.MetricsSystem
4537
import org.apache.spark.rpc._
46-
import org.apache.spark.scheduler.{EventLoggingListener, ReplayListenerBus}
4738
import org.apache.spark.serializer.{JavaSerializer, Serializer}
48-
import org.apache.spark.ui.SparkUI
4939
import org.apache.spark.util.{ThreadUtils, Utils}
5040

5141
private[deploy] class Master(
@@ -59,10 +49,6 @@ private[deploy] class Master(
5949
private val forwardMessageThread =
6050
ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")
6151

62-
private val rebuildUIThread =
63-
ThreadUtils.newDaemonSingleThreadExecutor("master-rebuild-ui-thread")
64-
private val rebuildUIContext = ExecutionContext.fromExecutor(rebuildUIThread)
65-
6652
private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
6753

6854
private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
@@ -85,8 +71,6 @@ private[deploy] class Master(
8571
private val addressToApp = new HashMap[RpcAddress, ApplicationInfo]
8672
private val completedApps = new ArrayBuffer[ApplicationInfo]
8773
private var nextAppNumber = 0
88-
// Using ConcurrentHashMap so that master-rebuild-ui-thread can add a UI after asyncRebuildUI
89-
private val appIdToUI = new ConcurrentHashMap[String, SparkUI]
9074

9175
private val drivers = new HashSet[DriverInfo]
9276
private val completedDrivers = new ArrayBuffer[DriverInfo]
@@ -199,7 +183,6 @@ private[deploy] class Master(
199183
checkForWorkerTimeOutTask.cancel(true)
200184
}
201185
forwardMessageThread.shutdownNow()
202-
rebuildUIThread.shutdownNow()
203186
webUi.stop()
204187
restServer.foreach(_.stop())
205188
masterMetricsSystem.stop()
@@ -391,9 +374,6 @@ private[deploy] class Master(
391374
case CheckForWorkerTimeOut =>
392375
timeOutDeadWorkers()
393376

394-
case AttachCompletedRebuildUI(appId) =>
395-
// An asyncRebuildSparkUI has completed, so need to attach to master webUi
396-
Option(appIdToUI.get(appId)).foreach { ui => webUi.attachSparkUI(ui) }
397377
}
398378

399379
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -844,17 +824,13 @@ private[deploy] class Master(
844824
if (completedApps.size >= RETAINED_APPLICATIONS) {
845825
val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
846826
completedApps.take(toRemove).foreach { a =>
847-
Option(appIdToUI.remove(a.id)).foreach { ui => webUi.detachSparkUI(ui) }
848827
applicationMetricsSystem.removeSource(a.appSource)
849828
}
850829
completedApps.trimStart(toRemove)
851830
}
852831
completedApps += app // Remember it in our history
853832
waitingApps -= app
854833

855-
// If application events are logged, use them to rebuild the UI
856-
asyncRebuildSparkUI(app)
857-
858834
for (exec <- app.executors.values) {
859835
killExecutor(exec)
860836
}
@@ -953,89 +929,6 @@ private[deploy] class Master(
953929
exec.state = ExecutorState.KILLED
954930
}
955931

956-
/**
957-
* Rebuild a new SparkUI from the given application's event logs.
958-
* Return the UI if successful, else None
959-
*/
960-
private[master] def rebuildSparkUI(app: ApplicationInfo): Option[SparkUI] = {
961-
val futureUI = asyncRebuildSparkUI(app)
962-
ThreadUtils.awaitResult(futureUI, Duration.Inf)
963-
}
964-
965-
/** Rebuild a new SparkUI asynchronously to not block RPC event loop */
966-
private[master] def asyncRebuildSparkUI(app: ApplicationInfo): Future[Option[SparkUI]] = {
967-
val appName = app.desc.name
968-
val notFoundBasePath = HistoryServer.UI_PATH_PREFIX + "/not-found"
969-
val eventLogDir = app.desc.eventLogDir
970-
.getOrElse {
971-
// Event logging is disabled for this application
972-
app.appUIUrlAtHistoryServer = Some(notFoundBasePath)
973-
return Future.successful(None)
974-
}
975-
val futureUI = Future {
976-
val eventLogFilePrefix = EventLoggingListener.getLogPath(
977-
eventLogDir, app.id, appAttemptId = None, compressionCodecName = app.desc.eventLogCodec)
978-
val fs = Utils.getHadoopFileSystem(eventLogDir, hadoopConf)
979-
val inProgressExists = fs.exists(new Path(eventLogFilePrefix +
980-
EventLoggingListener.IN_PROGRESS))
981-
982-
val eventLogFile = if (inProgressExists) {
983-
// Event logging is enabled for this application, but the application is still in progress
984-
logWarning(s"Application $appName is still in progress, it may be terminated abnormally.")
985-
eventLogFilePrefix + EventLoggingListener.IN_PROGRESS
986-
} else {
987-
eventLogFilePrefix
988-
}
989-
990-
val logInput = EventLoggingListener.openEventLog(new Path(eventLogFile), fs)
991-
val replayBus = new ReplayListenerBus()
992-
val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf),
993-
appName, HistoryServer.UI_PATH_PREFIX + s"/${app.id}", app.startTime)
994-
try {
995-
replayBus.replay(logInput, eventLogFile, inProgressExists)
996-
} finally {
997-
logInput.close()
998-
}
999-
1000-
Some(ui)
1001-
}(rebuildUIContext)
1002-
1003-
futureUI.onSuccess { case Some(ui) =>
1004-
appIdToUI.put(app.id, ui)
1005-
// `self` can be null if we are already in the process of shutting down
1006-
// This happens frequently in tests where `local-cluster` is used
1007-
if (self != null) {
1008-
self.send(AttachCompletedRebuildUI(app.id))
1009-
}
1010-
// Application UI is successfully rebuilt, so link the Master UI to it
1011-
// NOTE - app.appUIUrlAtHistoryServer is volatile
1012-
app.appUIUrlAtHistoryServer = Some(ui.basePath)
1013-
}(ThreadUtils.sameThread)
1014-
1015-
futureUI.onFailure {
1016-
case fnf: FileNotFoundException =>
1017-
// Event logging is enabled for this application, but no event logs are found
1018-
val title = s"Application history not found (${app.id})"
1019-
var msg = s"No event logs found for application $appName in ${app.desc.eventLogDir.get}."
1020-
logWarning(msg)
1021-
msg += " Did you specify the correct logging directory?"
1022-
msg = URLEncoder.encode(msg, "UTF-8")
1023-
app.appUIUrlAtHistoryServer = Some(notFoundBasePath + s"?msg=$msg&title=$title")
1024-
1025-
case e: Exception =>
1026-
// Relay exception message to application UI page
1027-
val title = s"Application history load error (${app.id})"
1028-
val exception = URLEncoder.encode(Utils.exceptionString(e), "UTF-8")
1029-
var msg = s"Exception in replaying log for application $appName!"
1030-
logError(msg, e)
1031-
msg = URLEncoder.encode(msg, "UTF-8")
1032-
app.appUIUrlAtHistoryServer =
1033-
Some(notFoundBasePath + s"?msg=$msg&exception=$exception&title=$title")
1034-
}(ThreadUtils.sameThread)
1035-
1036-
futureUI
1037-
}
1038-
1039932
/** Generate a new app ID given a app's submission date */
1040933
private def newApplicationId(submitDate: Date): String = {
1041934
val appId = "app-%s-%04d".format(createDateFormat.format(submitDate), nextAppNumber)

core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,4 @@ private[master] object MasterMessages {
3939
case object BoundPortsRequest
4040

4141
case class BoundPortsResponse(rpcEndpointPort: Int, webUIPort: Int, restPort: Option[Int])
42-
43-
case class AttachCompletedRebuildUI(appId: String)
4442
}

core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,11 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
7575
</li>
7676
<li><strong>Submit Date:</strong> {app.submitDate}</li>
7777
<li><strong>State:</strong> {app.state}</li>
78-
<li><strong><a href={app.curAppUIUrl}>Application Detail UI</a></strong></li>
78+
{
79+
if (!app.isFinished) {
80+
<li><strong><a href={app.desc.appUiUrl}>Application Detail UI</a></strong></li>
81+
}
82+
}
7983
</ul>
8084
</div>
8185
</div>

core/src/main/scala/org/apache/spark/deploy/master/ui/HistoryNotFoundPage.scala

Lines changed: 0 additions & 73 deletions
This file was deleted.

core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,13 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
206206
{killLink}
207207
</td>
208208
<td>
209-
<a href={app.curAppUIUrl}>{app.desc.name}</a>
209+
{
210+
if (app.isFinished) {
211+
app.desc.name
212+
} else {
213+
<a href={app.desc.appUiUrl}>{app.desc.name}</a>
214+
}
215+
}
210216
</td>
211217
<td>
212218
{app.coresGranted}

core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala

Lines changed: 2 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@ package org.apache.spark.deploy.master.ui
1919

2020
import org.apache.spark.deploy.master.Master
2121
import org.apache.spark.internal.Logging
22-
import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, ApplicationsListResource,
23-
UIRoot}
2422
import org.apache.spark.ui.{SparkUI, WebUI}
2523
import org.apache.spark.ui.JettyUtils._
2624

@@ -30,60 +28,26 @@ import org.apache.spark.ui.JettyUtils._
3028
private[master]
3129
class MasterWebUI(
3230
val master: Master,
33-
requestedPort: Int,
34-
customMasterPage: Option[MasterPage] = None)
31+
requestedPort: Int)
3532
extends WebUI(master.securityMgr, master.securityMgr.getSSLOptions("standalone"),
36-
requestedPort, master.conf, name = "MasterUI") with Logging with UIRoot {
33+
requestedPort, master.conf, name = "MasterUI") with Logging {
3734

3835
val masterEndpointRef = master.self
3936
val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true)
4037

41-
val masterPage = customMasterPage.getOrElse(new MasterPage(this))
42-
4338
initialize()
4439

4540
/** Initialize all components of the server. */
4641
def initialize() {
4742
val masterPage = new MasterPage(this)
4843
attachPage(new ApplicationPage(this))
49-
attachPage(new HistoryNotFoundPage(this))
5044
attachPage(masterPage)
5145
attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static"))
52-
attachHandler(ApiRootResource.getServletHandler(this))
5346
attachHandler(createRedirectHandler(
5447
"/app/kill", "/", masterPage.handleAppKillRequest, httpMethods = Set("POST")))
5548
attachHandler(createRedirectHandler(
5649
"/driver/kill", "/", masterPage.handleDriverKillRequest, httpMethods = Set("POST")))
5750
}
58-
59-
/** Attach a reconstructed UI to this Master UI. Only valid after bind(). */
60-
def attachSparkUI(ui: SparkUI) {
61-
assert(serverInfo.isDefined, "Master UI must be bound to a server before attaching SparkUIs")
62-
ui.getHandlers.foreach(attachHandler)
63-
}
64-
65-
/** Detach a reconstructed UI from this Master UI. Only valid after bind(). */
66-
def detachSparkUI(ui: SparkUI) {
67-
assert(serverInfo.isDefined, "Master UI must be bound to a server before detaching SparkUIs")
68-
ui.getHandlers.foreach(detachHandler)
69-
}
70-
71-
def getApplicationInfoList: Iterator[ApplicationInfo] = {
72-
val state = masterPage.getMasterState
73-
val activeApps = state.activeApps.sortBy(_.startTime).reverse
74-
val completedApps = state.completedApps.sortBy(_.endTime).reverse
75-
activeApps.iterator.map { ApplicationsListResource.convertApplicationInfo(_, false) } ++
76-
completedApps.iterator.map { ApplicationsListResource.convertApplicationInfo(_, true) }
77-
}
78-
79-
def getSparkUI(appId: String): Option[SparkUI] = {
80-
val state = masterPage.getMasterState
81-
val activeApps = state.activeApps.sortBy(_.startTime).reverse
82-
val completedApps = state.completedApps.sortBy(_.endTime).reverse
83-
(activeApps ++ completedApps).find { _.id == appId }.flatMap {
84-
master.rebuildSparkUI
85-
}
86-
}
8751
}
8852

8953
private[master] object MasterWebUI {

0 commit comments

Comments
 (0)