@@ -787,6 +787,8 @@ private[spark] class TaskSetManager(
787787 // SPARK-37300: if the task is already in the finished state, just ignore it,
788788 // so that successful and tasksSuccessful do not end up with wrong values.
789789 if (info.finished) {
790+ // SPARK-46383: Clear out the accumulables for a completed task to reduce accumulable lifetime.
791+ info.resetAccumulables()
790792 return
791793 }
792794 val index = info.index
@@ -804,6 +806,8 @@ private[spark] class TaskSetManager(
804806 // Handle this task as a killed task
805807 handleFailedTask(tid, TaskState .KILLED ,
806808 TaskKilled (" Finish but did not commit due to another attempt succeeded" ))
809+ // SPARK-46383: Not clearing the accumulables here because they are already cleared in
810+ // handleFailedTask.
807811 return
808812 }
809813
@@ -846,11 +850,49 @@ private[spark] class TaskSetManager(
846850 // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
847851 // Note: "result.value()" only deserializes the value when it's called at the first time, so
848852 // here "result.value()" just returns the value and won't block other threads.
849- sched.dagScheduler.taskEnded(tasks(index), Success , result.value(), result.accumUpdates,
850- result.metricPeaks, info)
853+
854+ emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, tasks(index), Success , result.value(),
855+ result.accumUpdates, result.metricPeaks, info)
851856 maybeFinishTaskSet()
852857 }
853858
/**
 * Reports an ended task to [[DAGScheduler.taskEnded()]] and, when enabled, stops this class
 * from holding on to that task's accumulables.
 *
 * SPARK-46383: when `DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION` is set, the entries for
 * this task in `taskAttempts` and `taskInfos` are swapped for a copy obtained from
 * `TaskInfo.cloneWithEmptyAccumulables()`. The TaskInfo objects held by this class are
 * long-lived and have a heavy memory footprint on the driver, so keeping only the copy
 * releases the references to `TaskInfo.accumulables()`.
 *
 * The DAGScheduler always receives the original `taskInfo`, never the copy. The accumulables
 * are no longer needed here once they have been shipped to the DAGScheduler, where they are
 * aggregated. The same TaskInfo object is sent to the DAGScheduler on multiple events during
 * the task's lifetime, and users can install SparkListeners that compare the TaskInfo objects
 * across those events, so the object passed on must stay the same instance.
 *
 * @param taskId       ID of the ended task; used as the key updated in `taskInfos`
 * @param task         the task, forwarded unchanged to the DAGScheduler
 * @param reason       why the task ended, forwarded unchanged
 * @param result       the task result (may be null), forwarded unchanged
 * @param accumUpdates accumulator updates, forwarded unchanged
 * @param metricPeaks  metric peaks, forwarded unchanged
 * @param taskInfo     the original TaskInfo of the ended task; its `index` selects the
 *                     `taskAttempts` entry that is rewritten
 */
private def emptyTaskInfoAccumulablesAndNotifyDagScheduler(
    taskId: Long,
    task: Task[_],
    reason: TaskEndReason,
    result: Any,
    accumUpdates: Seq[AccumulatorV2[_, _]],
    metricPeaks: Array[Long],
    taskInfo: TaskInfo): Unit = {
  val attemptSlot = taskInfo.index
  val dropAccumulables = conf.get(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION)
  if (dropAccumulables) {
    val strippedInfo = taskInfo.cloneWithEmptyAccumulables()
    // Swap only the entry that is this exact TaskInfo instance (reference equality), so every
    // other attempt keeps its position in the list.
    val updatedAttempts = taskAttempts(attemptSlot).map {
      case attempt if attempt eq taskInfo => strippedInfo
      case attempt => attempt
    }
    taskAttempts(attemptSlot) = updatedAttempts
    taskInfos(taskId) = strippedInfo
  }
  // Always hand over the original object, not the copy retained above.
  sched.dagScheduler.taskEnded(task, reason, result, accumUpdates, metricPeaks, taskInfo)
}
895+
854896 private [scheduler] def markPartitionCompleted (partitionId : Int ): Unit = {
855897 partitionToIndex.get(partitionId).foreach { index =>
856898 if (! successful(index)) {
@@ -874,6 +916,8 @@ private[spark] class TaskSetManager(
874916 // SPARK-37300: if the task is already in the finished state, just ignore it,
875917 // so that copiesRunning does not end up with a wrong value.
876918 if (info.finished) {
919+ // SPARK-46383: Clear out the accumulables for a completed task to reduce accumulable lifetime.
920+ info.resetAccumulables()
877921 return
878922 }
879923 removeRunningTask(tid)
@@ -908,7 +952,8 @@ private[spark] class TaskSetManager(
908952 if (ef.className == classOf [NotSerializableException ].getName) {
909953 // If the task result wasn't serializable, there's no point in trying to re-execute it.
910954 logError(s " $task had a not serializable result: ${ef.description}; not retrying " )
911- sched.dagScheduler.taskEnded(tasks(index), reason, null , accumUpdates, metricPeaks, info)
955+ emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, tasks(index), reason, null ,
956+ accumUpdates, metricPeaks, info)
912957 abort(s " $task had a not serializable result: ${ef.description}" )
913958 return
914959 }
@@ -917,7 +962,8 @@ private[spark] class TaskSetManager(
917962 // re-execute it.
918963 logError(" Task %s in stage %s (TID %d) can not write to output file: %s; not retrying"
919964 .format(info.id, taskSet.id, tid, ef.description))
920- sched.dagScheduler.taskEnded(tasks(index), reason, null , accumUpdates, metricPeaks, info)
965+ emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, tasks(index), reason, null ,
966+ accumUpdates, metricPeaks, info)
921967 abort(" Task %s in stage %s (TID %d) can not write to output file: %s" .format(
922968 info.id, taskSet.id, tid, ef.description))
923969 return
@@ -970,7 +1016,8 @@ private[spark] class TaskSetManager(
9701016 isZombie = true
9711017 }
9721018
973- sched.dagScheduler.taskEnded(tasks(index), reason, null , accumUpdates, metricPeaks, info)
1019+ emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, tasks(index), reason, null ,
1020+ accumUpdates, metricPeaks, info)
9741021
9751022 if (! isZombie && reason.countTowardsTaskFailures) {
9761023 assert (null != failureReason)
@@ -1086,7 +1133,7 @@ private[spark] class TaskSetManager(
10861133 addPendingTask(index)
10871134 // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
10881135 // stage finishes when a total of tasks.size tasks finish.
1089- sched.dagScheduler.taskEnded(
1136+ emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid,
10901137 tasks(index), Resubmitted , null , Seq .empty, Array .empty, info)
10911138 }
10921139 }
0 commit comments