Skip to content

Commit a001482

Browse files
committed
[SPARK-46480][CORE][SQL][3.5] Fix NPE when table cache task attempt
This PR backports #44445 to branch-3.5. ### What changes were proposed in this pull request? This PR adds a check: a cached partition is only marked as materialized if the task has neither failed nor been interrupted. It also adds a new method `isFailed` to `TaskContext`. ### Why are the changes needed? Before this PR, when caching a table, a task failure could cause an NPE in other tasks: ``` java.lang.NullPointerException at java.nio.ByteBuffer.wrap(ByteBuffer.java:396) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.accessors1$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.hasNext(Unknown Source) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:155) at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) at org.apache.spark.scheduler.Task.run(Task.scala:131) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) ``` ### Does this PR introduce _any_ user-facing change? Yes, it's a bug fix. ### How was this patch tested? Added a test. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44457 from ulysses-you/fix-cache-3.5. Authored-by: ulysses-you <[email protected]> Signed-off-by: youxiduo <[email protected]>
1 parent 98042e3 commit a001482

File tree

6 files changed

+27
-4
lines changed

6 files changed

+27
-4
lines changed

core/src/main/scala/org/apache/spark/BarrierTaskContext.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,8 @@ class BarrierTaskContext private[spark] (
193193

194194
override def isCompleted(): Boolean = taskContext.isCompleted()
195195

196+
override def isFailed(): Boolean = taskContext.isFailed()
197+
196198
override def isInterrupted(): Boolean = taskContext.isInterrupted()
197199

198200
override def addTaskCompletionListener(listener: TaskCompletionListener): this.type = {

core/src/main/scala/org/apache/spark/TaskContext.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,11 @@ abstract class TaskContext extends Serializable {
9494
*/
9595
def isCompleted(): Boolean
9696

97+
/**
98+
* Returns true if the task has failed.
99+
*/
100+
def isFailed(): Boolean
101+
97102
/**
98103
* Returns true if the task has been killed.
99104
*/

core/src/main/scala/org/apache/spark/TaskContextImpl.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,8 @@ private[spark] class TaskContextImpl(
275275
@GuardedBy("this")
276276
override def isCompleted(): Boolean = synchronized(completed)
277277

278+
override def isFailed(): Boolean = synchronized(failureCauseOpt.isDefined)
279+
278280
override def isInterrupted(): Boolean = reasonIfKilled.isDefined
279281

280282
override def getLocalProperty(key: String): String = localProperties.getProperty(key)

core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -669,6 +669,16 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
669669
assert(invocationOrder === Seq("C", "B", "A", "D"))
670670
}
671671

672+
test("SPARK-46480: Add isFailed in TaskContext") {
673+
val context = TaskContext.empty()
674+
var isFailed = false
675+
context.addTaskCompletionListener[Unit] { context =>
676+
isFailed = context.isFailed()
677+
}
678+
context.markTaskFailed(new RuntimeException())
679+
context.markTaskCompleted(None)
680+
assert(isFailed)
681+
}
672682
}
673683

674684
private object TaskContextSuite {

project/MimaExcludes.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,9 @@ object MimaExcludes {
7272
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.FlatMapGroupsWithStateFunction"),
7373
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.MapGroupsWithStateFunction"),
7474
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.SaveMode"),
75-
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.GroupState")
75+
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.GroupState"),
76+
// [SPARK-46480][CORE][SQL] Fix NPE when table cache task attempt
77+
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.TaskContext.isFailed")
7678
)
7779

7880
// Default exclude rules

sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -279,9 +279,11 @@ case class CachedRDDBuilder(
279279
cachedPlan.conf)
280280
}
281281
val cached = cb.mapPartitionsInternal { it =>
282-
TaskContext.get().addTaskCompletionListener[Unit](_ => {
283-
materializedPartitions.add(1L)
284-
})
282+
TaskContext.get().addTaskCompletionListener[Unit] { context =>
283+
if (!context.isFailed() && !context.isInterrupted()) {
284+
materializedPartitions.add(1L)
285+
}
286+
}
285287
new Iterator[CachedBatch] {
286288
override def hasNext: Boolean = it.hasNext
287289
override def next(): CachedBatch = {

0 commit comments

Comments
 (0)