Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions core/src/main/java/org/apache/spark/SparkFirehoseListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,26 @@ public final void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved
onEvent(executorRemoved);
}

/** Forwards the executor-blacklisted event to the single {@code onEvent} hook. */
@Override
public final void onExecutorBlacklisted(SparkListenerExecutorBlacklisted executorBlacklisted) {
onEvent(executorBlacklisted);
}

/** Routes executor-unblacklisted notifications into the single {@code onEvent} hook. */
@Override
public final void onExecutorUnblacklisted(
    SparkListenerExecutorUnblacklisted executorUnblacklisted) {
  onEvent(executorUnblacklisted);
}

/** Forwards the node-blacklisted event to the single {@code onEvent} hook. */
@Override
public final void onNodeBlacklisted(SparkListenerNodeBlacklisted nodeBlacklisted) {
onEvent(nodeBlacklisted);
}

/** Forwards the node-unblacklisted event to the single {@code onEvent} hook. */
@Override
public final void onNodeUnblacklisted(SparkListenerNodeUnblacklisted nodeUnblacklisted) {
onEvent(nodeUnblacklisted);
}

@Override
public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) {
onEvent(blockUpdated);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ <h4 style="clear: left; display: inline-block;">Summary</h4>
title="Bytes and records written to disk in order to be read by a shuffle in a future stage.">
Shuffle Write</span>
</th>
<th>
<span data-toggle="tooltip" data-placement="left"
title="Number of executors blacklisted by the scheduler due to task failures.">
Blacklisted</span>
</th>
</thead>
<tbody>
</tbody>
Expand Down
39 changes: 27 additions & 12 deletions core/src/main/resources/org/apache/spark/ui/static/executorspage.js
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ $(document).ready(function () {
executorsSummary = $("#active-executors");

getStandAloneppId(function (appId) {

var endPoint = createRESTEndPoint(appId);
$.getJSON(endPoint, function (response, status, jqXHR) {
var summary = [];
Expand All @@ -202,7 +202,8 @@ $(document).ready(function () {
var allTotalInputBytes = 0;
var allTotalShuffleRead = 0;
var allTotalShuffleWrite = 0;

var allTotalBlacklisted = 0;

var activeExecCnt = 0;
var activeRDDBlocks = 0;
var activeMemoryUsed = 0;
Expand All @@ -219,7 +220,8 @@ $(document).ready(function () {
var activeTotalInputBytes = 0;
var activeTotalShuffleRead = 0;
var activeTotalShuffleWrite = 0;

var activeTotalBlacklisted = 0;

var deadExecCnt = 0;
var deadRDDBlocks = 0;
var deadMemoryUsed = 0;
Expand All @@ -236,7 +238,8 @@ $(document).ready(function () {
var deadTotalInputBytes = 0;
var deadTotalShuffleRead = 0;
var deadTotalShuffleWrite = 0;

var deadTotalBlacklisted = 0;

response.forEach(function (exec) {
allExecCnt += 1;
allRDDBlocks += exec.rddBlocks;
Expand All @@ -254,6 +257,7 @@ $(document).ready(function () {
allTotalInputBytes += exec.totalInputBytes;
allTotalShuffleRead += exec.totalShuffleRead;
allTotalShuffleWrite += exec.totalShuffleWrite;
allTotalBlacklisted += exec.isBlacklisted ? 1 : 0;
if (exec.isActive) {
activeExecCnt += 1;
activeRDDBlocks += exec.rddBlocks;
Expand All @@ -271,6 +275,7 @@ $(document).ready(function () {
activeTotalInputBytes += exec.totalInputBytes;
activeTotalShuffleRead += exec.totalShuffleRead;
activeTotalShuffleWrite += exec.totalShuffleWrite;
activeTotalBlacklisted += exec.isBlacklisted ? 1 : 0;
} else {
deadExecCnt += 1;
deadRDDBlocks += exec.rddBlocks;
Expand All @@ -288,9 +293,10 @@ $(document).ready(function () {
deadTotalInputBytes += exec.totalInputBytes;
deadTotalShuffleRead += exec.totalShuffleRead;
deadTotalShuffleWrite += exec.totalShuffleWrite;
deadTotalBlacklisted += exec.isBlacklisted ? 1 : 0;
}
});

var totalSummary = {
"execCnt": ( "Total(" + allExecCnt + ")"),
"allRDDBlocks": allRDDBlocks,
Expand All @@ -307,7 +313,8 @@ $(document).ready(function () {
"allTotalGCTime": allTotalGCTime,
"allTotalInputBytes": allTotalInputBytes,
"allTotalShuffleRead": allTotalShuffleRead,
"allTotalShuffleWrite": allTotalShuffleWrite
"allTotalShuffleWrite": allTotalShuffleWrite,
"allTotalBlacklisted": allTotalBlacklisted
};
var activeSummary = {
"execCnt": ( "Active(" + activeExecCnt + ")"),
Expand All @@ -325,7 +332,8 @@ $(document).ready(function () {
"allTotalGCTime": activeTotalGCTime,
"allTotalInputBytes": activeTotalInputBytes,
"allTotalShuffleRead": activeTotalShuffleRead,
"allTotalShuffleWrite": activeTotalShuffleWrite
"allTotalShuffleWrite": activeTotalShuffleWrite,
"allTotalBlacklisted": activeTotalBlacklisted
};
var deadSummary = {
"execCnt": ( "Dead(" + deadExecCnt + ")" ),
Expand All @@ -343,12 +351,13 @@ $(document).ready(function () {
"allTotalGCTime": deadTotalGCTime,
"allTotalInputBytes": deadTotalInputBytes,
"allTotalShuffleRead": deadTotalShuffleRead,
"allTotalShuffleWrite": deadTotalShuffleWrite
"allTotalShuffleWrite": deadTotalShuffleWrite,
"allTotalBlacklisted": deadTotalBlacklisted
};

var data = {executors: response, "execSummary": [activeSummary, deadSummary, totalSummary]};
$.get(createTemplateURI(appId), function (template) {

executorsSummary.append(Mustache.render($(template).filter("#executors-summary-template").html(), data));
var selector = "#active-executors-table";
var conf = {
Expand All @@ -360,7 +369,12 @@ $(document).ready(function () {
}
},
{data: 'hostPort'},
{data: 'isActive', render: formatStatus},
{data: 'isActive', render: function (data, type, row) {
if (type !== 'display') return data;
if (row.isBlacklisted) return "Blacklisted";
else return formatStatus (data, type);
}
},
{data: 'rddBlocks'},
{
data: function (row, type) {
Expand Down Expand Up @@ -477,7 +491,8 @@ $(document).ready(function () {
},
{data: 'allTotalInputBytes', render: formatBytes},
{data: 'allTotalShuffleRead', render: formatBytes},
{data: 'allTotalShuffleWrite', render: formatBytes}
{data: 'allTotalShuffleWrite', render: formatBytes},
{data: 'allTotalBlacklisted'}
],
"paging": false,
"searching": false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicReference

import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}

import org.apache.spark.SparkConf
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
import org.apache.spark.util.{Clock, SystemClock, Utils}
Expand All @@ -48,9 +48,14 @@ import org.apache.spark.util.{Clock, SystemClock, Utils}
* one exception is [[nodeBlacklist()]], which can be called without holding a lock.
*/
private[scheduler] class BlacklistTracker (
private val listenerBus: LiveListenerBus,
conf: SparkConf,
clock: Clock = new SystemClock()) extends Logging {

/**
 * Convenience constructor that takes the listener bus and the configuration from a
 * [[SparkContext]]; the clock is left at the primary constructor's default.
 *
 * Reads `sc.conf` directly rather than `sc.getConf`: `getConf` returns a cloned SparkConf,
 * and the other call sites in this change (e.g. `BlacklistTracker.isBlacklistEnabled(sc.conf)`)
 * already use the context's own instance. The tracker code visible here only reads from
 * the conf, so no private copy is needed.
 */
def this(sc: SparkContext) = {
  this(sc.listenerBus, sc.conf)
}

BlacklistTracker.validateBlacklistConfs(conf)
private val MAX_FAILURES_PER_EXEC = conf.get(config.MAX_FAILURES_PER_EXEC)
private val MAX_FAILED_EXEC_PER_NODE = conf.get(config.MAX_FAILED_EXEC_PER_NODE)
Expand Down Expand Up @@ -103,6 +108,7 @@ private[scheduler] class BlacklistTracker (
execsToUnblacklist.foreach { exec =>
val status = executorIdToBlacklistStatus.remove(exec).get
val failedExecsOnNode = nodeToBlacklistedExecs(status.node)
listenerBus.post(SparkListenerExecutorUnblacklisted(now, exec))
failedExecsOnNode.remove(exec)
if (failedExecsOnNode.isEmpty) {
nodeToBlacklistedExecs.remove(status.node)
Expand All @@ -115,6 +121,10 @@ private[scheduler] class BlacklistTracker (
logInfo(s"Removing nodes $nodesToUnblacklist from blacklist because the blacklist " +
s"has timed out")
nodeIdToBlacklistExpiryTime --= nodesToUnblacklist
nodesToUnblacklist.foreach { node =>
nodeIdToBlacklistExpiryTime.remove(node)
listenerBus.post(SparkListenerNodeUnblacklisted(now, node))
}
_nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
}
updateNextExpiryTime()
Expand Down Expand Up @@ -161,6 +171,9 @@ private[scheduler] class BlacklistTracker (
s" task failures in successful task sets")
val node = failuresInTaskSet.node
executorIdToBlacklistStatus.put(exec, BlacklistedExecutor(node, expiryTime))
listenerBus.post(
SparkListenerExecutorBlacklisted(now, exec, newTotal))
executorIdToFailureList.remove(exec)
updateNextExpiryTime()

// In addition to blacklisting the executor, we also update the data for failures on the
Expand All @@ -171,6 +184,7 @@ private[scheduler] class BlacklistTracker (
logInfo(s"Blacklisting node $node because it has ${blacklistedExecsOnNode.size} " +
s"executors blacklisted: ${blacklistedExecsOnNode}")
nodeIdToBlacklistExpiryTime.put(node, expiryTime)
listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size))
_nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,22 @@ private[spark] class EventLoggingListener(
logEvent(event, flushLogger = true)
}

// Record executor blacklisting via logEvent, with flushLogger enabled.
override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit =
  logEvent(event, flushLogger = true)

// Record executor un-blacklisting via logEvent, with flushLogger enabled.
override def onExecutorUnblacklisted(event: SparkListenerExecutorUnblacklisted): Unit =
  logEvent(event, flushLogger = true)

// Record node blacklisting via logEvent, with flushLogger enabled.
override def onNodeBlacklisted(event: SparkListenerNodeBlacklisted): Unit =
  logEvent(event, flushLogger = true)

// Record node un-blacklisting via logEvent, with flushLogger enabled.
override def onNodeUnblacklisted(event: SparkListenerNodeUnblacklisted): Unit =
  logEvent(event, flushLogger = true)

// No-op because logging every update would be overkill
override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {}

Expand Down
54 changes: 54 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,28 @@ case class SparkListenerExecutorAdded(time: Long, executorId: String, executorIn
case class SparkListenerExecutorRemoved(time: Long, executorId: String, reason: String)
extends SparkListenerEvent

/**
 * Listener event describing an executor that has been blacklisted; the listener bus
 * dispatches it to `onExecutorBlacklisted`.
 *
 * @param time timestamp supplied by the poster (presumably epoch millis -- TODO confirm)
 * @param executorId id of the executor that was blacklisted
 * @param taskFailures failure count reported by the poster for this executor
 */
@DeveloperApi
case class SparkListenerExecutorBlacklisted(
time: Long,
executorId: String,
taskFailures: Int)
extends SparkListenerEvent

/**
 * Listener event describing an executor that has been removed from the blacklist; the
 * listener bus dispatches it to `onExecutorUnblacklisted`.
 *
 * @param time timestamp supplied by the poster (presumably epoch millis -- TODO confirm)
 * @param executorId id of the executor that was un-blacklisted
 */
@DeveloperApi
case class SparkListenerExecutorUnblacklisted(time: Long, executorId: String)
extends SparkListenerEvent

/**
 * Listener event describing a node that has been blacklisted; the listener bus
 * dispatches it to `onNodeBlacklisted`.
 *
 * @param time timestamp supplied by the poster (presumably epoch millis -- TODO confirm)
 * @param nodeId identifier of the blacklisted node
 * @param executorFailures count reported by the poster; BlacklistTracker passes the number
 *                         of blacklisted executors on the node
 */
@DeveloperApi
case class SparkListenerNodeBlacklisted(
time: Long,
nodeId: String,
executorFailures: Int)
extends SparkListenerEvent

/**
 * Listener event describing a node that has been removed from the blacklist; the listener
 * bus dispatches it to `onNodeUnblacklisted`.
 *
 * @param time timestamp supplied by the poster (presumably epoch millis -- TODO confirm)
 * @param nodeId identifier of the node that was un-blacklisted
 */
@DeveloperApi
case class SparkListenerNodeUnblacklisted(time: Long, nodeId: String)
extends SparkListenerEvent

@DeveloperApi
case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends SparkListenerEvent

Expand Down Expand Up @@ -238,6 +260,26 @@ private[spark] trait SparkListenerInterface {
*/
def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit

/**
* Called when the driver blacklists an executor for a Spark application.
*/
def onExecutorBlacklisted(executorBlacklisted: SparkListenerExecutorBlacklisted): Unit

/**
* Called when the driver re-enables a previously blacklisted executor.
*/
def onExecutorUnblacklisted(executorUnblacklisted: SparkListenerExecutorUnblacklisted): Unit

/**
* Called when the driver blacklists a node for a Spark application.
*/
def onNodeBlacklisted(nodeBlacklisted: SparkListenerNodeBlacklisted): Unit

/**
* Called when the driver re-enables a previously blacklisted node.
*/
def onNodeUnblacklisted(nodeUnblacklisted: SparkListenerNodeUnblacklisted): Unit

/**
* Called when the driver receives a block update info.
*/
Expand Down Expand Up @@ -293,6 +335,18 @@ abstract class SparkListener extends SparkListenerInterface {

override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { }

// Default implementation does nothing; subclasses override to react to the event.
override def onExecutorBlacklisted(
executorBlacklisted: SparkListenerExecutorBlacklisted): Unit = { }

// Default implementation does nothing; subclasses override to react to the event.
override def onExecutorUnblacklisted(
executorUnblacklisted: SparkListenerExecutorUnblacklisted): Unit = { }

// Default implementation does nothing; subclasses override to react to the event.
override def onNodeBlacklisted(
nodeBlacklisted: SparkListenerNodeBlacklisted): Unit = { }

// Default implementation does nothing; subclasses override to react to the event.
override def onNodeUnblacklisted(
nodeUnblacklisted: SparkListenerNodeUnblacklisted): Unit = { }

override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = { }

override def onOtherEvent(event: SparkListenerEvent): Unit = { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ private[spark] trait SparkListenerBus
listener.onExecutorAdded(executorAdded)
case executorRemoved: SparkListenerExecutorRemoved =>
listener.onExecutorRemoved(executorRemoved)
case executorBlacklisted: SparkListenerExecutorBlacklisted =>
listener.onExecutorBlacklisted(executorBlacklisted)
case executorUnblacklisted: SparkListenerExecutorUnblacklisted =>
listener.onExecutorUnblacklisted(executorUnblacklisted)
case nodeBlacklisted: SparkListenerNodeBlacklisted =>
listener.onNodeBlacklisted(nodeBlacklisted)
case nodeUnblacklisted: SparkListenerNodeUnblacklisted =>
listener.onNodeUnblacklisted(nodeUnblacklisted)
case blockUpdated: SparkListenerBlockUpdated =>
listener.onBlockUpdated(blockUpdated)
case logStart: SparkListenerLogStart => // ignore event log metadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,14 @@ private[spark] class TaskSchedulerImpl private[scheduler](
this(
sc,
sc.conf.get(config.MAX_TASK_FAILURES),
TaskSchedulerImpl.maybeCreateBlacklistTracker(sc.conf))
TaskSchedulerImpl.maybeCreateBlacklistTracker(sc))
}

def this(sc: SparkContext, maxTaskFailures: Int, isLocal: Boolean) = {
this(
sc,
maxTaskFailures,
TaskSchedulerImpl.maybeCreateBlacklistTracker(sc.conf),
TaskSchedulerImpl.maybeCreateBlacklistTracker(sc),
isLocal = isLocal)
}

Expand Down Expand Up @@ -701,9 +701,9 @@ private[spark] object TaskSchedulerImpl {
retval.toList
}

private def maybeCreateBlacklistTracker(conf: SparkConf): Option[BlacklistTracker] = {
if (BlacklistTracker.isBlacklistEnabled(conf)) {
Some(new BlacklistTracker(conf))
private def maybeCreateBlacklistTracker(sc: SparkContext): Option[BlacklistTracker] = {
if (BlacklistTracker.isBlacklistEnabled(sc.conf)) {
Some(new BlacklistTracker(sc))
} else {
None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class ExecutorSummary private[spark](
val totalInputBytes: Long,
val totalShuffleRead: Long,
val totalShuffleWrite: Long,
val isBlacklisted: Boolean,
val maxMemory: Long,
val executorLogs: Map[String, String])

Expand Down
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/ToolTips.scala
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ private[spark] object ToolTips {
val TASK_TIME =
"Shaded red when garbage collection (GC) time is over 10% of task time"

// Tooltip text explaining an executor's blacklisted indicator.
val BLACKLISTED =
"Shows if this executor has been blacklisted by the scheduler due to task failures."

val APPLICATION_EXECUTOR_LIMIT =
"""Maximum number of executors that this application will use. This limit is finite only when
dynamic allocation is enabled. The number of granted executors may exceed the limit
Expand Down
Loading