Skip to content

Commit 1e61ff4

Browse files
jasonmoore2kAndrew Or
authored andcommitted
[SPARK-14357][CORE] Properly handle the root cause being a commit denied exception
## What changes were proposed in this pull request? When deciding whether a CommitDeniedException caused a task to fail, consider the root cause of the Exception. ## How was this patch tested? Added a test suite for the component that extracts the root cause of the error. Made a distribution after cherry-picking this commit to branch-1.6 and used to run our Spark application that would quite often fail due to the CommitDeniedException. Author: Jason Moore <[email protected]> Closes #12228 from jasonmoore2k/SPARK-14357. (cherry picked from commit 22014e6) Signed-off-by: Andrew Or <[email protected]>
1 parent 4f9d1f8 commit 1e61ff4

File tree

3 files changed

+93
-1
lines changed

3 files changed

+93
-1
lines changed

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ private[spark] class Executor(
287287
logInfo(s"Executor killed $taskName (TID $taskId)")
288288
execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
289289

290-
case cDE: CommitDeniedException =>
290+
case CausedBy(cDE: CommitDeniedException) =>
291291
val reason = cDE.toTaskEndReason
292292
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
293293

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.util
19+
20+
/**
21+
* Extractor Object for pulling out the root cause of an error.
22+
* If the error contains no cause, it will return the error itself.
23+
*
24+
* Usage:
25+
* try {
26+
* ...
27+
* } catch {
28+
* case CausedBy(ex: CommitDeniedException) => ...
29+
* }
30+
*/
31+
private[spark] object CausedBy {
32+
33+
def unapply(e: Throwable): Option[Throwable] = {
34+
Option(e.getCause).flatMap(cause => unapply(cause)).orElse(Some(e))
35+
}
36+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.util
19+
20+
import org.apache.spark.SparkFunSuite
21+
22+
class CausedBySuite extends SparkFunSuite {
23+
24+
test("For an error without a cause, should return the error") {
25+
val error = new Exception
26+
27+
val causedBy = error match {
28+
case CausedBy(e) => e
29+
}
30+
31+
assert(causedBy === error)
32+
}
33+
34+
test("For an error with a cause, should return the cause of the error") {
35+
val cause = new Exception
36+
val error = new Exception(cause)
37+
38+
val causedBy = error match {
39+
case CausedBy(e) => e
40+
}
41+
42+
assert(causedBy === cause)
43+
}
44+
45+
test("For an error with a cause that itself has a cause, return the root cause") {
46+
val causeOfCause = new Exception
47+
val cause = new Exception(causeOfCause)
48+
val error = new Exception(cause)
49+
50+
val causedBy = error match {
51+
case CausedBy(e) => e
52+
}
53+
54+
assert(causedBy === causeOfCause)
55+
}
56+
}

0 commit comments

Comments
 (0)