
Commit b118b91

Marcelo Vanzin committed
Merge branch 'master' into SPARK-25875
2 parents 8148f49 + f6cc354

45 files changed (+1348, -258 lines)


core/src/main/scala/org/apache/spark/Heartbeater.scala

Lines changed: 1 addition & 1 deletion
@@ -61,7 +61,7 @@ private[spark] class Heartbeater(
 
   /**
    * Get the current executor level metrics. These are returned as an array, with the index
-   * determined by MetricGetter.values
+   * determined by ExecutorMetricType.values
    */
   def getCurrentMetrics(): ExecutorMetrics = {
     val metrics = ExecutorMetricType.values.map(_.getMetricValue(memoryManager)).toArray

core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala

Lines changed: 1 addition & 1 deletion
@@ -28,7 +28,7 @@ import org.apache.spark.metrics.ExecutorMetricType
 @DeveloperApi
 class ExecutorMetrics private[spark] extends Serializable {
 
-  // Metrics are indexed by MetricGetter.values
+  // Metrics are indexed by ExecutorMetricType.values
   private val metrics = new Array[Long](ExecutorMetricType.values.length)
 
   // the first element is initialized to -1, indicating that the values for the array
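Note (illustrative, not part of the commit): the rename above is cosmetic (MetricGetter became ExecutorMetricType), but the positional contract it documents is easy to miss. A minimal sketch of that contract, using hypothetical stand-ins (metricTypes, readMetric) rather than the real ExecutorMetricType API:

// Hedged sketch: one Long per metric type, indexed by the type's position in the
// values sequence; consumers recover a value by position, never by name.
object MetricIndexingSketch {
  def main(args: Array[String]): Unit = {
    val metricTypes: IndexedSeq[String] = IndexedSeq("JVMHeapMemory", "JVMOffHeapMemory")
    def readMetric(name: String): Long = 0L // placeholder for a real probe

    // Same shape as ExecutorMetrics.metrics: metricTypes(i) describes snapshot(i).
    val snapshot: Array[Long] = metricTypes.map(readMetric).toArray

    val i = metricTypes.indexOf("JVMHeapMemory")
    println(s"JVMHeapMemory -> index $i, value ${snapshot(i)}")
  }
}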

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 41 additions & 9 deletions
@@ -265,7 +265,7 @@ private[spark] class DAGScheduler(
       // (taskId, stageId, stageAttemptId, accumUpdates)
       accumUpdates: Array[(Long, Int, Int, Seq[AccumulableInfo])],
       blockManagerId: BlockManagerId,
-      // executor metrics indexed by MetricGetter.values
+      // executor metrics indexed by ExecutorMetricType.values
       executorUpdates: ExecutorMetrics): Boolean = {
     listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates,
       Some(executorUpdates)))
@@ -1295,6 +1295,27 @@ private[spark] class DAGScheduler(
         Utils.getFormattedClassName(event.task), event.reason, event.taskInfo, taskMetrics))
   }
 
+  /**
+   * Check [[SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL]] in job properties to see if we should
+   * interrupt running tasks. Returns `false` if the property value is not a boolean value
+   */
+  private def shouldInterruptTaskThread(job: ActiveJob): Boolean = {
+    if (job.properties == null) {
+      false
+    } else {
+      val shouldInterruptThread =
+        job.properties.getProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
+      try {
+        shouldInterruptThread.toBoolean
+      } catch {
+        case e: IllegalArgumentException =>
+          logWarning(s"${SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL} in Job ${job.jobId} " +
+            s"is invalid: $shouldInterruptThread. Using 'false' instead", e)
+          false
+      }
+    }
+  }
+
   /**
    * Responds to a task finishing. This is called inside the event loop so it assumes that it can
    * modify the scheduler's internal state. Use taskEnded() to post a task end event from outside.
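Note (illustrative, not part of the commit): the try/catch in the new helper exists because Scala's String.toBoolean accepts only "true"/"false" (case-insensitive) and throws IllegalArgumentException for anything else. A standalone sketch of the same parse-with-fallback behaviour; parseInterruptFlag is an illustrative name, not Spark code:

object InterruptFlagSketch {
  // Invalid values degrade to false instead of failing the event loop.
  def parseInterruptFlag(raw: String): Boolean =
    try raw.toBoolean
    catch { case _: IllegalArgumentException => false }

  def main(args: Array[String]): Unit = {
    println(parseInterruptFlag("true"))    // true
    println(parseInterruptFlag("invalid")) // false, no exception escapes
  }
}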
@@ -1364,6 +1385,21 @@ private[spark] class DAGScheduler(
             if (job.numFinished == job.numPartitions) {
               markStageAsFinished(resultStage)
               cleanupStateForJobAndIndependentStages(job)
+              try {
+                // killAllTaskAttempts will fail if a SchedulerBackend does not implement
+                // killTask.
+                logInfo(s"Job ${job.jobId} is finished. Cancelling potential speculative " +
+                  "or zombie tasks for this job")
+                // ResultStage is only used by this job. It's safe to kill speculative or
+                // zombie tasks in this stage.
+                taskScheduler.killAllTaskAttempts(
+                  stageId,
+                  shouldInterruptTaskThread(job),
+                  reason = "Stage finished")
+              } catch {
+                case e: UnsupportedOperationException =>
+                  logWarning(s"Could not cancel tasks for stage $stageId", e)
+              }
               listenerBus.post(
                 SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
             }
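Note (illustrative, not part of the commit): whether killAllTaskAttempts actually interrupts the task threads is decided per job through the SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL local property, which the new SparkContextSuite test below also sets. A hedged usage sketch with a throwaway local-mode context (the app name and job are placeholders), assuming Spark is on the classpath:

import org.apache.spark.{SparkConf, SparkContext}

object InterruptOnCancelUsage {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("interrupt-on-cancel-sketch"))
    try {
      // Jobs submitted from this thread may have their running tasks interrupted when
      // cancelled (or, with this commit, when leftover zombie attempts are killed).
      sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "true")
      sc.range(0, 100).count()
    } finally {
      sc.stop()
    }
  }
}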
@@ -1373,7 +1409,7 @@ private[spark] class DAGScheduler(
           try {
             job.listener.taskSucceeded(rt.outputId, event.result)
           } catch {
-            case e: Exception =>
+            case e: Throwable if !Utils.isFatalError(e) =>
               // TODO: Perhaps we want to mark the resultStage as failed?
               job.listener.jobFailed(new SparkDriverExecutionException(e))
           }
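Note (illustrative, not part of the commit): the widened catch matters because a misbehaved result handler can throw java.lang.Error subclasses such as AssertionError or NotImplementedError, which `case e: Exception` never matched even though they are recoverable here. A small sketch using scala.util.control.NonFatal as a stand-in classifier (Spark uses its own Utils.isFatalError):

import scala.util.control.NonFatal

object FatalityCheckSketch {
  def main(args: Array[String]): Unit = {
    // Both are Errors, not Exceptions, yet NonFatal still treats them as non-fatal.
    val errors: Seq[Throwable] = Seq(new AssertionError("boom"), new NotImplementedError())
    errors.foreach { e =>
      println(s"${e.getClass.getSimpleName}: " +
        s"isException=${e.isInstanceOf[Exception]}, nonFatal=${NonFatal(e)}")
      // prints isException=false, nonFatal=true for both
    }
  }
}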
@@ -1890,10 +1926,6 @@ private[spark] class DAGScheduler(
     val error = new SparkException(failureReason, exception.getOrElse(null))
     var ableToCancelStages = true
 
-    val shouldInterruptThread =
-      if (job.properties == null) false
-      else job.properties.getProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false").toBoolean
-
     // Cancel all independent, running stages.
     val stages = jobIdToStageIds(job.jobId)
     if (stages.isEmpty) {
@@ -1913,12 +1945,12 @@ private[spark] class DAGScheduler(
         val stage = stageIdToStage(stageId)
         if (runningStages.contains(stage)) {
           try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
-            taskScheduler.cancelTasks(stageId, shouldInterruptThread)
+            taskScheduler.cancelTasks(stageId, shouldInterruptTaskThread(job))
             markStageAsFinished(stage, Some(failureReason))
           } catch {
             case e: UnsupportedOperationException =>
-              logInfo(s"Could not cancel tasks for stage $stageId", e)
-              ableToCancelStages = false
+              logWarning(s"Could not cancel tasks for stage $stageId", e)
+              ableToCancelStages = false
           }
         }
       }

core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala

Lines changed: 1 addition & 1 deletion
@@ -175,7 +175,7 @@ case class SparkListenerExecutorMetricsUpdate(
  * @param execId executor id
  * @param stageId stage id
  * @param stageAttemptId stage attempt
- * @param executorMetrics executor level metrics, indexed by MetricGetter.values
+ * @param executorMetrics executor level metrics, indexed by ExecutorMetricType.values
  */
 @DeveloperApi
 case class SparkListenerStageExecutorMetrics(

core/src/test/scala/org/apache/spark/SparkContextSuite.scala

Lines changed: 52 additions & 1 deletion
@@ -33,7 +33,9 @@ import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFor
 import org.scalatest.Matchers._
 import org.scalatest.concurrent.Eventually
 
-import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerTaskEnd, SparkListenerTaskStart}
+import org.apache.spark.internal.config.EXECUTOR_HEARTBEAT_DROP_ZERO_ACCUMULATOR_UPDATES
+import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate, SparkListenerJobStart, SparkListenerTaskEnd, SparkListenerTaskStart}
+import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.util.{ThreadUtils, Utils}
 
 
@@ -672,6 +674,55 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
       assert(sc.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum == 0)
     }
   }
+
+  test("cancel zombie tasks in a result stage when the job finishes") {
+    val conf = new SparkConf()
+      .setMaster("local-cluster[1,2,1024]")
+      .setAppName("test-cluster")
+      .set("spark.ui.enabled", "false")
+      // Disable this so that if a task is running, we can make sure the executor will always send
+      // task metrics via heartbeat to driver.
+      .set(EXECUTOR_HEARTBEAT_DROP_ZERO_ACCUMULATOR_UPDATES.key, "false")
+      // Set a short heartbeat interval to send SparkListenerExecutorMetricsUpdate fast
+      .set("spark.executor.heartbeatInterval", "1s")
+    sc = new SparkContext(conf)
+    sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "true")
+    @volatile var runningTaskIds: Seq[Long] = null
+    val listener = new SparkListener {
+      override def onExecutorMetricsUpdate(
+          executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = {
+        if (executorMetricsUpdate.execId != SparkContext.DRIVER_IDENTIFIER) {
+          runningTaskIds = executorMetricsUpdate.accumUpdates.map(_._1)
+        }
+      }
+    }
+    sc.addSparkListener(listener)
+    sc.range(0, 2).groupBy((x: Long) => x % 2, 2).map { case (x, _) =>
+      val context = org.apache.spark.TaskContext.get()
+      if (context.stageAttemptNumber == 0) {
+        if (context.partitionId == 0) {
+          // Make the first task in the first stage attempt fail.
+          throw new FetchFailedException(SparkEnv.get.blockManager.blockManagerId, 0, 0, 0,
+            new java.io.IOException("fake"))
+        } else {
+          // Make the second task in the first stage attempt sleep to generate a zombie task
+          Thread.sleep(60000)
+        }
+      } else {
+        // Make the second stage attempt successful.
+      }
+      x
+    }.collect()
+    sc.listenerBus.waitUntilEmpty(10000)
+    // As executors will send the metrics of running tasks via heartbeat, we can use this to check
+    // whether there is any running task.
+    eventually(timeout(10.seconds)) {
+      // Make sure runningTaskIds has been set
+      assert(runningTaskIds != null)
+      // Verify there is no running task.
+      assert(runningTaskIds.isEmpty)
+    }
+  }
 }
 
 object SparkContextSuite {

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 37 additions & 14 deletions
@@ -1901,27 +1901,50 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
   }
 
   /**
-   * The job will be failed on first task throwing a DAGSchedulerSuiteDummyException.
+   * The job will be failed on first task throwing an error.
    *  Any subsequent task WILL throw a legitimate java.lang.UnsupportedOperationException.
    *  If multiple tasks, there exists a race condition between the SparkDriverExecutionExceptions
    *  and their differing causes as to which will represent result for job...
    */
   test("misbehaved resultHandler should not crash DAGScheduler and SparkContext") {
-    val e = intercept[SparkDriverExecutionException] {
-      // Number of parallelized partitions implies number of tasks of job
-      val rdd = sc.parallelize(1 to 10, 2)
-      sc.runJob[Int, Int](
-        rdd,
-        (context: TaskContext, iter: Iterator[Int]) => iter.size,
-        // For a robust test assertion, limit number of job tasks to 1; that is,
-        // if multiple RDD partitions, use id of any one partition, say, first partition id=0
-        Seq(0),
-        (part: Int, result: Int) => throw new DAGSchedulerSuiteDummyException)
+    failAfter(1.minute) { // If DAGScheduler crashes, the following test will hang forever
+      for (error <- Seq(
+        new DAGSchedulerSuiteDummyException,
+        new AssertionError, // E.g., assert(foo == bar) fails
+        new NotImplementedError // E.g., call a method with `???` implementation.
+      )) {
+        val e = intercept[SparkDriverExecutionException] {
+          // Number of parallelized partitions implies number of tasks of job
+          val rdd = sc.parallelize(1 to 10, 2)
+          sc.runJob[Int, Int](
+            rdd,
+            (context: TaskContext, iter: Iterator[Int]) => iter.size,
+            // For a robust test assertion, limit number of job tasks to 1; that is,
+            // if multiple RDD partitions, use id of any one partition, say, first partition id=0
+            Seq(0),
+            (part: Int, result: Int) => throw error)
+        }
+        assert(e.getCause eq error)
+
+        // Make sure we can still run commands on our SparkContext
+        assert(sc.parallelize(1 to 10, 2).count() === 10)
+      }
     }
-    assert(e.getCause.isInstanceOf[DAGSchedulerSuiteDummyException])
+  }
 
-    // Make sure we can still run commands on our SparkContext
-    assert(sc.parallelize(1 to 10, 2).count() === 10)
+  test(s"invalid ${SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL} should not crash DAGScheduler") {
+    sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "invalid")
+    try {
+      intercept[SparkException] {
+        sc.parallelize(1 to 1, 1).foreach { _ =>
+          throw new DAGSchedulerSuiteDummyException
+        }
+      }
+      // Verify the above job didn't crash DAGScheduler by running a simple job
+      assert(sc.parallelize(1 to 10, 2).count() === 10)
+    } finally {
+      sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, null)
+    }
   }
 
   test("getPartitions exceptions should not crash DAGScheduler and SparkContext (SPARK-8606)") {

dev/create-release/release-build.sh

Lines changed: 2 additions & 2 deletions
@@ -326,7 +326,7 @@ if [[ "$1" == "package" ]]; then
     svn add "svn-spark/${DEST_DIR_NAME}-bin"
 
     cd svn-spark
-    svn ci --username $ASF_USERNAME --password "$ASF_PASSWORD" -m"Apache Spark $SPARK_PACKAGE_VERSION"
+    svn ci --username $ASF_USERNAME --password "$ASF_PASSWORD" -m"Apache Spark $SPARK_PACKAGE_VERSION" --no-auth-cache
     cd ..
     rm -rf svn-spark
   fi
@@ -354,7 +354,7 @@ if [[ "$1" == "docs" ]]; then
     svn add "svn-spark/${DEST_DIR_NAME}-docs"
 
     cd svn-spark
-    svn ci --username $ASF_USERNAME --password "$ASF_PASSWORD" -m"Apache Spark $SPARK_PACKAGE_VERSION docs"
+    svn ci --username $ASF_USERNAME --password "$ASF_PASSWORD" -m"Apache Spark $SPARK_PACKAGE_VERSION docs" --no-auth-cache
     cd ..
     rm -rf svn-spark
   fi
