20 changes: 14 additions & 6 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1245,9 +1245,10 @@ private[spark] class DAGScheduler(
   private def updateAccumulators(event: CompletionEvent): Unit = {
     val task = event.task
     val stage = stageIdToStage(task.stageId)
-    try {
-      event.accumUpdates.foreach { updates =>
-        val id = updates.id
+
+    event.accumUpdates.foreach { updates =>
+      val id = updates.id
+      try {
         // Find the corresponding accumulator on the driver and update it
         val acc: AccumulatorV2[Any, Any] = AccumulatorContext.get(id) match {
           case Some(accum) => accum.asInstanceOf[AccumulatorV2[Any, Any]]
@@ -1261,10 +1262,17 @@ private[spark] class DAGScheduler(
           event.taskInfo.setAccumulables(
             acc.toInfo(Some(updates.value), Some(acc.value)) +: event.taskInfo.accumulables)
         }
+      } catch {
+        case NonFatal(e) =>
+          // Log the class name to make it easy to find the bad implementation
+          val accumClassName = AccumulatorContext.get(id) match {
+            case Some(accum) => accum.getClass.getName
+            case None => "Unknown class"
+          }
+          logError(
+            s"Failed to update accumulator $id ($accumClassName) for task ${task.partitionId}",
+            e)
       }
-    } catch {
-      case NonFatal(e) =>
-        logError(s"Failed to update accumulators for task ${task.partitionId}", e)
     }
   }

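The refactor above is easiest to see in isolation. Below is a minimal, self-contained sketch of the same error-isolation pattern, using hypothetical types (`Update`, `applyUpdates`) rather than Spark's internals: with the try/catch inside the loop, one throwing merge is logged and skipped instead of aborting every remaining update.

```scala
import scala.util.control.NonFatal

// Hypothetical stand-in for an accumulator update; not Spark's real type.
final case class Update(id: Long, value: Long)

object MergeIsolationSketch {
  // Applies each update inside its own try/catch, mirroring the patched
  // updateAccumulators: a single failing merge is logged and skipped,
  // and the remaining updates are still applied.
  def applyUpdates(updates: Seq[Update], merge: Map[Long, Long => Unit]): Unit = {
    updates.foreach { u =>
      try {
        merge.get(u.id) match {
          case Some(f) => f(u.value)
          case None => throw new NoSuchElementException(s"no accumulator ${u.id}")
        }
      } catch {
        case NonFatal(e) =>
          // Only this update is lost; the loop continues.
          println(s"Failed to update accumulator ${u.id}: $e")
      }
    }
  }
}
```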
20 changes: 20 additions & 0 deletions core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -1880,6 +1880,26 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
     assert(sc.parallelize(1 to 10, 2).count() === 10)
   }
 
+  test("misbehaved accumulator should not impact other accumulators") {
> [Member] Also verify the log message?
>
> [Member, Author] That's not in the core project.
+    val bad = new LongAccumulator {
+      override def merge(other: AccumulatorV2[java.lang.Long, java.lang.Long]): Unit = {
+        throw new DAGSchedulerSuiteDummyException
+      }
+    }
+    sc.register(bad, "bad")
+    val good = sc.longAccumulator("good")
+
+    sc.parallelize(1 to 10, 2).foreach { item =>
+      bad.add(1)
+      good.add(1)
+    }
+
+    // This is to ensure the `bad` accumulator did fail to update its value
+    assert(bad.value == 0L)
+    // Should be able to update the "good" accumulator
+    assert(good.value == 10L)
+  }
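(The assertions follow from the driver-side change above: merging happens in `updateAccumulators` as each task completes, so the `DAGSchedulerSuiteDummyException` thrown by `bad.merge` is caught and logged once per task, leaving `bad` at its initial 0, while `good` is merged normally from both partitions and ends at 10.)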

   /**
    * The job will be failed on first task throwing a DAGSchedulerSuiteDummyException.
    * Any subsequent task WILL throw a legitimate java.lang.UnsupportedOperationException.
4 changes: 4 additions & 0 deletions docs/rdd-programming-guide.md
@@ -1465,6 +1465,10 @@ jsc.sc().register(myVectorAcc, "MyVectorAcc1");

 Note that, when programmers define their own type of AccumulatorV2, the resulting type can be different than that of the elements added.

+*Warning*: When a Spark task finishes, Spark will try to merge the accumulated updates in that task into an accumulator.
+If the merge fails, Spark ignores the failure, still marks the task successful, and continues to run other tasks. Hence,
+a buggy accumulator will not fail a Spark job, but it may not be updated correctly even though the job succeeded.

 </div>
 
 <div data-lang="python" markdown="1">
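To make the new warning concrete, here is a small hypothetical example (Scala for brevity; it assumes an existing `SparkContext` named `sc`, as elsewhere in the guide). The action succeeds even though every driver-side merge of the buggy accumulator throws, so the accumulator's value silently stays at 0:

```scala
import org.apache.spark.util.{AccumulatorV2, LongAccumulator}

// A deliberately buggy accumulator: every driver-side merge throws.
// (The names and the exception here are illustrative only.)
val buggy = new LongAccumulator {
  override def merge(other: AccumulatorV2[java.lang.Long, java.lang.Long]): Unit =
    throw new RuntimeException("buggy merge")
}
sc.register(buggy, "buggy")

// The job still succeeds: each failed merge is only logged on the driver.
sc.parallelize(1 to 100, 4).foreach(_ => buggy.add(1))

// No task's updates were ever merged into the driver-side instance.
assert(buggy.value == 0L)
```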