
@utkarsh39 utkarsh39 commented Dec 12, 2023

What changes were proposed in this pull request?

AccumulableInfo is one of the top heap consumers in the driver's heap dumps for stages with many tasks. For a stage with a large number of tasks (O(100k)), we saw 30% of the heap usage stemming from TaskInfo.accumulables().


Today the TaskSetManager keeps the TaskInfo objects around (ref1, ref2), and in turn the task metrics (AccumulableInfo), for every task attempt until the stage completes. This means that for stages with a large number of tasks, we keep the metrics (AccumulableInfo) of every task around even after the task has completed and its metrics have been aggregated. Since each task carries a large number of metrics, stages with many tasks end up with substantial heap usage in the form of task metrics.

This PR is an opt-in change (disabled by default) that reduces the driver's heap usage for stages with many tasks by no longer referencing the task metrics of completed tasks. Once a task completes in the TaskSetManager, we no longer keep its metrics around: we clone the TaskInfo object and empty out the metrics of the clone. The cloned TaskInfo is retained by the TaskSetManager, while the original TaskInfo object, with its metrics, is sent to the DAGScheduler, where the task metrics are aggregated. Thus, for a completed task, the TaskSetManager holds a TaskInfo object with empty metrics. This reduces the memory footprint by ensuring that the number of task metric objects is proportional to the number of active tasks rather than to the total number of tasks in the stage.
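To make the mechanism concrete, here is a minimal, self-contained sketch of the idea. The classes and method names below are simplified stand-ins for illustration and are not Spark's actual TaskInfo/TaskSetManager API.

```scala
import scala.collection.mutable

// Simplified stand-ins for Spark's AccumulableInfo / TaskInfo, for illustration only.
case class AccumulableInfo(id: Long, name: Option[String], update: Option[Any], value: Option[Any])

class TaskInfo(val taskId: Long, var accumulables: Seq[AccumulableInfo]) {
  // Clone that drops the (potentially large) accumulables. The clone is what the
  // TaskSetManager keeps after completion; the original, with metrics intact, is
  // handed to the DAGScheduler for aggregation and listener events.
  def cloneWithEmptyAccumulables(): TaskInfo = new TaskInfo(taskId, Seq.empty)
}

class TaskSetManagerSketch(dropAccumulablesOnCompletion: Boolean) {
  private val taskInfos = mutable.Map[Long, TaskInfo]()

  def registerRunningTask(info: TaskInfo): Unit = taskInfos(info.taskId) = info

  // On task completion: retain a stripped clone, forward the original downstream.
  def onTaskCompleted(tid: Long, notifyDagScheduler: TaskInfo => Unit): Unit = {
    val original = taskInfos(tid)
    if (dropAccumulablesOnCompletion) {
      taskInfos(tid) = original.cloneWithEmptyAccumulables()
    }
    notifyDagScheduler(original) // the original still carries the metrics
  }
}
```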

Config to gate changes

The changes in this PR are guarded by the Spark conf spark.scheduler.dropTaskInfoAccumulablesOnTaskCompletion.enabled, which can be used for rollback or staged rollouts.
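For reference, opting in looks like the following (a sketch; the config key is the one introduced by this PR, the rest is generic SparkConf usage). The same key can also be passed via --conf on spark-submit.

```scala
import org.apache.spark.SparkConf

// Opt in to dropping TaskInfo accumulables for completed tasks (off by default).
val conf = new SparkConf()
  .setAppName("example")
  .set("spark.scheduler.dropTaskInfoAccumulablesOnTaskCompletion.enabled", "true")
```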

Why are the changes disabled by default?

The PR introduces a breaking change wherein TaskInfo.accumulables() is empty for Resubmitted tasks upon the loss of an executor. See #44321 (review) for details.

Why are the changes needed?

Reduce the driver's heap usage, especially for stages with many tasks.

Benchmarking

On a cluster running a scan stage with 100k tasks, the TaskSetManager's heap usage dropped from 1.1 GB to 37 MB. This reduced the total driver's heap usage by 38%, down to 2 GB from 3.5 GB.

BEFORE: [driver heap usage screenshot]

WITH FIX: [driver heap usage screenshot]

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added new tests and did benchmarking on a cluster.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Github Copilot

@github-actions github-actions bot added the CORE label Dec 12, 2023
```scala
def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
  val info = taskInfos(tid)
  // SPARK-37300: when the task was already finished state, just ignore it,
  // so that there won't cause successful and tasksSuccessful wrong result.
```
Reading this comment, the partition is already completed, probably by another TaskSetManager, and we just need to reset the task info here?

@JoshRosen JoshRosen Dec 16, 2023

I think this branch is handling a rare corner-case where the same TaskSetManager can mark the same task as both succeeded and failed. There is some detailed prior discussion of this in https://issues.apache.org/jira/browse/SPARK-37300

@cloud-fan cloud-fan left a comment

LGTM except for some minor comments

@JoshRosen JoshRosen left a comment

LGTM pending minor comments and test re-triggering (it looks like the first CI run failed in checkout).

@mridulm mridulm commented Dec 16, 2023

I have not looked into this in a lot of detail (and given my vacation plans, might not be able to unfortunately).
Will drop a note for @cloud-fan and @JoshRosen - perhaps you have analyzed it and this is not a concern.

LiveTask keeps a reference to TaskInfo and references accumulables there. Given the potential delay between task events being fired and the actual scheduler updates (due to delays in event processing), will this PR cause issues?

@cloud-fan

@mridulm LiveTask gets the TaskInfo via the listener, and this PR sends the original TaskInfo instance to the DAGScheduler and thus to the event bus. We keep the cloned TaskInfo with empty accumulables in the TaskSetManager, assuming the listener won't hold the original TaskInfo instance for long. Built-in listeners are fine; they just aggregate and throw the info away. User listeners may still cause memory issues, but that is out of our control.
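To illustrate the distinction (a hedged sketch; this is not code from this PR or from Spark's built-in listeners, and the class names and metric name are illustrative): a listener that aggregates on task end and drops the reference is unaffected, while one that caches TaskInfo objects keeps every task's accumulables reachable on the driver heap.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, TaskInfo}
import scala.collection.mutable

// Pattern 1: aggregate and discard - only a running total survives the event.
class RecordsReadTotalListener extends SparkListener {
  @volatile var totalRecordsRead = 0L
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // Metric name shown for illustration; any per-task accumulable works the same way.
    val update = taskEnd.taskInfo.accumulables
      .find(_.name.contains("internal.metrics.input.recordsRead"))
      .flatMap(_.update)
    totalRecordsRead += update.map(_.toString.toLong).getOrElse(0L)
  }
}

// Pattern 2: cache TaskInfo objects - this keeps each task's accumulables alive
// on the driver for as long as the listener holds the references.
class TaskInfoCachingListener extends SparkListener {
  val infos = mutable.Buffer[TaskInfo]()
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    infos += taskEnd.taskInfo
  }
}
```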

@JoshRosen JoshRosen left a comment

On re-review, I think I may have spotted a potential unintended behavior change, but it's a bit of a subtle corner case and may actually be something that we're okay with changing:

By design, TaskSetManager is only supposed to be called while holding a task scheduler lock, so the code written here can assume serial operations. Given this, as an informal proof technique we can try to establish that a given task attempt's taskInfo will be cleared exactly once (freeing us from concerns around whether the cleared cloned task info can subsequently escape the scope of the TaskSetManager and be exposed to outside code):

  • handleSuccessfulTask: this method exits early if a task is already finished. Otherwise, it updates the stored copy of the info and forwards the original to the DAGScheduler.
  • handleFailedTask: similarly, this method exits early on already finished tasks, and otherwise notifies the DAGScheduler.
  • executorLost: there are a few branches in this method:
    • Running tasks are marked as failed, triggering the handleFailedTask branch.
    • Some completed tasks whose map output was lost may be resubmitted.
    • ⚠️ I think there might be a subtle unintended behavior change here. The logic at

      ```scala
      // We may have a running task whose partition has been marked as successful,
      // this partition has another task completed in another stage attempt.
      // We treat it as a running task and will call handleFailedTask later.
      if (successful(index) && !info.running && !killedByOtherAttempt.contains(tid) &&
          !isShuffleMapOutputAvailable) {
        successful(index) = false
        copiesRunning(index) -= 1
        tasksSuccessful -= 1
        addPendingTask(index)
        // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
        // stage finishes when a total of tasks.size tasks finish.
        emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid,
          tasks(index), Resubmitted, null, Seq.empty, Array.empty, info)
      }
      ```

      takes a completed task's task info and reuses it in a second Resubmitted event. Over in the DAGScheduler, the processing of Resubmitted failures happens after the listener event has been posted. Thus, I think this PR might subtly change the listener-visible behavior of resubmitted tasks: previously, the task info from the original successful attempt was posted with the resubmission DAGScheduler event (and thus the listener event), but now we will pass an event with empty accumulables, which could cause problems if downstream listener code tries to access those accumulables.

We can't realize the significant memory savings if we also want to preserve the listener-visible implicit behavior in the succeeded-then-resubmitted path.

On the other hand, there are already some significant differences in the resubmitted event path: the call at

```scala
// Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
// stage finishes when a total of tasks.size tasks finish.
emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid,
  tasks(index), Resubmitted, null, Seq.empty, Array.empty, info)
```

is already passing empty accumUpdates and metricsPeaks. Given this, it may be okay to make an implicit breaking change here, but we should discuss.

Even if we choose to go that route and accept the behavior change, it might mean that we cannot straightforwardly use the throwOnAccumulablesCall logic as currently written, since we don't actually have an invariant that cleared task infos cannot flow to other components. If we lift that invariant, though, then we need to be extra careful not to introduce bugs where a cleared task info unexpectedly flows downstream.

@mridulm mridulm commented Dec 17, 2023

@cloud-fan thanks for checking!
Note that this would fail for the executorLost case though (I will call it out at the exact location), since the Resubmitted event will now have invalid accumulables.

At a minimum, this should be an opt-in and not default on.

@utkarsh39 (Author)

Proposal To Gain Consensus
The PR alleviates memory pressure on the driver, though at the cost of introducing a breaking change, as identified by @JoshRosen in #44321 (review). I propose that we disable the feature by default and accept the breaking change wherein TaskInfo.accumulables() is empty for Resubmitted tasks upon the loss of an executor. The behavior change is to return empty Accumulables instead of the Accumulables of an earlier successful task attempt, as happens today. When this change is enabled, it will affect the following consumers:

  1. EventLoggingListener, where task accumulables are serialized to JSON upon task completion (code link).
  2. Custom Spark Listeners installed by Spark users (a sketch of such a listener follows this list).
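As an illustration of the second kind of consumer (a hedged sketch, not code taken from Spark; the class name is hypothetical): a user listener that inspects accumulables on task end would observe an empty sequence for Resubmitted task-end events when the flag is enabled.

```scala
import org.apache.spark.Resubmitted
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

class ResubmittedAccumulablesListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    taskEnd.reason match {
      case Resubmitted =>
        // With spark.scheduler.dropTaskInfoAccumulablesOnTaskCompletion.enabled=true,
        // this sequence is empty; previously it carried the accumulables of the
        // earlier successful attempt.
        val accums = taskEnd.taskInfo.accumulables
        println(s"Resubmitted task ${taskEnd.taskInfo.taskId}: ${accums.size} accumulables")
      case _ => // other end reasons are unaffected
    }
  }
}
```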

What do the reviewers think of the proposal?

Note that the current design in the PR does not implement this proposal: currently, accessing the empty accumulables results in a crash. I will refactor the change if we agree on this proposal.

@mridulm mridulm commented Dec 21, 2023

Sounds good to me, thoughts @JoshRosen, @cloud-fan ?

@cloud-fan

SGTM

@JoshRosen

The proposed "make the behavior change optional and off by default, with an option for users to opt in" approach sounds reasonable to me: users or platforms that don't rely on the hopefully-rare corner-case listener behavior can opt in to address a major contributor to driver memory problems with large task sets 👍.

@utkarsh39 (Author)

Disabled the changes by default @JoshRosen @mridulm. Can you all PTAL?

@utkarsh39 utkarsh39 requested review from JoshRosen and mridulm January 2, 2024 22:36
@mridulm mridulm left a comment

Took a quick pass

```scala
val rdd1 = sc.parallelize(1 to 100, 4)
sc.runJob(rdd1, (items: Iterator[Int]) => items.size, Seq(0, 1))
sc.listenerBus.waitUntilEmpty()
listener.taskInfos.size should be { 0 }
```

I am not sure I follow this test; what is it trying to do?
This test will pass even with DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION = true, right? (Since it is simply checking for instance equality in the fired event?)

@utkarsh39 (Author)

This test asserts that the same TaskInfo object is sent in the onTaskStart and onTaskEnd events, which is the design in this PR: upon task completion we send the original TaskInfo object, not a clone, to the DAGScheduler.
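For context, a listener of the following shape would support such a test (a sketch of the idea, not necessarily the exact listener used in the PR's test; the class name is illustrative):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart, TaskInfo}
import scala.collection.mutable

// Remembers the TaskInfo seen at task start and removes it at task end only if the
// end event carries the very same instance (eq). The map ends up empty only when the
// original object, not a clone, was forwarded to the listener bus.
class SameTaskInfoListener extends SparkListener {
  val taskInfos = mutable.Map[Long, TaskInfo]()

  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
    taskInfos(taskStart.taskInfo.taskId) = taskStart.taskInfo
  }

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val info = taskEnd.taskInfo
    if (taskInfos.get(info.taskId).exists(_ eq info)) {
      taskInfos.remove(info.taskId)
    }
  }
}
// After the job completes and the bus drains, `taskInfos.size should be { 0 }` holds
// only if the same instances flowed through both events.
```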

@mridulm mridulm Jan 5, 2024

Isn't that simply an implementation detail? (For example, the resubmission case would break it.)
I am not sure what behavior we are testing for here, and how this test would help with some future change (and validation).

I don't see any harm in keeping it, but I want to make sure I am not missing something here.

@utkarsh39 (Author)

I don't mind dropping it. I was just trying to assert one of the ways SparkListeners could be used. The test is more of a general check that we preserve the behavior observed by SparkListeners.

@mridulm

Functionally, that (the right task info being in the event) should already be covered (by the use of SaveStageAndTaskInfo, for example). Do let me know if that is not the case.

@utkarsh39 (Author)

SaveActiveTaskInfos caches TaskInfos, but there are no tests on the TaskInfo objects themselves and none asserting that the TaskInfo objects are expected to remain the same across listener events.

@utkarsh39 utkarsh39 requested a review from mridulm January 5, 2024 16:54
@utkarsh39 (Author)

@mridulm Can you PTAL?

@mridulm mridulm left a comment

Just a couple of minor comments.

+CC @JoshRosen, @cloud-fan


@utkarsh39 utkarsh39 requested a review from mridulm January 10, 2024 23:43
@cloud-fan

thanks, merging to master!

@cloud-fan cloud-fan closed this in 28da1d8 Jan 12, 2024
Mrhs121 pushed a commit to Mrhs121/spark that referenced this pull request Apr 17, 2024
turboFei pushed a commit to turboFei/spark that referenced this pull request Nov 6, 2025