Commit 2d77332

Merge remote-tracking branch 'upstream/master' into codeblock-api
2 parents 8d0b1b9 + 00c13cf

98 files changed: +2486 -676 lines changed


core/src/main/scala/org/apache/spark/TaskContextImpl.scala

Lines changed: 2 additions & 0 deletions
@@ -178,4 +178,6 @@ private[spark] class TaskContextImpl(

   private[spark] def fetchFailed: Option[FetchFailedException] = _fetchFailedException

+  // TODO: shall we publish it and define it in `TaskContext`?
+  private[spark] def getLocalProperties(): Properties = localProperties
 }

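The new accessor hands Spark internals the task's full `Properties` object; individual values are already reachable from task code through the public `TaskContext` API. A minimal sketch of that public-side usage, with an illustrative property key (not from this commit):

import org.apache.spark.{SparkContext, TaskContext}

object LocalPropertyExample {
  // The key name is hypothetical; any string key works.
  private val key = "my.job.tag"

  def run(sc: SparkContext): Unit = {
    // Set on the driver; Spark propagates local properties to the tasks it launches.
    sc.setLocalProperty(key, "nightly-build")

    val tags = sc.parallelize(1 to 4, 2).mapPartitions { iter =>
      // Read on the executor through the public API; the new private
      // getLocalProperties() exposes the same underlying Properties to Spark internals.
      Iterator(TaskContext.get().getLocalProperty(key))
    }.collect()

    println(tags.mkString(", "))
  }
}
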
core/src/main/scala/org/apache/spark/TaskEndReason.scala

Lines changed: 7 additions & 1 deletion
@@ -212,9 +212,15 @@ case object TaskResultLost extends TaskFailedReason {
  * Task was killed intentionally and needs to be rescheduled.
  */
 @DeveloperApi
-case class TaskKilled(reason: String) extends TaskFailedReason {
+case class TaskKilled(
+    reason: String,
+    accumUpdates: Seq[AccumulableInfo] = Seq.empty,
+    private[spark] val accums: Seq[AccumulatorV2[_, _]] = Nil)
+  extends TaskFailedReason {
+
   override def toErrorString: String = s"TaskKilled ($reason)"
   override def countTowardsTaskFailures: Boolean = false
+
 }

 /**

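Because `accumUpdates` and `accums` default to empty, existing constructions such as `TaskKilled("stage cancelled")` keep compiling. A minimal sketch (not part of this commit) of a listener that reads the extra information now carried on the kill path:

import org.apache.spark.TaskKilled
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Hypothetical listener used only to illustrate consuming the new fields.
class KilledTaskListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    taskEnd.reason match {
      case tk: TaskKilled =>
        // `reason` existed before; `accumUpdates` is new and defaults to empty.
        println(s"Task killed (${tk.reason}); ${tk.accumUpdates.size} accumulator updates reported")
      case _ => // other end reasons handled as before
    }
  }
}

Such a listener could be registered on the driver with `sc.addSparkListener(new KilledTaskListener)`.
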
core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala

Lines changed: 9 additions & 8 deletions
@@ -37,8 +37,8 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
     val lastUpdatedTime = parent.getLastUpdatedTime()
     val providerConfig = parent.getProviderConfig()
     val content =
-      <script src={UIUtils.prependBaseUri("/static/historypage-common.js")}></script> ++
-      <script src={UIUtils.prependBaseUri("/static/utils.js")}></script>
+      <script src={UIUtils.prependBaseUri(request, "/static/historypage-common.js")}></script> ++
+      <script src={UIUtils.prependBaseUri(request, "/static/utils.js")}></script>
       <div>
           <div class="container-fluid">
             <ul class="unstyled">
@@ -64,9 +64,10 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")

           {
             if (allAppsSize > 0) {
-              <script src={UIUtils.prependBaseUri("/static/dataTables.rowsGroup.js")}></script> ++
+              <script src={UIUtils.prependBaseUri(
+                request, "/static/dataTables.rowsGroup.js")}></script> ++
               <div id="history-summary" class="row-fluid"></div> ++
-              <script src={UIUtils.prependBaseUri("/static/historypage.js")}></script> ++
+              <script src={UIUtils.prependBaseUri(request, "/static/historypage.js")}></script> ++
               <script>setAppLimit({parent.maxApplications})</script>
             } else if (requestedIncomplete) {
               <h4>No incomplete applications found!</h4>
@@ -77,7 +78,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
             }
           }

-          <a href={makePageLink(!requestedIncomplete)}>
+          <a href={makePageLink(request, !requestedIncomplete)}>
             {
               if (requestedIncomplete) {
                 "Back to completed applications"
@@ -88,11 +89,11 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
         </a>
       </div>
     </div>
-    UIUtils.basicSparkPage(content, "History Server", true)
+    UIUtils.basicSparkPage(request, content, "History Server", true)
   }

-  private def makePageLink(showIncomplete: Boolean): String = {
-    UIUtils.prependBaseUri("/?" + "showIncomplete=" + showIncomplete)
+  private def makePageLink(request: HttpServletRequest, showIncomplete: Boolean): String = {
+    UIUtils.prependBaseUri(request, "/?" + "showIncomplete=" + showIncomplete)
   }

   private def isApplicationCompleted(appInfo: ApplicationInfo): Boolean = {

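The same mechanical change, threading the incoming `HttpServletRequest` into `UIUtils.prependBaseUri` and `UIUtils.basicSparkPage`, repeats across the UI pages below, presumably so the base URI can be resolved per request (for example behind a proxy). A minimal sketch of the pattern for a hypothetical page, assuming the post-change `UIUtils` signatures shown in this diff:

import javax.servlet.http.HttpServletRequest

import scala.xml.Node

import org.apache.spark.ui.{UIUtils, WebUIPage}

// Hypothetical Spark-internal page illustrating the new request-passing pattern.
private[spark] class HelloPage extends WebUIPage("hello") {
  override def render(request: HttpServletRequest): Seq[Node] = {
    val content =
      <script src={UIUtils.prependBaseUri(request, "/static/utils.js")}></script> ++
      <div>Hello from a custom page</div>
    UIUtils.basicSparkPage(request, content, "Hello")
  }
}
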
core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala

Lines changed: 1 addition & 1 deletion
@@ -87,7 +87,7 @@ class HistoryServer(
       if (!loadAppUi(appId, None) && (!attemptId.isDefined || !loadAppUi(appId, attemptId))) {
         val msg = <div class="row-fluid">Application {appId} not found.</div>
         res.setStatus(HttpServletResponse.SC_NOT_FOUND)
-        UIUtils.basicSparkPage(msg, "Not Found").foreach { n =>
+        UIUtils.basicSparkPage(req, msg, "Not Found").foreach { n =>
           res.getWriter().write(n.toString)
         }
         return

core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala

Lines changed: 2 additions & 2 deletions
@@ -40,7 +40,7 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
       .getOrElse(state.completedApps.find(_.id == appId).orNull)
     if (app == null) {
       val msg = <div class="row-fluid">No running application with ID {appId}</div>
-      return UIUtils.basicSparkPage(msg, "Not Found")
+      return UIUtils.basicSparkPage(request, msg, "Not Found")
     }

     val executorHeaders = Seq("ExecutorID", "Worker", "Cores", "Memory", "State", "Logs")
@@ -127,7 +127,7 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
           }
         </div>
       </div>;
-    UIUtils.basicSparkPage(content, "Application: " + app.desc.name)
+    UIUtils.basicSparkPage(request, content, "Application: " + app.desc.name)
   }

   private def executorRow(executor: ExecutorDesc): Seq[Node] = {

core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala

Lines changed: 1 addition & 1 deletion
@@ -215,7 +215,7 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
       }
     </div>;

-    UIUtils.basicSparkPage(content, "Spark Master at " + state.uri)
+    UIUtils.basicSparkPage(request, content, "Spark Master at " + state.uri)
   }

   private def workerRow(worker: WorkerInfo): Seq[Node] = {

core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala

Lines changed: 1 addition & 1 deletion
@@ -118,7 +118,7 @@ private[ui] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") with
         <script>{Unparsed(jsOnload)}</script>
       </div>

-    UIUtils.basicSparkPage(content, logType + " log page for " + pageName)
+    UIUtils.basicSparkPage(request, content, logType + " log page for " + pageName)
   }

   /** Get the part of the log files given the offset and desired length of bytes */

core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala

Lines changed: 1 addition & 1 deletion
@@ -135,7 +135,7 @@ private[ui] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") {
           }
         </div>
       </div>;
-    UIUtils.basicSparkPage(content, "Spark Worker at %s:%s".format(
+    UIUtils.basicSparkPage(request, content, "Spark Worker at %s:%s".format(
       workerState.host, workerState.port))
   }

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 35 additions & 20 deletions
@@ -287,6 +287,28 @@ private[spark] class Executor(
       notifyAll()
     }

+    /**
+     * Utility function to:
+     *    1. Report executor runtime and JVM gc time if possible
+     *    2. Collect accumulator updates
+     *    3. Set the finished flag to true and clear current thread's interrupt status
+     */
+    private def collectAccumulatorsAndResetStatusOnFailure(taskStartTime: Long) = {
+      // Report executor runtime and JVM gc time
+      Option(task).foreach(t => {
+        t.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStartTime)
+        t.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
+      })
+
+      // Collect latest accumulator values to report back to the driver
+      val accums: Seq[AccumulatorV2[_, _]] =
+        Option(task).map(_.collectAccumulatorUpdates(taskFailed = true)).getOrElse(Seq.empty)
+      val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None))
+
+      setTaskFinishedAndClearInterruptStatus()
+      (accums, accUpdates)
+    }
+
     override def run(): Unit = {
       threadId = Thread.currentThread.getId
       Thread.currentThread.setName(threadName)
@@ -300,7 +322,7 @@ private[spark] class Executor(
       val ser = env.closureSerializer.newInstance()
       logInfo(s"Running $taskName (TID $taskId)")
       execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
-      var taskStart: Long = 0
+      var taskStartTime: Long = 0
       var taskStartCpu: Long = 0
       startGCTime = computeTotalGcTime()

@@ -336,7 +358,7 @@ private[spark] class Executor(
         }

         // Run the actual task and measure its runtime.
-        taskStart = System.currentTimeMillis()
+        taskStartTime = System.currentTimeMillis()
         taskStartCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
           threadMXBean.getCurrentThreadCpuTime
         } else 0L
@@ -396,11 +418,11 @@ private[spark] class Executor(
         // Deserialization happens in two parts: first, we deserialize a Task object, which
         // includes the Partition. Second, Task.run() deserializes the RDD and function to be run.
         task.metrics.setExecutorDeserializeTime(
-          (taskStart - deserializeStartTime) + task.executorDeserializeTime)
+          (taskStartTime - deserializeStartTime) + task.executorDeserializeTime)
         task.metrics.setExecutorDeserializeCpuTime(
           (taskStartCpu - deserializeStartCpuTime) + task.executorDeserializeCpuTime)
         // We need to subtract Task.run()'s deserialization time to avoid double-counting
-        task.metrics.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
+        task.metrics.setExecutorRunTime((taskFinish - taskStartTime) - task.executorDeserializeTime)
         task.metrics.setExecutorCpuTime(
           (taskFinishCpu - taskStartCpu) - task.executorDeserializeCpuTime)
         task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
@@ -482,16 +504,19 @@ private[spark] class Executor(
     } catch {
       case t: TaskKilledException =>
         logInfo(s"Executor killed $taskName (TID $taskId), reason: ${t.reason}")
-        setTaskFinishedAndClearInterruptStatus()
-        execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled(t.reason)))
+
+        val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTime)
+        val serializedTK = ser.serialize(TaskKilled(t.reason, accUpdates, accums))
+        execBackend.statusUpdate(taskId, TaskState.KILLED, serializedTK)

       case _: InterruptedException | NonFatal(_) if
           task != null && task.reasonIfKilled.isDefined =>
         val killReason = task.reasonIfKilled.getOrElse("unknown reason")
         logInfo(s"Executor interrupted and killed $taskName (TID $taskId), reason: $killReason")
-        setTaskFinishedAndClearInterruptStatus()
-        execBackend.statusUpdate(
-          taskId, TaskState.KILLED, ser.serialize(TaskKilled(killReason)))
+
+        val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTime)
+        val serializedTK = ser.serialize(TaskKilled(killReason, accUpdates, accums))
+        execBackend.statusUpdate(taskId, TaskState.KILLED, serializedTK)

       case t: Throwable if hasFetchFailure && !Utils.isFatalError(t) =>
         val reason = task.context.fetchFailed.get.toTaskFailedReason
@@ -524,17 +549,7 @@ private[spark] class Executor(
         // the task failure would not be ignored if the shutdown happened because of premption,
         // instead of an app issue).
         if (!ShutdownHookManager.inShutdown()) {
-          // Collect latest accumulator values to report back to the driver
-          val accums: Seq[AccumulatorV2[_, _]] =
-            if (task != null) {
-              task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
-              task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
-              task.collectAccumulatorUpdates(taskFailed = true)
-            } else {
-              Seq.empty
-            }
-
-          val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None))
+          val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTime)

           val serializedTaskEndReason = {
             try {

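The net effect on the Executor is that a task ending in `TaskKilled` now goes through the same accumulator-collection path as other failures, so updates made before the kill are reported to the driver rather than dropped. A small driver-side scenario sketch (not from this commit; it assumes the scheduler applies these updates as it does for failed tasks, and the job and accumulator names are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

object KilledTaskAccumulators {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("killed-accums").setMaster("local[2]"))
    val processed = sc.longAccumulator("records processed")

    // Start a job that bumps the accumulator on every record, then cancel it.
    val job = new Thread(new Runnable {
      override def run(): Unit = {
        try {
          sc.range(0, 10000000L).foreach { _ => processed.add(1) }
        } catch {
          case _: Exception => ()  // the job is expected to be cancelled mid-flight
        }
      }
    })
    job.start()
    Thread.sleep(500)
    sc.cancelAllJobs()  // running tasks end with TaskKilled(...)
    job.join()

    println(s"updates reported before the kill: ${processed.value}")
    sc.stop()
  }
}
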
core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala

Lines changed: 25 additions & 16 deletions
@@ -34,7 +34,11 @@ import org.apache.spark.util.Utils
  * Delivery will only begin when the `start()` method is called. The `stop()` method should be
  * called when no more events need to be delivered.
  */
-private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveListenerBusMetrics)
+private class AsyncEventQueue(
+    val name: String,
+    conf: SparkConf,
+    metrics: LiveListenerBusMetrics,
+    bus: LiveListenerBus)
   extends SparkListenerBus
   with Logging {

@@ -81,23 +85,18 @@ private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveListenerBusMetrics)
   }

   private def dispatch(): Unit = LiveListenerBus.withinListenerThread.withValue(true) {
-    try {
-      var next: SparkListenerEvent = eventQueue.take()
-      while (next != POISON_PILL) {
-        val ctx = processingTime.time()
-        try {
-          super.postToAll(next)
-        } finally {
-          ctx.stop()
-        }
-        eventCount.decrementAndGet()
-        next = eventQueue.take()
+    var next: SparkListenerEvent = eventQueue.take()
+    while (next != POISON_PILL) {
+      val ctx = processingTime.time()
+      try {
+        super.postToAll(next)
+      } finally {
+        ctx.stop()
       }
       eventCount.decrementAndGet()
-    } catch {
-      case ie: InterruptedException =>
-        logInfo(s"Stopping listener queue $name.", ie)
+      next = eventQueue.take()
     }
+    eventCount.decrementAndGet()
   }

   override protected def getTimer(listener: SparkListenerInterface): Option[Timer] = {
@@ -130,7 +129,11 @@ private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveListenerBusMetrics)
       eventCount.incrementAndGet()
       eventQueue.put(POISON_PILL)
     }
-    dispatchThread.join()
+    // this thread might be trying to stop itself as part of error handling -- we can't join
+    // in that case.
+    if (Thread.currentThread() != dispatchThread) {
+      dispatchThread.join()
+    }
   }

   def post(event: SparkListenerEvent): Unit = {
@@ -187,6 +190,12 @@ private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveListenerBusMetrics)
     true
   }

+  override def removeListenerOnError(listener: SparkListenerInterface): Unit = {
+    // the listener failed in an unrecoverably way, we want to remove it from the entire
+    // LiveListenerBus (potentially stopping a queue if it is empty)
+    bus.removeListener(listener)
+  }
+
 }

 private object AsyncEventQueue {

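Taken together, the queue now holds a reference to its owning `LiveListenerBus` so a listener that fails unrecoverably can be removed bus-wide, the dispatch loop no longer swallows `InterruptedException` itself, and `stop()` no longer joins the dispatch thread from within that thread. A standalone sketch of the self-join guard in isolation, using hypothetical names since the real classes are Spark-internal:

import java.util.concurrent.LinkedBlockingQueue

// Standalone sketch of the stop() guard added above: a consumer thread that may end
// up calling stop() on itself (for example from its own error handling) must not
// join() itself, or the call would never return. All names here are hypothetical.
class SimpleAsyncQueue[T](poisonPill: T) {
  private val queue = new LinkedBlockingQueue[T]()

  private val dispatchThread = new Thread(new Runnable {
    override def run(): Unit = {
      var next = queue.take()
      while (next != poisonPill) {
        // process `next` here
        next = queue.take()
      }
    }
  })
  dispatchThread.setDaemon(true)
  dispatchThread.start()

  def post(event: T): Unit = queue.put(event)

  def stop(): Unit = {
    queue.put(poisonPill)
    // The dispatch thread itself might be the caller; joining it from itself
    // would block forever, so only join from other threads.
    if (Thread.currentThread() != dispatchThread) {
      dispatchThread.join()
    }
  }
}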