From 664e06de21fbd6f38ffe7227471cfe429fdb019e Mon Sep 17 00:00:00 2001 From: Utkarsh Date: Tue, 12 Dec 2023 15:44:08 -0600 Subject: [PATCH 01/15] fix --- .../spark/internal/config/package.scala | 10 +++ .../org/apache/spark/scheduler/TaskInfo.scala | 37 +++++++++- .../spark/scheduler/TaskSetManager.scala | 59 +++++++++++++-- .../spark/scheduler/SparkListenerSuite.scala | 34 +++++++++ .../spark/scheduler/TaskSetManagerSuite.scala | 73 +++++++++++++++++++ 5 files changed, 205 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 2823b7cdb602..2349a3e5d1ef 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -2620,4 +2620,14 @@ package object config { .stringConf .toSequence .createWithDefault("org.apache.spark.sql.connect.client" :: Nil) + + private[spark] val DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION = + ConfigBuilder("spark.scheduler.dropTaskInfoAccumulablesOnTaskCompletion.enabled") + .internal() + .doc("If true, the task info accumulables will be cleared upon task completion in " + + "TaskSetManager. 
This reduces the heap usage of the driver by only referencing the " + + "task info accumulables for the active tasks and not for completed tasks.") + .version("4.0.0") + .booleanConf + .createWithDefault(true) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala index 2d4624828a94..2c237ce275da 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala @@ -45,7 +45,7 @@ class TaskInfo( val executorId: String, val host: String, val taskLocality: TaskLocality.TaskLocality, - val speculative: Boolean) { + val speculative: Boolean) extends Cloneable { /** * This api doesn't contains partitionId, please use the new api. @@ -75,7 +75,13 @@ class TaskInfo( * accumulable to be updated multiple times in a single task or for two accumulables with the * same name but different IDs to exist in a task. */ - def accumulables: Seq[AccumulableInfo] = _accumulables + def accumulables: Seq[AccumulableInfo] = { + if (throwOnAccumulablesCall) { + throw new IllegalStateException("Accumulables for the TaskInfo have been cleared") + } else { + _accumulables + } + } private[this] var _accumulables: Seq[AccumulableInfo] = Nil @@ -83,6 +89,33 @@ class TaskInfo( _accumulables = newAccumulables } + /** + * If true, a call to TaskInfo.accumulables() will throw an exception. + */ + private var throwOnAccumulablesCall: Boolean = false + + override def clone(): TaskInfo = super.clone().asInstanceOf[TaskInfo] + + /** + * For testing only. Allows probing accumulables without triggering the exception when + * `throwOnAccumulablesCall` is set. 
+ */ + private[scheduler] def isAccumulablesEmpty(): Boolean = { + _accumulables.isEmpty + } + + private[scheduler] def resetAccumulables(): Unit = { + setAccumulables(Nil) + throwOnAccumulablesCall = true + } + + private[scheduler] def cloneWithEmptyAccumulables(): TaskInfo = { + val cloned = clone() + cloned.setAccumulables(Nil) + cloned.throwOnAccumulablesCall = true + cloned + } + /** * The time when the task has completed successfully (including the time to remotely fetch * results, if necessary). diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index d17e6735c4ec..894eaac9d699 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -787,6 +787,8 @@ private[spark] class TaskSetManager( // SPARK-37300: when the task was already finished state, just ignore it, // so that there won't cause successful and tasksSuccessful wrong result. if(info.finished) { + // SPARK-46383: Clear out the accumulables for a completed task to reduce accumulable lifetime. + info.resetAccumulables() return } val index = info.index @@ -804,6 +806,8 @@ private[spark] class TaskSetManager( // Handle this task as a killed task handleFailedTask(tid, TaskState.KILLED, TaskKilled("Finish but did not commit due to another attempt succeeded")) + // SPARK-46383: Not clearing the accumulables here because they are already cleared in + // handleFailedTask. return } @@ -846,11 +850,49 @@ private[spark] class TaskSetManager( // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here. // Note: "result.value()" only deserializes the value when it's called at the first time, so // here "result.value()" just returns the value and won't block other threads. 
- sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, - result.metricPeaks, info) + + emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, tasks(index), Success, result.value(), + result.accumUpdates, result.metricPeaks, info) maybeFinishTaskSet() } + /** + * A wrapper around [[DAGScheduler.taskEnded()]] that empties out the accumulables for the + * TaskInfo object, corresponding to the completed task, referenced by this class. + * + * SPARK-46383: For the completed task, we ship the original TaskInfo to the DAGScheduler and only + * retain a cloned TaskInfo in this class. We then set the accumulables to Nil for the TaskInfo + * object that corresponds to the completed task. + * We do this to release references to `TaskInfo.accumulables()` as the TaskInfo + * objects held by this class are long-lived and have a heavy memory footprint on the driver. + * + * This is safe as the TaskInfo accumulables are not needed once they are shipped to the + * DAGScheduler where they are aggregated. Additionally, the original TaskInfo, and not a + * clone, must be sent to the DAGScheduler as this TaskInfo object is sent to the + * DAGScheduler on multiple events during the task's lifetime. Users can install + * SparkListeners that compare the TaskInfo objects across these SparkListener events and + * thus the TaskInfo object sent to the DAGScheduler must always reference the same TaskInfo + * object. 
+ */ + private def emptyTaskInfoAccumulablesAndNotifyDagScheduler( + taskId: Long, + task: Task[_], + reason: TaskEndReason, + result: Any, + accumUpdates: Seq[AccumulatorV2[_, _]], + metricPeaks: Array[Long], + taskInfo: TaskInfo): Unit = { + val index = taskInfo.index + if (conf.get(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION)) { + val clonedTaskInfo = taskInfo.cloneWithEmptyAccumulables() + // Update this task's taskInfo while preserving its position in the list + taskAttempts(index) = + taskAttempts(index).map { i => if (i eq taskInfo) clonedTaskInfo else i } + taskInfos(taskId) = clonedTaskInfo + } + sched.dagScheduler.taskEnded(task, reason, result, accumUpdates, metricPeaks, taskInfo) + } + private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = { partitionToIndex.get(partitionId).foreach { index => if (!successful(index)) { @@ -874,6 +916,8 @@ private[spark] class TaskSetManager( // SPARK-37300: when the task was already finished state, just ignore it, // so that there won't cause copiesRunning wrong result. if (info.finished) { + // SPARK-46383: Clear out the accumulables for a completed task to reduce accumulable lifetime. + info.resetAccumulables() return } removeRunningTask(tid) @@ -908,7 +952,8 @@ private[spark] class TaskSetManager( if (ef.className == classOf[NotSerializableException].getName) { // If the task result wasn't serializable, there's no point in trying to re-execute it. logError(s"$task had a not serializable result: ${ef.description}; not retrying") - sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, metricPeaks, info) + emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, tasks(index), reason, null, + accumUpdates, metricPeaks, info) abort(s"$task had a not serializable result: ${ef.description}") return } @@ -917,7 +962,8 @@ private[spark] class TaskSetManager( // re-execute it. 
logError("Task %s in stage %s (TID %d) can not write to output file: %s; not retrying" .format(info.id, taskSet.id, tid, ef.description)) - sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, metricPeaks, info) + emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, tasks(index), reason, null, + accumUpdates, metricPeaks, info) abort("Task %s in stage %s (TID %d) can not write to output file: %s".format( info.id, taskSet.id, tid, ef.description)) return @@ -970,7 +1016,8 @@ private[spark] class TaskSetManager( isZombie = true } - sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, metricPeaks, info) + emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, tasks(index), reason, null, + accumUpdates, metricPeaks, info) if (!isZombie && reason.countTowardsTaskFailures) { assert (null != failureReason) @@ -1086,7 +1133,7 @@ private[spark] class TaskSetManager( addPendingTask(index) // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our // stage finishes when a total of tasks.size tasks finish. 
- sched.dagScheduler.taskEnded( + emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, tasks(index), Resubmitted, null, Seq.empty, Array.empty, info) } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 29e27e96908f..031bcb4bfaf0 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.scheduler import java.io.{Externalizable, ObjectInput, ObjectOutput} +import java.util.{Collections, IdentityHashMap} import java.util.concurrent.Semaphore import scala.collection.mutable @@ -289,6 +290,16 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match stageInfo.rddInfos.forall(_.numPartitions == 4) should be {true} } + test("SPARK-46383: Track TaskInfo objects") { + sc = new SparkContext("local", "SparkListenerSuite") + val listener = new SaveActiveTaskInfos + sc.addSparkListener(listener) + val rdd1 = sc.parallelize(1 to 100, 4) + sc.runJob(rdd1, (items: Iterator[Int]) => items.size, Seq(0, 1)) + sc.listenerBus.waitUntilEmpty() + listener.taskInfos.size should be { 0 } + } + test("local metrics") { sc = new SparkContext("local", "SparkListenerSuite") val listener = new SaveStageAndTaskInfo @@ -643,6 +654,29 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match } } + /** + * A simple listener that tracks task infos for all active tasks. + */ + private class SaveActiveTaskInfos extends SparkListener { + // Use a set based on IdentityHashMap instead of a HashSet to track unique references of + // TaskInfo objects. 
+ val taskInfos = Collections.newSetFromMap[TaskInfo](new IdentityHashMap) + + override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { + val info = taskStart.taskInfo + if (info != null) { + taskInfos.add(info) + } + } + + override def onTaskEnd(task: SparkListenerTaskEnd): Unit = { + val info = task.taskInfo + if (info != null && taskInfos.contains(info)) { + taskInfos.remove(info) + } + } + } + /** * A simple listener that saves the task indices for all task events. */ diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 2f8b6df8beac..22c91877c96d 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -61,6 +61,11 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler) accumUpdates: Seq[AccumulatorV2[_, _]], metricPeaks: Array[Long], taskInfo: TaskInfo): Unit = { + accumUpdates.foreach(acc => + taskInfo.setAccumulables( + acc.toInfo(Some(acc.value), Some(acc.value)) +: taskInfo.accumulables) + ) + taskScheduler.endedTasks(taskInfo.index) = reason taskScheduler.endedTasks(taskInfo.index) = reason } @@ -229,6 +234,74 @@ class TaskSetManagerSuite super.afterEach() } + test("SPARK-46383: Accumulables of TaskInfo objects held by TaskSetManager must not be " + + "accessed once the task has completed") { conf => + sc = new SparkContext("local", "test", conf) + sched = FakeTaskScheduler(sc, ("exec1", "host1")) + val taskSet = FakeTask.createTaskSet(1) + val clock = new ManualClock + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) + val accumUpdates = taskSet.tasks.head.metrics.internalAccums + + // Offer a host. This will launch the first task. 
+ val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1 + assert(taskOption.isDefined) + + clock.advance(1) + // Tell it the first task has finished successfully + manager.handleSuccessfulTask(0, createTaskResult(0, accumUpdates)) + assert(sched.endedTasks(0) === Success) + + val e = intercept[IllegalStateException]{ + manager.taskInfos.head._2.accumulables + } + assert(e.getMessage.contains("Accumulables for the TaskInfo have been cleared")) + } + + test("SPARK-46383: TaskInfo accumulables are cleared upon task completion") { conf => + sc = new SparkContext("local", "test", conf) + sched = FakeTaskScheduler(sc, ("exec1", "host1")) + val taskSet = FakeTask.createTaskSet(2) + val clock = new ManualClock + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) + val accumUpdates = taskSet.tasks.head.metrics.internalAccums + + // Offer a host. This will launch the first task. + val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1 + assert(taskOption.isDefined) + + clock.advance(1) + // Tell it the first task has finished successfully + manager.handleSuccessfulTask(0, createTaskResult(0, accumUpdates)) + assert(sched.endedTasks(0) === Success) + + // Only one task was launched and it completed successfully, thus the TaskInfo accumulables + // should be empty. + assert(!manager.taskInfos.exists(l => !l._2.isAccumulablesEmpty)) + assert(manager.taskAttempts.flatMap(l => l.filter(!_.isAccumulablesEmpty)).isEmpty) + + // Fail the second task (MAX_TASK_FAILURES - 1) times. 
+ (1 to manager.maxTaskFailures - 1).foreach { index => + val offerResult = manager.resourceOffer("exec1", "host1", ANY)._1 + assert(offerResult.isDefined, + "Expect resource offer on iteration %s to return a task".format(index)) + assert(offerResult.get.index === 1) + manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost) + assert(!sched.taskSetsFailed.contains(FakeTaskFailure(taskSet.id))) + } + + clock.advance(1) + // Successfully finish the second task. + val taskOption1 = manager.resourceOffer("exec1", "host1", ANY)._1 + manager.handleSuccessfulTask(taskOption1.get.taskId, createTaskResult(1, accumUpdates)) + assert(sched.endedTasks(1) === Success) + // The TaskInfo accumulables should be empty as the second task has now completed successfully. + assert(!manager.taskInfos.exists(l => !l._2.isAccumulablesEmpty)) + assert(manager.taskAttempts.flatMap(l => l.filter(!_.isAccumulablesEmpty)).isEmpty) + + assert(sched.finishedManagers.contains(manager)) + } + test("TaskSet with no preferences") { sc = new SparkContext("local", "test") sched = new FakeTaskScheduler(sc, ("exec1", "host1")) From 0b56a281feaa581108c801e60d6ae49df5d8d21e Mon Sep 17 00:00:00 2001 From: Utkarsh Date: Mon, 18 Dec 2023 12:32:23 -0600 Subject: [PATCH 02/15] change thrown exception type --- core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala index 2c237ce275da..3b7149de1e81 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala @@ -77,7 +77,7 @@ class TaskInfo( */ def accumulables: Seq[AccumulableInfo] = { if (throwOnAccumulablesCall) { - throw new IllegalStateException("Accumulables for the TaskInfo have been cleared") + throw SparkException.internalError("Accumulables for the 
TaskInfo have been cleared") } else { _accumulables } From c6fc226eb609738d11acacde623e89d877fbe980 Mon Sep 17 00:00:00 2001 From: Utkarsh Date: Tue, 19 Dec 2023 10:33:41 -0600 Subject: [PATCH 03/15] import SparkException --- core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala | 1 + .../main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala index 3b7149de1e81..a5f7beda904d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala @@ -17,6 +17,7 @@ package org.apache.spark.scheduler +import org.apache.spark.SparkException import org.apache.spark.TaskState import org.apache.spark.TaskState.TaskState import org.apache.spark.annotation.DeveloperApi diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 894eaac9d699..a2574d2bc195 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -787,7 +787,8 @@ private[spark] class TaskSetManager( // SPARK-37300: when the task was already finished state, just ignore it, // so that there won't cause successful and tasksSuccessful wrong result. if(info.finished) { - // SPARK-46383: Clear out the accumulables for a completed task to reduce accumulable lifetime. + // SPARK-46383: Clear out the accumulables for a completed task to reduce accumulable + // lifetime. 
info.resetAccumulables() return } From 72e93457be0da37fee0b9b6cd3f3cbc9ce1c314e Mon Sep 17 00:00:00 2001 From: Utkarsh Date: Tue, 19 Dec 2023 11:19:30 -0600 Subject: [PATCH 04/15] fix scalastyle --- .../main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index a2574d2bc195..8757af994e30 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -917,7 +917,8 @@ private[spark] class TaskSetManager( // SPARK-37300: when the task was already finished state, just ignore it, // so that there won't cause copiesRunning wrong result. if (info.finished) { - // SPARK-46383: Clear out the accumulables for a completed task to reduce accumulable lifetime. + // SPARK-46383: Clear out the accumulables for a completed task to reduce accumulable + // lifetime. 
info.resetAccumulables() return } From a60eacac751ce46ff43ca0e8cd3d1d81dcfd976d Mon Sep 17 00:00:00 2001 From: Utkarsh Date: Wed, 20 Dec 2023 10:01:23 -0600 Subject: [PATCH 05/15] fix scalastyle --- .../org/apache/spark/scheduler/TaskSetManagerSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 22c91877c96d..85bbbc45cb5c 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -235,7 +235,7 @@ class TaskSetManagerSuite } test("SPARK-46383: Accumulables of TaskInfo objects held by TaskSetManager must not be " + - "accessed once the task has completed") { conf => + "accessed once the task has completed") { sc = new SparkContext("local", "test", conf) sched = FakeTaskScheduler(sc, ("exec1", "host1")) val taskSet = FakeTask.createTaskSet(1) @@ -258,7 +258,7 @@ class TaskSetManagerSuite assert(e.getMessage.contains("Accumulables for the TaskInfo have been cleared")) } - test("SPARK-46383: TaskInfo accumulables are cleared upon task completion") { conf => + test("SPARK-46383: TaskInfo accumulables are cleared upon task completion") { sc = new SparkContext("local", "test", conf) sched = FakeTaskScheduler(sc, ("exec1", "host1")) val taskSet = FakeTask.createTaskSet(2) From 677375005855bdde0429ce0a7f1b20c1a69d0e80 Mon Sep 17 00:00:00 2001 From: Utkarsh Date: Mon, 1 Jan 2024 14:58:13 -0600 Subject: [PATCH 06/15] Disable config by default --- .../org/apache/spark/internal/config/package.scala | 2 +- .../apache/spark/scheduler/SparkListenerSuite.scala | 3 ++- .../apache/spark/scheduler/TaskSetManagerSuite.scala | 11 ++++++----- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 2349a3e5d1ef..86d35196d5eb 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -2629,5 +2629,5 @@ package object config { "task info accumulables for the active tasks and not for completed tasks.") .version("4.0.0") .booleanConf - .createWithDefault(true) + .createWithDefault(false) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 031bcb4bfaf0..d7d24a27da28 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -291,7 +291,8 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match } test("SPARK-46383: Track TaskInfo objects") { - sc = new SparkContext("local", "SparkListenerSuite") + sc = new SparkContext("local", "SparkListenerSuite"). + set(config.DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION, true) val listener = new SaveActiveTaskInfos sc.addSparkListener(listener) val rdd1 = sc.parallelize(1 to 100, 4) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 85bbbc45cb5c..d64d22184556 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -236,8 +236,9 @@ class TaskSetManagerSuite test("SPARK-46383: Accumulables of TaskInfo objects held by TaskSetManager must not be " + "accessed once the task has completed") { - sc = new SparkContext("local", "test", conf) - sched = FakeTaskScheduler(sc, ("exec1", "host1")) + sc = new SparkContext("local", "test", conf). 
+ set(config.DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION, true) + sched = new FakeTaskScheduler(sc, ("exec1", "host1")) val taskSet = FakeTask.createTaskSet(1) val clock = new ManualClock val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) @@ -259,8 +260,9 @@ class TaskSetManagerSuite } test("SPARK-46383: TaskInfo accumulables are cleared upon task completion") { - sc = new SparkContext("local", "test", conf) - sched = FakeTaskScheduler(sc, ("exec1", "host1")) + sc = new SparkContext("local", "test", conf). + set(config.DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION, true) + sched = new FakeTaskScheduler(sc, ("exec1", "host1")) val taskSet = FakeTask.createTaskSet(2) val clock = new ManualClock val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) @@ -287,7 +289,6 @@ class TaskSetManagerSuite "Expect resource offer on iteration %s to return a task".format(index)) assert(offerResult.get.index === 1) manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost) - assert(!sched.taskSetsFailed.contains(FakeTaskFailure(taskSet.id))) } clock.advance(1) From cd804085699842f9717712db177d6c1d88878707 Mon Sep 17 00:00:00 2001 From: Utkarsh Date: Tue, 2 Jan 2024 12:09:17 -0600 Subject: [PATCH 07/15] Fix compilation --- .../org/apache/spark/scheduler/SparkListenerSuite.scala | 4 ++-- .../org/apache/spark/scheduler/TaskSetManagerSuite.scala | 6 ++++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index d7d24a27da28..6cf6347a8495 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -291,8 +291,8 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match } test("SPARK-46383: Track TaskInfo objects") { 
- sc = new SparkContext("local", "SparkListenerSuite"). - set(config.DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION, true) + val conf = new SparkConf().set(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION, true) + sc = new SparkContext("local", "SparkListenerSuite", conf) val listener = new SaveActiveTaskInfos sc.addSparkListener(listener) val rdd1 = sc.parallelize(1 to 100, 4) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index d64d22184556..c934006a372f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -236,8 +236,9 @@ class TaskSetManagerSuite test("SPARK-46383: Accumulables of TaskInfo objects held by TaskSetManager must not be " + "accessed once the task has completed") { - sc = new SparkContext("local", "test", conf). + val conf = new SparkConf(). set(config.DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION, true) + sc = new SparkContext("local", "test", conf) sched = new FakeTaskScheduler(sc, ("exec1", "host1")) val taskSet = FakeTask.createTaskSet(1) val clock = new ManualClock @@ -260,8 +261,9 @@ class TaskSetManagerSuite } test("SPARK-46383: TaskInfo accumulables are cleared upon task completion") { - sc = new SparkContext("local", "test", conf). + val conf = new SparkConf(). 
set(config.DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION, true) + sc = new SparkContext("local", "test", conf) sched = new FakeTaskScheduler(sc, ("exec1", "host1")) val taskSet = FakeTask.createTaskSet(2) val clock = new ManualClock From aeed0e284d7580f5c5cbc81e9418fc21600b8dad Mon Sep 17 00:00:00 2001 From: Utkarsh Date: Tue, 2 Jan 2024 16:33:27 -0600 Subject: [PATCH 08/15] Fix test --- .../scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index c934006a372f..cbabcb6ab2ac 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -254,7 +254,7 @@ class TaskSetManagerSuite manager.handleSuccessfulTask(0, createTaskResult(0, accumUpdates)) assert(sched.endedTasks(0) === Success) - val e = intercept[IllegalStateException]{ + val e = intercept[SparkException]{ manager.taskInfos.head._2.accumulables } assert(e.getMessage.contains("Accumulables for the TaskInfo have been cleared")) From e72830bd2785c085663f1c20fa2dbc01a2bd6a98 Mon Sep 17 00:00:00 2001 From: Utkarsh Date: Fri, 5 Jan 2024 10:36:08 -0600 Subject: [PATCH 09/15] Address review comments --- .../org/apache/spark/scheduler/TaskInfo.scala | 15 +------ .../spark/scheduler/TaskSetManager.scala | 44 +++++++++++-------- 2 files changed, 27 insertions(+), 32 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala index a5f7beda904d..eccab398cdc8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala @@ -76,13 +76,7 @@ class TaskInfo( * accumulable to be updated multiple times in a single task or for two 
accumulables with the * same name but different IDs to exist in a task. */ - def accumulables: Seq[AccumulableInfo] = { - if (throwOnAccumulablesCall) { - throw SparkException.internalError("Accumulables for the TaskInfo have been cleared") - } else { - _accumulables - } - } + def accumulables: Seq[AccumulableInfo] = _accumulables private[this] var _accumulables: Seq[AccumulableInfo] = Nil @@ -90,11 +84,6 @@ class TaskInfo( _accumulables = newAccumulables } - /** - * If true, a call to TaskInfo.accumulables() will throw an exception. - */ - private var throwOnAccumulablesCall: Boolean = false - override def clone(): TaskInfo = super.clone().asInstanceOf[TaskInfo] /** @@ -107,13 +96,11 @@ class TaskInfo( private[scheduler] def resetAccumulables(): Unit = { setAccumulables(Nil) - throwOnAccumulablesCall = true } private[scheduler] def cloneWithEmptyAccumulables(): TaskInfo = { val cloned = clone() cloned.setAccumulables(Nil) - cloned.throwOnAccumulablesCall = true cloned } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 8757af994e30..7f9efdbeca41 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -258,6 +258,9 @@ private[spark] class TaskSetManager( private[scheduler] var emittedTaskSizeWarning = false + private[scheduler] val dropTaskInfoAccumulablesOnTaskCompletion = + conf.get(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION) + /** Add a task to all the pending-task lists that it should be on. */ private[spark] def addPendingTask( index: Int, @@ -787,9 +790,11 @@ private[spark] class TaskSetManager( // SPARK-37300: when the task was already finished state, just ignore it, // so that there won't cause successful and tasksSuccessful wrong result. 
if(info.finished) { - // SPARK-46383: Clear out the accumulables for a completed task to reduce accumulable - // lifetime. - info.resetAccumulables() + if (dropTaskInfoAccumulablesOnTaskCompletion) { + // SPARK-46383: Clear out the accumulables for a completed task to reduce accumulable + // lifetime. + info.resetAccumulables() + } return } val index = info.index @@ -853,7 +858,7 @@ private[spark] class TaskSetManager( // here "result.value()" just returns the value and won't block other threads. emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, tasks(index), Success, result.value(), - result.accumUpdates, result.metricPeaks, info) + result.accumUpdates, result.metricPeaks) maybeFinishTaskSet() } @@ -881,17 +886,18 @@ private[spark] class TaskSetManager( reason: TaskEndReason, result: Any, accumUpdates: Seq[AccumulatorV2[_, _]], - metricPeaks: Array[Long], - taskInfo: TaskInfo): Unit = { - val index = taskInfo.index - if (conf.get(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION)) { - val clonedTaskInfo = taskInfo.cloneWithEmptyAccumulables() + metricPeaks: Array[Long]): Unit = { + val taskInfoWithAccumulables = taskInfos(taskId); + if (dropTaskInfoAccumulablesOnTaskCompletion) { + val index = taskInfoWithAccumulables.index + val clonedTaskInfo = taskInfoWithAccumulables.cloneWithEmptyAccumulables() // Update this task's taskInfo while preserving its position in the list taskAttempts(index) = - taskAttempts(index).map { i => if (i eq taskInfo) clonedTaskInfo else i } + taskAttempts(index).map { i => if (i eq taskInfoWithAccumulables) clonedTaskInfo else i } taskInfos(taskId) = clonedTaskInfo } - sched.dagScheduler.taskEnded(task, reason, result, accumUpdates, metricPeaks, taskInfo) + sched.dagScheduler.taskEnded(task, reason, result, accumUpdates, metricPeaks, + taskInfoWithAccumulables) } private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = { @@ -917,9 +923,11 @@ private[spark] class TaskSetManager( // SPARK-37300: when the task was already 
finished state, just ignore it, // so that there won't cause copiesRunning wrong result. if (info.finished) { - // SPARK-46383: Clear out the accumulables for a completed task to reduce accumulable - // lifetime. - info.resetAccumulables() + if (dropTaskInfoAccumulablesOnTaskCompletion) { + // SPARK-46383: Clear out the accumulables for a completed task to reduce accumulable + // lifetime. + info.resetAccumulables() + } return } removeRunningTask(tid) @@ -955,7 +963,7 @@ private[spark] class TaskSetManager( // If the task result wasn't serializable, there's no point in trying to re-execute it. logError(s"$task had a not serializable result: ${ef.description}; not retrying") emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, tasks(index), reason, null, - accumUpdates, metricPeaks, info) + accumUpdates, metricPeaks) abort(s"$task had a not serializable result: ${ef.description}") return } @@ -965,7 +973,7 @@ private[spark] class TaskSetManager( logError("Task %s in stage %s (TID %d) can not write to output file: %s; not retrying" .format(info.id, taskSet.id, tid, ef.description)) emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, tasks(index), reason, null, - accumUpdates, metricPeaks, info) + accumUpdates, metricPeaks) abort("Task %s in stage %s (TID %d) can not write to output file: %s".format( info.id, taskSet.id, tid, ef.description)) return @@ -1019,7 +1027,7 @@ private[spark] class TaskSetManager( } emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, tasks(index), reason, null, - accumUpdates, metricPeaks, info) + accumUpdates, metricPeaks) if (!isZombie && reason.countTowardsTaskFailures) { assert (null != failureReason) @@ -1136,7 +1144,7 @@ private[spark] class TaskSetManager( // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our // stage finishes when a total of tasks.size tasks finish. 
emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, - tasks(index), Resubmitted, null, Seq.empty, Array.empty, info) + tasks(index), Resubmitted, null, Seq.empty, Array.empty) } } } From 8cbe95158040f0f42f790413c0d8f8405b1d9f81 Mon Sep 17 00:00:00 2001 From: Utkarsh Date: Fri, 5 Jan 2024 10:39:14 -0600 Subject: [PATCH 10/15] Clean up --- .../scala/org/apache/spark/scheduler/TaskInfo.scala | 13 ------------- .../org/apache/spark/scheduler/TaskSetManager.scala | 4 ++-- .../spark/scheduler/TaskSetManagerSuite.scala | 8 ++++---- 3 files changed, 6 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala index eccab398cdc8..9ed95870d240 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala @@ -17,7 +17,6 @@ package org.apache.spark.scheduler -import org.apache.spark.SparkException import org.apache.spark.TaskState import org.apache.spark.TaskState.TaskState import org.apache.spark.annotation.DeveloperApi @@ -86,18 +85,6 @@ class TaskInfo( override def clone(): TaskInfo = super.clone().asInstanceOf[TaskInfo] - /** - * For testing only. Allows probing accumulables without triggering the exception when - * `throwOnAccumulablesCall` is set. 
- */ - private[scheduler] def isAccumulablesEmpty(): Boolean = { - _accumulables.isEmpty - } - - private[scheduler] def resetAccumulables(): Unit = { - setAccumulables(Nil) - } - private[scheduler] def cloneWithEmptyAccumulables(): TaskInfo = { val cloned = clone() cloned.setAccumulables(Nil) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 7f9efdbeca41..e0d2c9ead94d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -793,7 +793,7 @@ private[spark] class TaskSetManager( if (dropTaskInfoAccumulablesOnTaskCompletion) { // SPARK-46383: Clear out the accumulables for a completed task to reduce accumulable // lifetime. - info.resetAccumulables() + info.setAccumulables(Nil) } return } @@ -926,7 +926,7 @@ private[spark] class TaskSetManager( if (dropTaskInfoAccumulablesOnTaskCompletion) { // SPARK-46383: Clear out the accumulables for a completed task to reduce accumulable // lifetime. - info.resetAccumulables() + info.setAccumulables(Nil) } return } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index cbabcb6ab2ac..0cbfa3b4d56f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -281,8 +281,8 @@ class TaskSetManagerSuite // Only one task was launched and it completed successfully, thus the TaskInfo accumulables // should be empty. 
- assert(!manager.taskInfos.exists(l => !l._2.isAccumulablesEmpty)) - assert(manager.taskAttempts.flatMap(l => l.filter(!_.isAccumulablesEmpty)).isEmpty) + assert(!manager.taskInfos.exists(l => !l._2.accumulables.isEmpty)) + assert(manager.taskAttempts.flatMap(l => l.filter(!_.accumulables.isEmpty)).isEmpty) // Fail the second task (MAX_TASK_FAILURES - 1) times. (1 to manager.maxTaskFailures - 1).foreach { index => @@ -299,8 +299,8 @@ class TaskSetManagerSuite manager.handleSuccessfulTask(taskOption1.get.taskId, createTaskResult(1, accumUpdates)) assert(sched.endedTasks(1) === Success) // The TaskInfo accumulables should be empty as the second task has now completed successfully. - assert(!manager.taskInfos.exists(l => !l._2.isAccumulablesEmpty)) - assert(manager.taskAttempts.flatMap(l => l.filter(!_.isAccumulablesEmpty)).isEmpty) + assert(!manager.taskInfos.exists(l => !l._2.accumulables.isEmpty)) + assert(manager.taskAttempts.flatMap(l => l.filter(!_.accumulables.isEmpty)).isEmpty) assert(sched.finishedManagers.contains(manager)) } From 03cedb4c128bc1abcdbf6385938472590e7b7682 Mon Sep 17 00:00:00 2001 From: Utkarsh Date: Fri, 5 Jan 2024 10:45:04 -0600 Subject: [PATCH 11/15] Add comment --- .../scala/org/apache/spark/scheduler/SparkListenerSuite.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 6cf6347a8495..72ab2b8863ef 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -291,6 +291,8 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match } test("SPARK-46383: Track TaskInfo objects") { + // Test that the same TaskInfo object is sent to the `DAGScheduler` in the `onTaskStart` and + // `onTaskEnd` events. 
val conf = new SparkConf().set(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION, true) sc = new SparkContext("local", "SparkListenerSuite", conf) val listener = new SaveActiveTaskInfos From 65c327f10b29761c8336a5def91e92a7f7e0e3e6 Mon Sep 17 00:00:00 2001 From: Utkarsh Date: Fri, 5 Jan 2024 10:47:04 -0600 Subject: [PATCH 12/15] Change var in test from l to t --- .../org/apache/spark/scheduler/TaskSetManagerSuite.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 0cbfa3b4d56f..177b49747fdb 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -281,8 +281,8 @@ class TaskSetManagerSuite // Only one task was launched and it completed successfully, thus the TaskInfo accumulables // should be empty. - assert(!manager.taskInfos.exists(l => !l._2.accumulables.isEmpty)) - assert(manager.taskAttempts.flatMap(l => l.filter(!_.accumulables.isEmpty)).isEmpty) + assert(!manager.taskInfos.exists(t => !t._2.accumulables.isEmpty)) + assert(manager.taskAttempts.flatMap(t => t.filter(!_.accumulables.isEmpty)).isEmpty) // Fail the second task (MAX_TASK_FAILURES - 1) times. (1 to manager.maxTaskFailures - 1).foreach { index => @@ -299,8 +299,8 @@ class TaskSetManagerSuite manager.handleSuccessfulTask(taskOption1.get.taskId, createTaskResult(1, accumUpdates)) assert(sched.endedTasks(1) === Success) // The TaskInfo accumulables should be empty as the second task has now completed successfully. 
- assert(!manager.taskInfos.exists(l => !l._2.accumulables.isEmpty)) - assert(manager.taskAttempts.flatMap(l => l.filter(!_.accumulables.isEmpty)).isEmpty) + assert(!manager.taskInfos.exists(t => !t._2.accumulables.isEmpty)) + assert(manager.taskAttempts.flatMap(t => t.filter(!_.accumulables.isEmpty)).isEmpty) assert(sched.finishedManagers.contains(manager)) } From 0bffbd21b290cd1231c720f72ef5a42cae8bbbe5 Mon Sep 17 00:00:00 2001 From: Utkarsh Date: Fri, 5 Jan 2024 16:59:16 -0600 Subject: [PATCH 13/15] Drop test checking for exception upon accessing empty accumulables --- .../spark/scheduler/TaskSetManagerSuite.scala | 26 ------------------- 1 file changed, 26 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 177b49747fdb..82f7e3d0fc06 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -234,32 +234,6 @@ class TaskSetManagerSuite super.afterEach() } - test("SPARK-46383: Accumulables of TaskInfo objects held by TaskSetManager must not be " + - "accessed once the task has completed") { - val conf = new SparkConf(). - set(config.DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION, true) - sc = new SparkContext("local", "test", conf) - sched = new FakeTaskScheduler(sc, ("exec1", "host1")) - val taskSet = FakeTask.createTaskSet(1) - val clock = new ManualClock - val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) - val accumUpdates = taskSet.tasks.head.metrics.internalAccums - - // Offer a host. This will launch the first task.
- val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1 - assert(taskOption.isDefined) - - clock.advance(1) - // Tell it the first task has finished successfully - manager.handleSuccessfulTask(0, createTaskResult(0, accumUpdates)) - assert(sched.endedTasks(0) === Success) - - val e = intercept[SparkException]{ - manager.taskInfos.head._2.accumulables - } - assert(e.getMessage.contains("Accumulables for the TaskInfo have been cleared")) - } - test("SPARK-46383: TaskInfo accumulables are cleared upon task completion") { val conf = new SparkConf(). set(config.DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION, true) From 1e2793517495e4544378b6d56f30298e3e4f4cc2 Mon Sep 17 00:00:00 2001 From: Utkarsh Date: Wed, 10 Jan 2024 17:42:43 -0600 Subject: [PATCH 14/15] Address review comments --- .../scala/org/apache/spark/scheduler/SparkListenerSuite.scala | 4 +--- .../org/apache/spark/scheduler/TaskSetManagerSuite.scala | 3 ++- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 72ab2b8863ef..34b2a40d1e3b 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -674,9 +674,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match override def onTaskEnd(task: SparkListenerTaskEnd): Unit = { val info = task.taskInfo - if (info != null && taskInfos.contains(info)) { - taskInfos.remove(info) - } + taskInfos.remove(info) } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 82f7e3d0fc06..4976347e9590 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala 
@@ -61,12 +61,13 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler) accumUpdates: Seq[AccumulatorV2[_, _]], metricPeaks: Array[Long], taskInfo: TaskInfo): Unit = { + // Set task accumulables emulating DAGScheduler behavior to enable tests related to + // `TaskInfo.accumulables`. accumUpdates.foreach(acc => taskInfo.setAccumulables( acc.toInfo(Some(acc.value), Some(acc.value)) +: taskInfo.accumulables) ) taskScheduler.endedTasks(taskInfo.index) = reason - taskScheduler.endedTasks(taskInfo.index) = reason } override def executorAdded(execId: String, host: String): Unit = {} From aca9329af82dd9e06cac9ab676ef0671bc1eba91 Mon Sep 17 00:00:00 2001 From: Utkarsh Date: Thu, 11 Jan 2024 14:10:06 -0600 Subject: [PATCH 15/15] Fix style --- .../org/apache/spark/internal/config/package.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 86d35196d5eb..bbd79c8b9653 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -2623,11 +2623,11 @@ package object config { private[spark] val DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION = ConfigBuilder("spark.scheduler.dropTaskInfoAccumulablesOnTaskCompletion.enabled") - .internal() - .doc("If true, the task info accumulables will be cleared upon task completion in " + - "TaskSetManager. This reduces the heap usage of the driver by only referencing the " + - "task info accumulables for the active tasks and not for completed tasks.") - .version("4.0.0") - .booleanConf - .createWithDefault(false) + .internal() + .doc("If true, the task info accumulables will be cleared upon task completion in " + + "TaskSetManager. 
This reduces the heap usage of the driver by only referencing the " + + "task info accumulables for the active tasks and not for completed tasks.") + .version("4.0.0") + .booleanConf + .createWithDefault(false) }