Commit d233431
[SPARK-16925] Master should call schedule() after all executor exit events, not only failures
## What changes were proposed in this pull request?

This patch fixes a bug in Spark's standalone Master which could cause applications to hang if tasks cause executors to exit with zero exit codes. As an example of the bug, run

```
sc.parallelize(1 to 1, 1).foreachPartition { _ => System.exit(0) }
```

on a standalone cluster which has a single Spark application. This will cause all executors to die, but those executors won't be replaced unless another Spark application or worker joins or leaves the cluster (or an executor exits with a non-zero exit code). This behavior is caused by a bug in how the Master handles the `ExecutorStateChanged` event: the current implementation calls `schedule()` only if the executor exited with a non-zero exit code, so a task which causes a JVM to unexpectedly exit "cleanly" will skip the `schedule()` call.

This patch addresses the problem by modifying the `ExecutorStateChanged` handler to call `schedule()` unconditionally. This should be safe because it is always safe to call `schedule()`; extra `schedule()` calls can only affect performance and should not introduce correctness bugs.

## How was this patch tested?

I added a regression test in `DistributedSuite`.

Author: Josh Rosen <[email protected]>

Closes #14510 from JoshRosen/SPARK-16925.

(cherry picked from commit 4f5f9b6)
Signed-off-by: Josh Rosen <[email protected]>
1 parent 90e0460 · commit d233431
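The patch's safety argument (that extra `schedule()` calls can only cost performance) rests on the scheduling pass being demand-driven rather than event-driven. Below is a toy, self-contained sketch of such an idempotent pass; it is not Spark's actual implementation, and `pendingExecutors` and `freeWorkers` are invented names used only for illustration:

```scala
// Toy model of a demand-driven scheduling pass. When there is no pending
// demand (or no free worker), a call simply falls through, so redundant
// calls are harmless no-ops.
object IdempotentScheduleSketch {
  var pendingExecutors = 2                  // executors the app still needs
  var freeWorkers = List("w1", "w2", "w3")  // workers with spare capacity

  def schedule(): Unit = {
    while (pendingExecutors > 0 && freeWorkers.nonEmpty) {
      val worker = freeWorkers.head
      freeWorkers = freeWorkers.tail
      pendingExecutors -= 1
      println(s"launching executor on $worker")
    }
  }

  def main(args: Array[String]): Unit = {
    schedule()  // launches two executors
    schedule()  // extra call: does nothing, affects nothing
  }
}
```

As the commit message argues, Spark's real `schedule()` is likewise driven by current cluster state, so calling it after every executor exit event, clean or not, should not over-allocate resources.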

File tree

2 files changed: +22 −10 lines changed

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 7 additions & 10 deletions
```diff
@@ -265,19 +265,16 @@ private[deploy] class Master(

             val normalExit = exitStatus == Some(0)
             // Only retry certain number of times so we don't go into an infinite loop.
-            if (!normalExit) {
-              if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) {
-                schedule()
-              } else {
-                val execs = appInfo.executors.values
-                if (!execs.exists(_.state == ExecutorState.RUNNING)) {
-                  logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
-                    s"${appInfo.retryCount} times; removing it")
-                  removeApplication(appInfo, ApplicationState.FAILED)
-                }
+            if (!normalExit && appInfo.incrementRetryCount() >= ApplicationState.MAX_NUM_RETRY) {
+              val execs = appInfo.executors.values
+              if (!execs.exists(_.state == ExecutorState.RUNNING)) {
+                logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
+                  s"${appInfo.retryCount} times; removing it")
+                removeApplication(appInfo, ApplicationState.FAILED)
               }
             }
           }
+          schedule()
         case None =>
           logWarning(s"Got status update for unknown executor $appId/$execId")
       }
```
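For readers skimming the diff, the essence of the change is that the old code could reach `schedule()` only through the abnormal-exit branch. A minimal, self-contained sketch contrasting the two control flows (not Spark code; `Outcome`, the handler functions, and `maxRetries = 10` are invented for illustration, with 10 standing in for `ApplicationState.MAX_NUM_RETRY`):

```scala
object ScheduleFlowSketch {
  sealed trait Outcome
  case object Scheduled extends Outcome  // a replacement executor can launch
  case object AppRemoved extends Outcome // the application is given up on
  case object Hung extends Outcome       // nothing happens: the bug

  val maxRetries = 10  // stand-in for ApplicationState.MAX_NUM_RETRY

  // Old behavior: schedule() is reachable only when the exit was abnormal.
  def oldHandler(exitStatus: Int, retryCount: Int): Outcome =
    if (exitStatus != 0) {
      if (retryCount < maxRetries) Scheduled else AppRemoved
    } else {
      Hung // clean exit: no schedule() call, so no replacement executor
    }

  // New behavior: removal still fires only after repeated abnormal exits,
  // but schedule() now runs unconditionally afterwards. (In the real code
  // schedule() also runs after removal; the single Outcome simplifies.)
  def newHandler(exitStatus: Int, retryCount: Int): Outcome =
    if (exitStatus != 0 && retryCount >= maxRetries) AppRemoved
    else Scheduled

  def main(args: Array[String]): Unit = {
    // A task calls System.exit(0): the old handler hangs, the new recovers.
    println(oldHandler(exitStatus = 0, retryCount = 0)) // Hung
    println(newHandler(exitStatus = 0, retryCount = 0)) // Scheduled
  }
}
```

Note the design choice: rather than adding a `schedule()` call to the clean-exit branch, the patch hoists the call out of the conditional entirely, keeping the retry/removal logic separate from the rescheduling concern.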

core/src/test/scala/org/apache/spark/DistributedSuite.scala

Lines changed: 15 additions & 0 deletions
```diff
@@ -134,6 +134,21 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
     }
   }

+  test("repeatedly failing task that crashes JVM with a zero exit code (SPARK-16925)") {
+    // Ensures that a task which causes the JVM to exit with a zero exit code will cause the
+    // Spark job to eventually fail.
+    sc = new SparkContext(clusterUrl, "test")
+    failAfter(Span(100000, Millis)) {
+      val thrown = intercept[SparkException] {
+        sc.parallelize(1 to 1, 1).foreachPartition { _ => System.exit(0) }
+      }
+      assert(thrown.getClass === classOf[SparkException])
+      assert(thrown.getMessage.contains("failed 4 times"))
+    }
+    // Check that the cluster is still usable:
+    sc.parallelize(1 to 10).count()
+  }
+
   test("caching") {
     sc = new SparkContext(clusterUrl, "test")
     val data = sc.parallelize(1 to 1000, 10).cache()
```
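To reproduce the original hang outside the test suite, a sketch along the following lines exercises the same code path against a pre-fix standalone cluster; the master URL and app name below are placeholders:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object Spark16925Repro {
  def main(args: Array[String]): Unit = {
    // Point this at a standalone master; the URL is a placeholder.
    val conf = new SparkConf()
      .setMaster("spark://master-host:7077")
      .setAppName("SPARK-16925-repro")
    val sc = new SparkContext(conf)
    try {
      // Each task kills its executor JVM with a *zero* exit code. Before
      // this fix, the Master never called schedule() in response, so no
      // replacement executors launched and the job hung indefinitely.
      sc.parallelize(1 to 1, 1).foreachPartition { _ => System.exit(0) }
    } finally {
      sc.stop()
    }
  }
}
```

With the fix applied, the behavior matches the regression test above: the job fails with a `SparkException` once the task's retry budget is exhausted, and the cluster remains usable afterwards.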
