
Commit 0e934e9

José Hiram Soltren authored and cmonkey committed
[SPARK-16654][CORE] Add UI coverage for Application Level Blacklisting
Builds on top of work in SPARK-8425 to update Application Level Blacklisting in the scheduler.

## What changes were proposed in this pull request?

Adds a UI to these patches by:
- defining new listener events for blacklisting and unblacklisting nodes and executors;
- sending said events at the relevant points in BlacklistTracker;
- adding JSON (de)serialization code for these events;
- augmenting the Executors UI page to show which, and how many, executors are blacklisted;
- adding a unit test to make sure events are being fired;
- adding HistoryServerSuite coverage to verify that the SHS reads these events correctly;
- updating the Executors UI to show Blacklisted/Active/Dead as a tri-state in Executors Status.

Updates .rat-excludes to pass tests.

username squito

## How was this patch tested?

./dev/run-tests
testOnly org.apache.spark.util.JsonProtocolSuite
testOnly org.apache.spark.scheduler.BlacklistTrackerSuite
testOnly org.apache.spark.deploy.history.HistoryServerSuite

https://github.com/jsoltren/jose-utils/blob/master/blacklist/test-blacklist.sh

![blacklist-20161219](https://cloud.githubusercontent.com/assets/1208477/21335321/9eda320a-c623-11e6-8b8c-9c912a73c276.jpg)

Author: José Hiram Soltren <[email protected]>

Closes apache#16346 from jsoltren/SPARK-16654-submit.
1 parent b123fda commit 0e934e9

36 files changed: +950, -235 lines
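
The blacklisting UI below only has something to show when application-level blacklisting is actually enabled. A minimal sketch of turning it on, assuming the spark.blacklist.* settings introduced by the earlier SPARK-8425 work; the app name and threshold value are illustrative, not recommendations:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch: enable the BlacklistTracker so the new listener events and UI column are populated.
val conf = new SparkConf()
  .setAppName("blacklist-ui-demo")          // hypothetical app name
  .set("spark.blacklist.enabled", "true")   // turn on application-level blacklisting
  .set("spark.blacklist.application.maxFailedTasksPerExecutor", "1")  // assumed threshold, lowered for demo

val sc = new SparkContext(conf)
```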

core/src/main/java/org/apache/spark/SparkFirehoseListener.java

Lines changed: 20 additions & 0 deletions

@@ -113,6 +113,26 @@ public final void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved
     onEvent(executorRemoved);
   }
 
+  @Override
+  public final void onExecutorBlacklisted(SparkListenerExecutorBlacklisted executorBlacklisted) {
+    onEvent(executorBlacklisted);
+  }
+
+  @Override
+  public final void onExecutorUnblacklisted(SparkListenerExecutorUnblacklisted executorUnblacklisted) {
+    onEvent(executorUnblacklisted);
+  }
+
+  @Override
+  public final void onNodeBlacklisted(SparkListenerNodeBlacklisted nodeBlacklisted) {
+    onEvent(nodeBlacklisted);
+  }
+
+  @Override
+  public final void onNodeUnblacklisted(SparkListenerNodeUnblacklisted nodeUnblacklisted) {
+    onEvent(nodeUnblacklisted);
+  }
+
   @Override
   public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) {
     onEvent(blockUpdated);

core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html

Lines changed: 5 additions & 0 deletions

@@ -45,6 +45,11 @@ <h4 style="clear: left; display: inline-block;">Summary</h4>
          title="Bytes and records written to disk in order to be read by a shuffle in a future stage.">
        Shuffle Write</span>
      </th>
+      <th>
+        <span data-toggle="tooltip" data-placement="left"
+          title="Number of executors blacklisted by the scheduler due to task failures.">
+        Blacklisted</span>
+      </th>
    </thead>
    <tbody>
    </tbody>

core/src/main/resources/org/apache/spark/ui/static/executorspage.js

Lines changed: 27 additions & 12 deletions

@@ -182,7 +182,7 @@ $(document).ready(function () {
     executorsSummary = $("#active-executors");
 
     getStandAloneppId(function (appId) {
-
+
       var endPoint = createRESTEndPoint(appId);
       $.getJSON(endPoint, function (response, status, jqXHR) {
         var summary = [];
@@ -202,7 +202,8 @@ $(document).ready(function () {
         var allTotalInputBytes = 0;
         var allTotalShuffleRead = 0;
         var allTotalShuffleWrite = 0;
-
+        var allTotalBlacklisted = 0;
+
         var activeExecCnt = 0;
         var activeRDDBlocks = 0;
         var activeMemoryUsed = 0;
@@ -219,7 +220,8 @@ $(document).ready(function () {
         var activeTotalInputBytes = 0;
         var activeTotalShuffleRead = 0;
         var activeTotalShuffleWrite = 0;
-
+        var activeTotalBlacklisted = 0;
+
         var deadExecCnt = 0;
         var deadRDDBlocks = 0;
         var deadMemoryUsed = 0;
@@ -236,7 +238,8 @@ $(document).ready(function () {
         var deadTotalInputBytes = 0;
         var deadTotalShuffleRead = 0;
         var deadTotalShuffleWrite = 0;
-
+        var deadTotalBlacklisted = 0;
+
         response.forEach(function (exec) {
           allExecCnt += 1;
           allRDDBlocks += exec.rddBlocks;
@@ -254,6 +257,7 @@ $(document).ready(function () {
           allTotalInputBytes += exec.totalInputBytes;
           allTotalShuffleRead += exec.totalShuffleRead;
           allTotalShuffleWrite += exec.totalShuffleWrite;
+          allTotalBlacklisted += exec.isBlacklisted ? 1 : 0;
           if (exec.isActive) {
             activeExecCnt += 1;
             activeRDDBlocks += exec.rddBlocks;
@@ -271,6 +275,7 @@ $(document).ready(function () {
             activeTotalInputBytes += exec.totalInputBytes;
             activeTotalShuffleRead += exec.totalShuffleRead;
             activeTotalShuffleWrite += exec.totalShuffleWrite;
+            activeTotalBlacklisted += exec.isBlacklisted ? 1 : 0;
           } else {
             deadExecCnt += 1;
             deadRDDBlocks += exec.rddBlocks;
@@ -288,9 +293,10 @@ $(document).ready(function () {
             deadTotalInputBytes += exec.totalInputBytes;
             deadTotalShuffleRead += exec.totalShuffleRead;
             deadTotalShuffleWrite += exec.totalShuffleWrite;
+            deadTotalBlacklisted += exec.isBlacklisted ? 1 : 0;
           }
         });
-
+
         var totalSummary = {
           "execCnt": ( "Total(" + allExecCnt + ")"),
           "allRDDBlocks": allRDDBlocks,
@@ -307,7 +313,8 @@ $(document).ready(function () {
           "allTotalGCTime": allTotalGCTime,
           "allTotalInputBytes": allTotalInputBytes,
           "allTotalShuffleRead": allTotalShuffleRead,
-          "allTotalShuffleWrite": allTotalShuffleWrite
+          "allTotalShuffleWrite": allTotalShuffleWrite,
+          "allTotalBlacklisted": allTotalBlacklisted
         };
         var activeSummary = {
           "execCnt": ( "Active(" + activeExecCnt + ")"),
@@ -325,7 +332,8 @@ $(document).ready(function () {
           "allTotalGCTime": activeTotalGCTime,
           "allTotalInputBytes": activeTotalInputBytes,
           "allTotalShuffleRead": activeTotalShuffleRead,
-          "allTotalShuffleWrite": activeTotalShuffleWrite
+          "allTotalShuffleWrite": activeTotalShuffleWrite,
+          "allTotalBlacklisted": activeTotalBlacklisted
         };
         var deadSummary = {
           "execCnt": ( "Dead(" + deadExecCnt + ")" ),
@@ -343,12 +351,13 @@ $(document).ready(function () {
           "allTotalGCTime": deadTotalGCTime,
           "allTotalInputBytes": deadTotalInputBytes,
           "allTotalShuffleRead": deadTotalShuffleRead,
-          "allTotalShuffleWrite": deadTotalShuffleWrite
+          "allTotalShuffleWrite": deadTotalShuffleWrite,
+          "allTotalBlacklisted": deadTotalBlacklisted
         };
-
+
         var data = {executors: response, "execSummary": [activeSummary, deadSummary, totalSummary]};
         $.get(createTemplateURI(appId), function (template) {
-
+
           executorsSummary.append(Mustache.render($(template).filter("#executors-summary-template").html(), data));
           var selector = "#active-executors-table";
           var conf = {
@@ -360,7 +369,12 @@ $(document).ready(function () {
                 }
               },
               {data: 'hostPort'},
-              {data: 'isActive', render: formatStatus},
+              {data: 'isActive', render: function (data, type, row) {
+                if (type !== 'display') return data;
+                if (row.isBlacklisted) return "Blacklisted";
+                else return formatStatus (data, type);
+                }
+              },
               {data: 'rddBlocks'},
               {
                 data: function (row, type) {
@@ -474,7 +488,8 @@ $(document).ready(function () {
               },
               {data: 'allTotalInputBytes', render: formatBytes},
              {data: 'allTotalShuffleRead', render: formatBytes},
-              {data: 'allTotalShuffleWrite', render: formatBytes}
+              {data: 'allTotalShuffleWrite', render: formatBytes},
+              {data: 'allTotalBlacklisted'}
            ],
            "paging": false,
            "searching": false,

core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala

Lines changed: 14 additions & 2 deletions

@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicReference
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
 import org.apache.spark.util.{Clock, SystemClock, Utils}
@@ -48,9 +48,14 @@ import org.apache.spark.util.{Clock, SystemClock, Utils}
  * one exception is [[nodeBlacklist()]], which can be called without holding a lock.
  */
 private[scheduler] class BlacklistTracker (
+    private val listenerBus: LiveListenerBus,
     conf: SparkConf,
     clock: Clock = new SystemClock()) extends Logging {
 
+  def this(sc: SparkContext) = {
+    this(sc.listenerBus, sc.conf)
+  }
+
   BlacklistTracker.validateBlacklistConfs(conf)
   private val MAX_FAILURES_PER_EXEC = conf.get(config.MAX_FAILURES_PER_EXEC)
   private val MAX_FAILED_EXEC_PER_NODE = conf.get(config.MAX_FAILED_EXEC_PER_NODE)
@@ -103,6 +108,7 @@ private[scheduler] class BlacklistTracker (
       execsToUnblacklist.foreach { exec =>
         val status = executorIdToBlacklistStatus.remove(exec).get
         val failedExecsOnNode = nodeToBlacklistedExecs(status.node)
+        listenerBus.post(SparkListenerExecutorUnblacklisted(now, exec))
         failedExecsOnNode.remove(exec)
         if (failedExecsOnNode.isEmpty) {
           nodeToBlacklistedExecs.remove(status.node)
@@ -114,7 +120,10 @@ private[scheduler] class BlacklistTracker (
       // Un-blacklist any nodes that have been blacklisted longer than the blacklist timeout.
       logInfo(s"Removing nodes $nodesToUnblacklist from blacklist because the blacklist " +
         s"has timed out")
-      nodeIdToBlacklistExpiryTime --= nodesToUnblacklist
+      nodesToUnblacklist.foreach { node =>
+        nodeIdToBlacklistExpiryTime.remove(node)
+        listenerBus.post(SparkListenerNodeUnblacklisted(now, node))
+      }
       _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
     }
     updateNextExpiryTime()
@@ -161,6 +170,8 @@ private[scheduler] class BlacklistTracker (
          s" task failures in successful task sets")
        val node = failuresInTaskSet.node
        executorIdToBlacklistStatus.put(exec, BlacklistedExecutor(node, expiryTimeForNewBlacklists))
+        listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, newTotal))
+        executorIdToFailureList.remove(exec)
        updateNextExpiryTime()
 
        // In addition to blacklisting the executor, we also update the data for failures on the
@@ -174,6 +185,7 @@ private[scheduler] class BlacklistTracker (
          logInfo(s"Blacklisting node $node because it has ${blacklistedExecsOnNode.size} " +
            s"executors blacklisted: ${blacklistedExecsOnNode}")
          nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists)
+          listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size))
          _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
        }
      }
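
With the listener bus threaded into BlacklistTracker, every blacklist and un-blacklist decision above is published as an ordinary SparkListenerEvent. For orientation, a hand-built sketch of the two "blacklisted" payloads posted here; the timestamp, ids, and counts are invented:

```scala
import org.apache.spark.scheduler.{SparkListenerExecutorBlacklisted, SparkListenerNodeBlacklisted}

// Illustrative payloads only; field values are made up.
val execBlacklisted = SparkListenerExecutorBlacklisted(
  time = 1482175800000L,  // wall-clock millis at the time of the decision
  executorId = "2",
  taskFailures = 2)       // the failure count that pushed the executor over the threshold

val nodeBlacklisted = SparkListenerNodeBlacklisted(
  time = 1482175800000L,
  hostId = "host-a",
  executorFailures = 2)   // number of blacklisted executors on that node
```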

core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

Lines changed: 16 additions & 0 deletions

@@ -193,6 +193,22 @@ private[spark] class EventLoggingListener(
     logEvent(event, flushLogger = true)
   }
 
+  override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit = {
+    logEvent(event, flushLogger = true)
+  }
+
+  override def onExecutorUnblacklisted(event: SparkListenerExecutorUnblacklisted): Unit = {
+    logEvent(event, flushLogger = true)
+  }
+
+  override def onNodeBlacklisted(event: SparkListenerNodeBlacklisted): Unit = {
+    logEvent(event, flushLogger = true)
+  }
+
+  override def onNodeUnblacklisted(event: SparkListenerNodeUnblacklisted): Unit = {
+    logEvent(event, flushLogger = true)
+  }
+
   // No-op because logging every update would be overkill
   override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {}
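
Because EventLoggingListener now persists these four events, blacklisting decisions survive into the event log and the history server (SHS) can replay them offline, which is what the HistoryServerSuite coverage mentioned in the commit message exercises. A sketch of the configuration involved, assuming a local event-log directory:

```scala
import org.apache.spark.SparkConf

// Sketch only: write blacklist events to the event log so the history server can replay them.
val conf = new SparkConf()
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "file:/tmp/spark-events")  // assumed local path
  .set("spark.blacklist.enabled", "true")
// A history server reading the same directory (via spark.history.fs.logDirectory)
// would then render the blacklisted executors on its Executors page.
```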

core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala

Lines changed: 54 additions & 0 deletions

@@ -105,6 +105,28 @@ case class SparkListenerExecutorAdded(time: Long, executorId: String, executorIn
 case class SparkListenerExecutorRemoved(time: Long, executorId: String, reason: String)
   extends SparkListenerEvent
 
+@DeveloperApi
+case class SparkListenerExecutorBlacklisted(
+    time: Long,
+    executorId: String,
+    taskFailures: Int)
+  extends SparkListenerEvent
+
+@DeveloperApi
+case class SparkListenerExecutorUnblacklisted(time: Long, executorId: String)
+  extends SparkListenerEvent
+
+@DeveloperApi
+case class SparkListenerNodeBlacklisted(
+    time: Long,
+    hostId: String,
+    executorFailures: Int)
+  extends SparkListenerEvent
+
+@DeveloperApi
+case class SparkListenerNodeUnblacklisted(time: Long, hostId: String)
+  extends SparkListenerEvent
+
 @DeveloperApi
 case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends SparkListenerEvent
 
@@ -238,6 +260,26 @@ private[spark] trait SparkListenerInterface {
   */
  def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit
 
+  /**
+   * Called when the driver blacklists an executor for a Spark application.
+   */
+  def onExecutorBlacklisted(executorBlacklisted: SparkListenerExecutorBlacklisted): Unit
+
+  /**
+   * Called when the driver re-enables a previously blacklisted executor.
+   */
+  def onExecutorUnblacklisted(executorUnblacklisted: SparkListenerExecutorUnblacklisted): Unit
+
+  /**
+   * Called when the driver blacklists a node for a Spark application.
+   */
+  def onNodeBlacklisted(nodeBlacklisted: SparkListenerNodeBlacklisted): Unit
+
+  /**
+   * Called when the driver re-enables a previously blacklisted node.
+   */
+  def onNodeUnblacklisted(nodeUnblacklisted: SparkListenerNodeUnblacklisted): Unit
+
  /**
   * Called when the driver receives a block update info.
   */
@@ -293,6 +335,18 @@ abstract class SparkListener extends SparkListenerInterface {
 
   override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { }
 
+  override def onExecutorBlacklisted(
+      executorBlacklisted: SparkListenerExecutorBlacklisted): Unit = { }
+
+  override def onExecutorUnblacklisted(
+      executorUnblacklisted: SparkListenerExecutorUnblacklisted): Unit = { }
+
+  override def onNodeBlacklisted(
+      nodeBlacklisted: SparkListenerNodeBlacklisted): Unit = { }
+
+  override def onNodeUnblacklisted(
+      nodeUnblacklisted: SparkListenerNodeUnblacklisted): Unit = { }
+
   override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = { }
 
   override def onOtherEvent(event: SparkListenerEvent): Unit = { }
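
The new SparkListenerInterface callbacks ship with no-op defaults on SparkListener, so existing listeners keep compiling while new ones can opt in. A minimal sketch of a listener that tracks the current blacklist, registered the usual way with SparkContext.addSparkListener; the logging is purely illustrative:

```scala
import scala.collection.mutable
import org.apache.spark.scheduler._

// Sketch: keep a live view of which executors are currently blacklisted.
class BlacklistWatcher extends SparkListener {
  private val blacklistedExecs = mutable.Set[String]()

  override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit = {
    blacklistedExecs += event.executorId
    println(s"Executor ${event.executorId} blacklisted after ${event.taskFailures} task failures")
  }

  override def onExecutorUnblacklisted(event: SparkListenerExecutorUnblacklisted): Unit = {
    blacklistedExecs -= event.executorId
  }

  override def onNodeBlacklisted(event: SparkListenerNodeBlacklisted): Unit = {
    println(s"Node ${event.hostId} blacklisted (${event.executorFailures} blacklisted executors)")
  }

  override def onNodeUnblacklisted(event: SparkListenerNodeUnblacklisted): Unit = {
    println(s"Node ${event.hostId} un-blacklisted")
  }

  def currentlyBlacklisted: Set[String] = blacklistedExecs.toSet
}

// Usage, assuming an existing SparkContext named sc:
// sc.addSparkListener(new BlacklistWatcher)
```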

core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala

Lines changed: 8 additions & 0 deletions

@@ -61,6 +61,14 @@ private[spark] trait SparkListenerBus
         listener.onExecutorAdded(executorAdded)
       case executorRemoved: SparkListenerExecutorRemoved =>
         listener.onExecutorRemoved(executorRemoved)
+      case executorBlacklisted: SparkListenerExecutorBlacklisted =>
+        listener.onExecutorBlacklisted(executorBlacklisted)
+      case executorUnblacklisted: SparkListenerExecutorUnblacklisted =>
+        listener.onExecutorUnblacklisted(executorUnblacklisted)
+      case nodeBlacklisted: SparkListenerNodeBlacklisted =>
+        listener.onNodeBlacklisted(nodeBlacklisted)
+      case nodeUnblacklisted: SparkListenerNodeUnblacklisted =>
+        listener.onNodeUnblacklisted(nodeUnblacklisted)
       case blockUpdated: SparkListenerBlockUpdated =>
         listener.onBlockUpdated(blockUpdated)
       case logStart: SparkListenerLogStart => // ignore event log metadata

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 5 additions & 5 deletions

@@ -63,14 +63,14 @@ private[spark] class TaskSchedulerImpl private[scheduler](
     this(
       sc,
       sc.conf.get(config.MAX_TASK_FAILURES),
-      TaskSchedulerImpl.maybeCreateBlacklistTracker(sc.conf))
+      TaskSchedulerImpl.maybeCreateBlacklistTracker(sc))
   }
 
   def this(sc: SparkContext, maxTaskFailures: Int, isLocal: Boolean) = {
     this(
       sc,
       maxTaskFailures,
-      TaskSchedulerImpl.maybeCreateBlacklistTracker(sc.conf),
+      TaskSchedulerImpl.maybeCreateBlacklistTracker(sc),
       isLocal = isLocal)
   }
 
@@ -717,9 +717,9 @@ private[spark] object TaskSchedulerImpl {
     retval.toList
   }
 
-  private def maybeCreateBlacklistTracker(conf: SparkConf): Option[BlacklistTracker] = {
-    if (BlacklistTracker.isBlacklistEnabled(conf)) {
-      Some(new BlacklistTracker(conf))
+  private def maybeCreateBlacklistTracker(sc: SparkContext): Option[BlacklistTracker] = {
+    if (BlacklistTracker.isBlacklistEnabled(sc.conf)) {
+      Some(new BlacklistTracker(sc))
     } else {
       None
     }

core/src/main/scala/org/apache/spark/status/api/v1/api.scala

Lines changed: 1 addition & 0 deletions

@@ -73,6 +73,7 @@ class ExecutorSummary private[spark](
     val totalInputBytes: Long,
     val totalShuffleRead: Long,
     val totalShuffleWrite: Long,
+    val isBlacklisted: Boolean,
     val maxMemory: Long,
     val executorLogs: Map[String, String])
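
The new isBlacklisted field also flows through the REST API's ExecutorSummary, so the tri-state shown on the Executors page can be read programmatically. A minimal sketch, assuming a live UI on localhost:4040 and a made-up application id:

```scala
import scala.io.Source

// Sketch: fetch the executor list; each JSON object now carries an "isBlacklisted" boolean.
val appId = "app-20161219120000-0001"  // hypothetical application id
val url = s"http://localhost:4040/api/v1/applications/$appId/executors"
val body = Source.fromURL(url).mkString
println(body)  // blacklisted executors report "isBlacklisted" : true
```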

core/src/main/scala/org/apache/spark/ui/ToolTips.scala

Lines changed: 3 additions & 0 deletions

@@ -91,6 +91,9 @@ private[spark] object ToolTips {
   val TASK_TIME =
     "Shaded red when garbage collection (GC) time is over 10% of task time"
 
+  val BLACKLISTED =
+    "Shows if this executor has been blacklisted by the scheduler due to task failures."
+
   val APPLICATION_EXECUTOR_LIMIT =
     """Maximum number of executors that this application will use. This limit is finite only when
       dynamic allocation is enabled. The number of granted executors may exceed the limit
