Commit 8de0d36

JoshRosen authored and zzcclp committed
[SPARK-16925] Master should call schedule() after all executor exit events, not only failures
This patch fixes a bug in Spark's standalone Master which could cause applications to hang if tasks cause executors to exit with zero exit codes. As an example of the bug, run

```scala
sc.parallelize(1 to 1, 1).foreachPartition { _ => System.exit(0) }
```

on a standalone cluster which has a single Spark application. This will cause all executors to die, but those executors won't be replaced unless another Spark application or worker joins or leaves the cluster (or an executor exits with a non-zero exit code).

This behavior is caused by a bug in how the Master handles the `ExecutorStateChanged` event: the current implementation calls `schedule()` only if the executor exited with a non-zero exit code, so a task which causes a JVM to unexpectedly exit "cleanly" will skip the `schedule()` call. This patch addresses this by modifying the `ExecutorStateChanged` handler to call `schedule()` unconditionally. This should be safe because it is always safe to call `schedule()`: extra `schedule()` calls can only affect performance, not correctness.

A regression test is added in `DistributedSuite`.

Author: Josh Rosen <[email protected]>

Closes apache#14510 from JoshRosen/SPARK-16925.

(cherry picked from commit 4f5f9b6)
Signed-off-by: Josh Rosen <[email protected]>
(cherry picked from commit c162886)
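The behavioral change described above can be illustrated with a small self-contained sketch. This is a hypothetical model, not Spark's actual Master code: the names (`oldMasterSchedules`, `newMasterSchedules`) and the retry threshold are illustrative, and each function simply answers "would this version of the handler call `schedule()` after an executor exit?".

```scala
// Hypothetical model of the Master's ExecutorStateChanged handling; the
// names and the retry threshold are illustrative, not Spark's real API.
object ScheduleSketch {
  val maxRetries = 10

  // Old behavior: schedule() fires only for a non-zero exit below the retry
  // limit, so an executor that exits "cleanly" (status 0) is never replaced.
  def oldMasterSchedules(exitStatus: Option[Int], retryCount: Int): Boolean = {
    val normalExit = exitStatus == Some(0)
    !normalExit && retryCount < maxRetries
  }

  // New behavior: schedule() fires unconditionally after every executor exit.
  def newMasterSchedules(exitStatus: Option[Int], retryCount: Int): Boolean = true

  def main(args: Array[String]): Unit = {
    // A clean exit (the hang scenario from the commit message):
    println(oldMasterSchedules(Some(0), 1)) // false: no reschedule, app hangs
    println(newMasterSchedules(Some(0), 1)) // true: replacement executor launched
  }
}
```

Under this model, only the patched behavior replaces executors after a zero-exit-code crash, which is why the repro above hangs on an unpatched Master.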
1 parent 083d2d5 commit 8de0d36

File tree

2 files changed: +22 -10 lines


core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 7 additions & 10 deletions
```diff
@@ -284,19 +284,16 @@ private[deploy] class Master(

             val normalExit = exitStatus == Some(0)
             // Only retry certain number of times so we don't go into an infinite loop.
-            if (!normalExit) {
-              if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) {
-                schedule()
-              } else {
-                val execs = appInfo.executors.values
-                if (!execs.exists(_.state == ExecutorState.RUNNING)) {
-                  logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
-                    s"${appInfo.retryCount} times; removing it")
-                  removeApplication(appInfo, ApplicationState.FAILED)
-                }
+            if (!normalExit && appInfo.incrementRetryCount() >= ApplicationState.MAX_NUM_RETRY) {
+              val execs = appInfo.executors.values
+              if (!execs.exists(_.state == ExecutorState.RUNNING)) {
+                logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
+                  s"${appInfo.retryCount} times; removing it")
+                removeApplication(appInfo, ApplicationState.FAILED)
               }
             }
           }
+          schedule()
         }
       case None =>
         logWarning(s"Got status update for unknown executor $appId/$execId")
```
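The collapsed guard above folds the old nested `if`/`else` into a single condition, and `schedule()` moves out of the conditional entirely. As a rough sketch, the remaining app-removal decision reduces to a pure predicate (a hypothetical helper: `MAX_NUM_RETRY` is modeled as a plain constant and executor states as booleans, not Spark's actual types):

```scala
// Illustrative predicate only; MAX_NUM_RETRY stands in for
// ApplicationState.MAX_NUM_RETRY and executor states are modeled
// as a simple collection of "is running" flags.
object RemovalSketch {
  val MAX_NUM_RETRY = 10

  // An application is removed only when a non-zero exit has pushed the
  // retry count to the limit AND no executor is still running.
  def shouldRemove(normalExit: Boolean,
                   retryCount: Int,
                   executorsRunning: Seq[Boolean]): Boolean =
    !normalExit && retryCount >= MAX_NUM_RETRY && !executorsRunning.contains(true)
}
```

In the patched code, `schedule()` runs regardless of what this predicate decides, which is exactly the behavioral change the commit makes.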

core/src/test/scala/org/apache/spark/DistributedSuite.scala

Lines changed: 15 additions & 0 deletions
```diff
@@ -131,6 +131,21 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
     }
   }

+  test("repeatedly failing task that crashes JVM with a zero exit code (SPARK-16925)") {
+    // Ensures that a task which causes the JVM to exit with a zero exit code will cause the
+    // Spark job to eventually fail.
+    sc = new SparkContext(clusterUrl, "test")
+    failAfter(Span(100000, Millis)) {
+      val thrown = intercept[SparkException] {
+        sc.parallelize(1 to 1, 1).foreachPartition { _ => System.exit(0) }
+      }
+      assert(thrown.getClass === classOf[SparkException])
+      assert(thrown.getMessage.contains("failed 4 times"))
+    }
+    // Check that the cluster is still usable:
+    sc.parallelize(1 to 10).count()
+  }
+
   test("caching") {
     sc = new SparkContext(clusterUrl, "test")
     val data = sc.parallelize(1 to 1000, 10).cache()
```